Source code for foucluster.cluster

from sklearn import cluster
from sklearn.preprocessing import minmax_scale
from scipy.spatial.distance import cdist
import pandas as pd
import numpy as np
from itertools import groupby

# Small positive offset added to each distortion in ``jump_method`` before it
# is raised to a negative power, so the base is never exactly zero.
eps = 10 ** -10

# Estimators that are constructed with an explicit ``n_clusters`` argument.
n_cluster_methods = dict(
    AgglomerativeClustering=cluster.AgglomerativeClustering,
    SpectralClustering=cluster.SpectralClustering,
    KMeans=cluster.KMeans,
)

# Estimators that are constructed without a cluster count.
non_n_cluster_methods = dict(
    AffinityPropagation=cluster.AffinityPropagation,
    MeanShift=cluster.MeanShift,
)

# Independent shallow copy of the ``n_clusters``-based registry.
cluster_methods = dict(n_cluster_methods)

def determinist_cluster(dist_df, method, n_clusters):
    """
    Cluster the songs of a distance table into a fixed number of groups.

    :param pandas.DataFrame dist_df: distance table. Any other object is
        first converted through ``dist_df.to_df().T``.
    :param str method: key of ``n_cluster_methods``, one of:
        - AgglomerativeClustering.
        - SpectralClustering.
        - KMeans.
    :param int n_clusters: number of clusters handed to the estimator.
    :return: pandas.DataFrame holding the min-max scaled values, with the
        same index and columns as the input, plus a ``Cluster`` column with
        the label assigned to each row.
    """
    if isinstance(dist_df, pd.DataFrame):
        frame = dist_df
    else:
        frame = dist_df.to_df().T

    # minmax_scale returns a bare array; labels are re-attached further down.
    scaled = minmax_scale(frame)

    estimator = n_cluster_methods[method](n_clusters=n_clusters)
    labels = estimator.fit_predict(scaled)

    result = pd.DataFrame(scaled, index=frame.index, columns=frame.columns)
    result['Cluster'] = pd.Series(labels, index=result.index)
    return result
def automatic_cluster(dist_df, method):
    """
    Cluster the songs of a distance table without a user-supplied cluster
    count.

    For estimators listed in ``n_cluster_methods`` the number of clusters is
    obtained from ``jump_method``; estimators listed in
    ``non_n_cluster_methods`` are built with their default arguments.

    :param pd.DataFrame dist_df: distance table. Any other object is first
        converted through ``dist_df.to_df().T``.
    :param str method: name of the estimator, one of:
        - AffinityPropagation.
        - MeanShift.
        - AgglomerativeClustering.
        - SpectralClustering.
        - KMeans.
    :return: pandas.DataFrame holding the min-max scaled values plus a
        ``Cluster`` column with the label assigned to each row.
    """
    frame = dist_df if isinstance(dist_df, pd.DataFrame) else dist_df.to_df().T
    scaled = minmax_scale(frame)

    if method not in n_cluster_methods:
        estimator = non_n_cluster_methods[method]()
    else:
        # The cluster count is estimated on the scaled matrix, not the
        # raw frame.
        n_clusters = jump_method(dist_df=scaled)
        estimator = n_cluster_methods[method](n_clusters=n_clusters)

    labels = estimator.fit_predict(scaled)

    result = pd.DataFrame(scaled, index=frame.index, columns=frame.columns)
    result['Cluster'] = pd.Series(labels, index=result.index)
    return result
