pythonで多次元データの度数分布表を作成(マルチフラクタル次元計算の準備) - 雑感等
与えるwavファイルは同じだが処理時間は940秒ほどになった.
import numpy as np
import wave
from scipy import fromstring, int16, int64
import itertools
import seaborn as sns
import matplotlib.pyplot as plt
import time
from joblib import Parallel, delayed
def probability_distribution(X, div_num=2 * 3 * 5 * 7):
"""
多次元データの度数分布表を作成する.
多次元ヒストグラムあるいは多次元ヒートマップのような,数値の分布を計算する.
:param X:入力データ.第0方向が時系列.第1方向が特徴量次元
:param div_num:ヒストグラムで言うところのビンの数
:return: 合計(div_num^D)個の要素を持つ,D次元配列を返す.ここで,Dは特徴量次元数.
"""
X_max = int64(np.max(X, axis=0))
X_min = int64(np.min(X, axis=0))
bin_margin = 0.2
X_max = X_max * (1 + np.sign(X_max) * bin_margin)
X_min = X_min * (1 - np.sign(X_min) * bin_margin)
bin_resolution = np.array([(X_max - X_min) / div_num])
D = []
N = []
len_X_shape = X.shape.__len__()
if len_X_shape == 2:
D = X.shape[1]
N = X.shape[0]
elif len_X_shape == 1:
D = 1
N = X.shape[0]
else:
return None
too_many_D = 30
if D > too_many_D:
import sys
print(
f"High-dimensional data(data dimension={D}) was input to the function \"{sys._getframe().f_code.co_name}\".\n"
f"Do you want to ontinue? [Y]/[N]")
if "Y" != input():
print(f"Interrupted was processing of the function \"{sys._getframe().f_code.co_name}.")
return None
ind_tmp = np.array(list(range(div_num)))
ind = np.array(list(itertools.product(ind_tmp, repeat=D)))
N_i = np.zeros(np.repeat(div_num, D))
print("ループ1")
start1 = time.time()
def process(i):
k = ind[i]
range_min = bin_resolution * k + X_min
range_max = bin_resolution * (k + 1) + X_min
temp_bin_cond = sum(map(all, np.logical_and(range_min <= X, X < range_max)))
return temp_bin_cond
pass
bin_conds = Parallel(n_jobs=-1)([delayed(process)(i) for i in range(len(ind))])
end = time.time()
print(f"ループ1の処理全体に掛かった時間:{(end - start1)}")
print(f"ループ1の一周あたりの平均時間:{(end - start1) / len(ind)}")
print("ループ2")
start2 = time.time()
N_i = np.array(bin_conds).reshape(tuple(np.repeat(div_num, X.shape[1])))
end = time.time()
print(f"ループ2の処理全体に掛かった時間:{(end - start2)}")
print(f"ループ2の一周あたりの平均時間:{(end - start2) / len(ind)}")
print(f"ループ処理全体に掛かった時間:{(end - start1)}")
print(f"ループ一周あたりの平均時間:{(end - start1) / len(ind)}")
sns.heatmap(np.log10(N_i + 1e-10))
print(f"sum N_i:{np.sum(N_i)} (入力データのデータ数(次元数ではなく)に一致するはず)")
return N_i
pass
if __name__ == "__main__":
file_name = "RIFF wavファイルのパス"
with wave.open(file_name, "r") as wf:
fs = wf.getframerate()
n = wf.getnframes()
data = wf.readframes(wf.getnframes())
X = fromstring(data, dtype=int16)
data = np.concatenate([np.array([X[0:-1]]), np.array([np.diff(X)])], axis=0).T
probability_distribution(X=data, div_num=2 * 3 * 5 * 7)
plt.figure()
plt.plot(data[:, 0], data[:, 1])
plt.show()
pass