import json
import glob
import os
import numpy as np
import pandas as pd
from .transform import dict_to_array
from itertools import combinations_with_replacement
import multiprocessing as mp
import copy
# sqrt(2) with default precision np.float64
_SQRT2 = np.sqrt(2)
class Data:
"""
Dummy class in order to store into the dataframe.
"""
def __init__(self, columns, shape):
self.columns = columns
self.index = columns
self.dict_ = {c_1: {c_2: [] for c_2 in columns}
for c_1 in columns}
self.shape = shape
def loc(self, pos_x, pos_y, vector):
"""
:param pos_x:
:param pos_y:
:param vector:
:return:
"""
self.dict_[pos_x][pos_y] = vector
self.dict_[pos_y][pos_x] = self.dict_[pos_x][pos_y]
def copy(self, deep=True):
"""
:param deep:
:return:
"""
if deep is True:
return copy.deepcopy(self)
else:
return copy.copy(self)
def to_df(self):
"""
Export data as a pandas.DataFrame.
:return:
"""
if self.shape > 1:
range_str = [s for s in range(self.shape)]
iterables = [self.columns, range_str]
multiindex = pd.MultiIndex.from_product(iterables, names=['song', 'frame'])
# multiindex = [i for i in itertools.product(self.columns, range_str, repeat=1)]
df = pd.DataFrame(columns=multiindex, index=self.columns, dtype=np.float64)
for c_1 in self.columns:
for c_2 in self.columns:
for s in range_str:
df.loc[c_1][c_2, s] = self.dict_[c_1][c_2][int(s)]
else:
df = pd.DataFrame(columns=self.columns + ['song'], dtype=np.float64)
df['song'] = self.columns
df = df.set_index('song')
for c_1 in self.columns:
for c_2 in self.columns:
df.loc[c_1, c_2] = self.max_diff(c_1, c_2)
return df.T
def to_json(self):
"""
Export data as a JSON.
:return:
"""
return None
def min_diff(self, song_x, song_y):
"""
:param song_x:
:param song_y:
:return:
"""
array = self.dict_[song_x][song_y]
return np.min(array)
def max_diff(self, song_x, song_y):
"""
:param song_x:
:param song_y:
:return:
"""
array = self.dict_[song_x][song_y]
return np.max(array)
def pos_diff(self, song_x, song_y, pos):
"""
:param song_x:
:param song_y:
:param pos:
:return:
"""
array = self.dict_[song_x][song_y]
return array[pos]
# DISTANCE METRICS
def positive_error(x, y):
"""
:param np.array x:
:param np.array y:
:return:
"""
return np.sum(np.abs(x - y))
def hellinger(x, y):
"""
:param np.array x:
:param np.array y:
:return:
"""
return np.linalg.norm(np.sqrt(x) / np.sum(x) -
np.sqrt(y) / np.sum(y)) / _SQRT2
def l2_norm(x, y):
"""
L2 norm, adapted to dtw format
:param x:
:param y:
:return: euclidean norm
"""
return np.linalg.norm(x - y)
def integrate(x, y):
"""
:param x:
:param y:
:return:
"""
diff = np.abs(x - y)
return np.trapz(diff)
distance_dict = {'positive': positive_error,
'hellinger': hellinger,
'l2_norm': l2_norm,
'integrate': integrate}
def warp_distance(distance_metric, x, y, warp=200):
"""
DEPRECATED. Calculate the minimum distance among
x and y arrays after warping.
:param str distance_metric:
:param np.array x:
:param np.array y:
:param int warp:
:return:
"""
# Selecting the array
distance_func = distance_dict[distance_metric]
# Copying the value
x_copy = copy.deepcopy(x)
y_copy = copy.deepcopy(y)
# Starting the warping
min_diff = distance_func(x, y)
for i in range(1, int(warp)):
# Moving forward
forward_diff = distance_func(x_copy[i:], y_copy[:-i])
if forward_diff < min_diff:
min_diff = forward_diff
# Moving backward
backward_diff = distance_func(x_copy[:-i], y_copy[i:])
if backward_diff < forward_diff:
min_diff = backward_diff
return min_diff
[docs]def pair_distance(freq_x,
features_x,
freq_y,
features_y,
frames=None,
distance_metric='l2_norm'):
"""
Distance between song x (with frequencies and features)
and song y is calculated.
:param numpy.array freq_x: frequencies of the song x.
:param numpy.array features_x: features (fourier amplitude) of song x.
:param numpy.array freq_y: frequencies of the song y.
:param numpy.array features_y: features (fourier amplitude) of song y.
:param frames: number of frames to calculate distances. If None,
only one frame is considered
:param str distance_metric: name of the metric to use. Options are:
- 'positive': positive_error.
- 'hellinger': hellinger.
- 'l2_norm': l2_norm.
- 'integrate': integrate.
:return: distance in float.
"""
if frames is None:
frames = 1
freq_x_frames = np.array_split(freq_x, frames)
features_x_frames = np.array_split(features_x, frames)
distance_array = np.empty(frames)
for frame in range(frames):
# Get the frames
freq_x_frame = freq_x_frames[frame]
features_x_frame = features_x_frames[frame]
# Interpolate to get features from song y
features_y_frame = np.interp(freq_x_frame,
freq_y,
features_y)
distance = distance_dict[distance_metric](features_x_frame,
features_y_frame)
distance_array[frame] = distance # / np.max(features_x_frame)
return distance_array
[docs]def distance_matrix(fourier_folder: str,
multiprocess: bool = False,
frames: int =1,
distance_metric: str ='l2_norm'):
"""
A distance matrix with all the songs of a folder
can be calculated.
:param fourier_folder:
:param int frames:
:param distance_metric:
:param bool multiprocess:
:param str distance_metric:
:return:
"""
merged_file = os.path.join(fourier_folder, 'merged_file.json')
if os.path.isfile(merged_file):
os.remove(merged_file)
read_files = glob.glob(os.path.join(fourier_folder, '*.json'))
merged_file_list = [json.load(open(f)) for f in read_files]
merged_file = merged_file_list[0]
[merged_file.update(d) for d in merged_file_list]
# Creating a squared DataFrame as matrix distance
song_names = list(merged_file.keys())
data = Data(columns=song_names, shape=frames)
if multiprocess is True:
ff_dict = {}
for song_name in song_names:
freq, features = dict_to_array(merged_file[song_name])
ff_dict.update({song_name: {'freq': freq, 'features': features}})
mgr = mp.Manager()
ns = mgr.Namespace()
ns.distance_metric = distance_metric
ns.ff_dict = ff_dict
ns.frames = frames
# Distances are saved in a shared dict
shared_dict = mgr.dict()
for song_name in song_names:
shared_dict[song_name] = mgr.dict()
ns.dict = shared_dict
# Args must be in list
song_names_tuple = [comb for comb in combinations_with_replacement(song_names, r=2)]
args_to_mp = [(names[0], names[1], ns) for names in song_names_tuple]
with mp.Pool(processes=max(mp.cpu_count() - 1, 1)) as p:
p.starmap(multiprocess_matrix, args_to_mp)
# Retrieve the information and save into the dataframe
for k_1 in ns.dict.keys():
for k_2 in ns.dict.keys():
data.loc(k_1, k_2, ns.dict[k_1][k_2])
else:
for i in range(len(song_names)):
for j in range(i, len(song_names)):
song_x = song_names[i]
if j > i:
# Song_x
freq_x, features_x = dict_to_array(merged_file[song_x])
song_y = song_names[j]
freq_y, features_y = dict_to_array(merged_file[song_y])
distance = pair_distance(freq_x=freq_x,
features_x=features_x,
freq_y=freq_y,
features_y=features_y,
frames=frames,
distance_metric=distance_metric)
data.loc(song_x, song_y, distance)
# Save also in reverse
data.loc(song_y, song_x, distance)
else:
data.loc(song_x, song_x, np.zeros(frames))
# df = data.unpack()
# return df
return data
def multiprocess_matrix(song_x, song_y, ns):
"""
:param song_x:
:param song_y:
:param ns: Namespace.
:return:
"""
if song_x == song_y:
ns.dict[song_x][song_x] = 0.0
else:
# Song_x
freq_x = ns.ff_dict[song_x]['freq']
features_x = ns.ff_dict[song_x]['features']
# Song_y
freq_y = ns.ff_dict[song_y]['freq']
features_y = ns.ff_dict[song_y]['features']
# Distance
distance = pair_distance(freq_x=freq_x,
features_x=features_x,
freq_y=freq_y,
features_y=features_y,
frames=ns.frames,
distance_metric=ns.distance_metric)
ns.dict[song_x][song_y] = distance
# Save also in reverse
ns.dict[song_y][song_x] = distance