def jump_method(dist_df, n_max=50):
    """
    Method based on information theory to determine best number of clusters.

    K-means is fitted for every ``k`` from 1 to ``n_max`` (capped at the
    number of rows). A distortion is computed for each ``k``, raised to the
    power ``-rows / 2``, and the ``k`` with the largest increase between
    consecutive transformed distortions is picked. The pick is then lowered
    while k-means still produces a cluster with a single member, but never
    below 2 once it started above 2.

    :param dist_df: pandas.DataFrame or 2-D array with one row per instance
        (``automatic_cluster`` passes the scaled array).
    :param int n_max: maximum number of clusters to test.
    :return: optimal number of clusters.
    """
    dim = dist_df.shape[0]
    n_max = min(n_max, dim)
    exponent = dim / 2

    distortions = np.empty(n_max + 1)
    distortions[0] = 0.0
    jump_vector = np.empty(n_max)
    for k in range(1, n_max + 1):
        kmean_model = cluster.KMeans(n_clusters=k).fit(dist_df)
        distances = cdist(dist_df, kmean_model.cluster_centers_, 'euclidean')
        # NOTE(review): this uses the single smallest point-to-centre
        # distance of the whole matrix. A jump-style distortion would
        # presumably average each point's distance to its nearest centre --
        # confirm the intent before changing it, since it alters the result.
        distortion = np.min(distances) / dim + eps
        distortions[k] = distortion ** (-exponent)
        jump_vector[k - 1] = distortions[k] - distortions[k - 1]
    n_cluster = np.argmax(jump_vector) + 1

    # Avoid leaving an instance alone in its own cluster.
    while True:
        labels = cluster.KMeans(n_clusters=n_cluster).fit_predict(dist_df)
        _, sizes = np.unique(labels, return_counts=True)
        # ``<= 2`` rather than ``== 2``: with a single-row input the pick is
        # 1 and its only cluster has one member; decrementing would ask
        # KMeans for zero clusters and raise.
        if np.min(sizes) > 1 or n_cluster <= 2:
            break
        n_cluster -= 1
    return n_cluster
def score_cluster(cluster_df):
    """
    When `automatic_cluster` is used, the clusters must be mapped onto the
    categories we want to predict in order to score the method.

    The true category of a row is read as ``int(n[0][0])`` from its index
    entry ``n`` and shifted so the smallest category is 0. Each cluster label
    is mapped to the category whose membership mask agrees with the cluster's
    mask at the most positions.

    :param pandas.DataFrame cluster_df: frame with a ``Cluster`` column. It is
        modified in place: a ``Cluster_corrected`` column is added.
    :return: accuracy score, the fraction of rows whose corrected cluster
        equals the true category.
    """
    accurate_class = np.array(
        [int(n[0][0]) for n in cluster_df.index.tolist()], dtype=int)
    # Move to 0, 1, ... notation (np.unique is sorted, so [0] is the minimum).
    accurate_class -= np.unique(accurate_class)[0]
    cluster_class = np.array(cluster_df['Cluster'].tolist(), dtype=int)

    # Find correspondences between given classes and cluster classes.
    true_values = np.unique(accurate_class)
    correspondence_dict = {}
    for p in np.unique(cluster_class):
        pos_p = cluster_class == p
        max_c = 0.0
        for e in true_values:
            pos_e = accurate_class == e
            # Counts positions where both masks agree, True/True as well as
            # False/False.
            c = (pos_p == pos_e).sum()
            if c > max_c:
                correspondence_dict[p] = e
                max_c = c

    # Finding the accuracy.
    cluster_class_corrected = [correspondence_dict[p] for p in cluster_class]
    cluster_df['Cluster_corrected'] = pd.Series(cluster_class_corrected,
                                                index=cluster_df.index)
    score_vector = [e == p_c
                    for e, p_c in zip(accurate_class, cluster_class_corrected)]
    return np.average(score_vector)


def party_list(song_df, song=None):
    """
    List the songs ordered by their value with respect to a reference song,
    smallest value first.

    :param pandas.DataFrame song_df: table whose columns are songs; it is
        transposed before use, so the reference song is looked up among
        ``song_df.columns``.
    :param str song: reference song. When ``None`` or not present, the first
        column of ``song_df`` is used.
    :return: list with the index labels of ``song_df`` in ascending order of
        their value in the reference song's column.
    """
    song_df_rev = song_df.T
    if song is None or song not in song_df_rev.index:
        song = song_df_rev.index[0]
    # Sorting along the columns only reorders the columns, so the ordering
    # must be read from ``.columns``. The row index of a selected column is
    # left untouched by the sort and would always give the unsorted list.
    ordered = song_df_rev.sort_values(song, axis='columns')
    return list(ordered.columns)