import os
import glob
import json
import subprocess
import multiprocessing as mp
import numpy as np
from scipy.io.wavfile import read
from .plot import fourier_plot, song_plot
import logging
logger = logging.getLogger('foucluster')
logger.setLevel('DEBUG')
[docs]def mp3_to_wav(mp3_file, wav_file, encoder='mpg123'):
"""
Transform mp3 file into wav format calling bash and using mpg123
or ffmpeg.
:param str mp3_file: path to the mp3 file.
:param str wav_file: path to the new wav file.
:param str encoder: Encode to use. It could be mpg123 or ffmpeg.
:return:
"""
if encoder == 'mpg123':
bash_command = ['mpg123', '-w', wav_file, '--mono', mp3_file]
else:
bash_command = ['ffmpeg', '-i', mp3_file, wav_file]
subprocess.run(bash_command)
[docs]def wav_to_fourier(wav_file,
rate_limit=6000.0,
step=1.0):
"""
WAV file is loaded and transformed into Fourier Series.
This Fourier Series is limited.
:param str wav_file:
:param float rate_limit:
:param float step:
:return:
"""
rate, aud_data = read(wav_file)
# Should be mono
if len(aud_data) != len(aud_data.ravel()):
aud_data = np.mean(aud_data, axis=1)
# Zero padding
len_data = aud_data.shape[0]
channel_1 = np.zeros(2 ** (int(np.ceil(np.log2(len_data)))))
channel_1[0:len_data] = aud_data
# Fourier analysis
fourier = np.abs(np.fft.fft(channel_1))
freq = np.linspace(0, rate, fourier.shape[0])
freq, fourier = limit_by_freq(freq,
fourier,
upper_limit=rate_limit)
freq, fourier = group_by_freq(freq,
fourier,
step=step)
# Max frequency should be 100.0
a = np.max(np.abs(fourier)) / 100.0
fourier = fourier / a
return freq, fourier
def group_by_freq(freq, features, step=1.0):
"""
:param freq:
:param features:
:param step:
:return:
"""
min_freq = int(np.min(freq))
max_freq = int(np.max(freq))
length = int((max_freq - min_freq) / step) + 1
new_freq = np.empty(length, dtype=np.float)
new_features = np.empty(length, dtype=np.float)
i = 0
for freq_i in np.arange(min_freq, max_freq, step):
mask_1 = freq >= freq_i
mask_2 = freq < freq_i + step
mask = mask_1 * mask_2
new_freq[i] = np.mean(freq[mask])
new_features[i] = np.mean(features[mask])
i += 1
new_freq = np.array(new_freq, dtype=np.float)
new_features = np.array(new_features, dtype=np.float)
return new_freq, new_features
def limit_by_freq(freq, features, upper_limit, lower_limit=None):
"""
Limit arrays of frequency and features by maximum frequency and
bottom frequency.
:param freq: array of frequencies.
:param features: array of amplitude.
:param float upper_limit: maximum frequency.
:param float lower_limit: minimum frequency.
:return:
"""
# Copy into arrays, in order to apply mask
freq = np.array(freq, dtype=np.float)
features = np.array(features, dtype=np.float)
# Mask for bottom limit
if lower_limit is not None:
bottom_mask = freq >= lower_limit
features = features[bottom_mask]
freq = freq[bottom_mask]
# Mask for upper limit
upper_mask = freq <= upper_limit
features = features[upper_mask]
freq = freq[upper_mask]
return freq, features
def dict_to_array(song_dict):
"""
:param dict song_dict: load form dictionary to array
:return:
"""
freq = np.array([k for k in song_dict.keys()], dtype=np.float)
features = np.array([v for v in song_dict.values()], dtype=np.float)
return freq, features
def check_wav(song, source_folder, temp_folder, encoder='mpg123'):
"""
Check if song is already transformed into temp.
:param str song:
:param str source_folder:
:param str temp_folder:
:param str encoder:
:return:
"""
# Name of files
song_name = os.path.splitext(song)[0]
mp3_file = os.path.join(source_folder, song)
wav_file = os.path.join(temp_folder, song_name + '.wav')
try:
if not os.path.isfile(wav_file):
mp3_to_wav(
mp3_file=mp3_file,
wav_file=wav_file,
encoder=encoder)
else:
pass
except MemoryError:
logger.error('MemoryError: %s MP3 couldn\'t be transformed into WAV', song_name)
[docs]def time_to_frequency(song,
temp_folder,
output_folder,
rate_limit=6000.0,
overwrite=True,
plot=True,
image_folder=None,
step=5.0):
"""
Transform a MP3 song into WAV format, and then into
Fourier series.
:param str song: name of the song, with MP3 extension.
:param str output_folder: folder where pickle files from
frequency series are saved.
:param str temp_folder: folder where wav files are saved.
:param float rate_limit: maximum frequency of the frequency
series.
:param bool overwrite:
:param bool plot: if True, frequency series is plotted.
:param image_folder: if plotting is True, is the folder
where the Fourier data is saved.
:param float step: step of the Fourier series.
:return:
"""
# Name of files
song_name = os.path.splitext(song)[0]
json_name = os.path.join(output_folder, song_name + '.json')
wav_file = os.path.join(temp_folder, song_name + '.wav')
if not os.path.isfile(json_name) or overwrite is True:
# Fourier transformation
try:
frequencies, fourier_series = wav_to_fourier(wav_file=wav_file,
rate_limit=rate_limit,
step=step)
# Save as JSON
json_to_save = {song: {str(x): y for x, y in
zip(frequencies, fourier_series)}}
with open(json_name, 'w') as output:
json.dump(json_to_save, output)
# Plotting
if plot is True:
fourier_plot(freq=frequencies,
features=fourier_series,
folder=image_folder,
filename=song_name)
logger.debug('%s transformed', song_name)
except MemoryError:
logger.error('MemoryError: %s couldn\'t be Fourier transformed', song_name)