雑感等

音楽,数学,語学,その他に関するメモを記す.

pythonで多次元データの度数分布表を作成(マルチフラクタル次元計算の準備)+joblibで並列化

pythonで多次元データの度数分布表を作成(マルチフラクタル次元計算の準備) - 雑感等

与えるwavファイルは同じだが処理時間は940秒ほどになった.

import numpy as np
import wave
from scipy import fromstring, int16, int64
import itertools
import seaborn as sns
import matplotlib.pyplot as plt
import time
from joblib import Parallel, delayed


def probability_distribution(X, div_num=2 * 3 * 5 * 7):
    """
    多次元データの度数分布表を作成する.
    多次元ヒストグラムあるいは多次元ヒートマップのような,数値の分布を計算する.
    :param X:入力データ.第0方向が時系列.第1方向が特徴量次元
    :param div_num:ヒストグラムで言うところのビンの数
    :return: 合計(div_num^D)個の要素を持つ,D次元配列を返す.ここで,Dは特徴量次元数.
    """
    X_max = int64(np.max(X, axis=0))  # 入力データが小数点数ならint64以外にする
    X_min = int64(np.min(X, axis=0))

    # ビン分割の最大値と最小値に余裕を持たせる
    bin_margin = 0.2
    X_max = X_max * (1 + np.sign(X_max) * bin_margin)
    X_min = X_min * (1 - np.sign(X_min) * bin_margin)
    bin_resolution = np.array([(X_max - X_min) / div_num])

    D = []  # 特徴量次元数
    N = []  # データ点数
    len_X_shape = X.shape.__len__()
    if len_X_shape == 2:
        D = X.shape[1]
        N = X.shape[0]
    elif len_X_shape == 1:
        D = 1
        N = X.shape[0]
    else:
        return None

    too_many_D = 30  # 次元が多すぎる場合
    if D > too_many_D:
        import sys
        print(
            f"High-dimensional data(data dimension={D}) was input to the function \"{sys._getframe().f_code.co_name}\".\n"
            f"Do you want to ontinue? [Y]/[N]")
        if "Y" != input():
            print(f"Interrupted was processing of the function \"{sys._getframe().f_code.co_name}.")
            return None

    ind_tmp = np.array(list(range(div_num)))
    ind = np.array(list(itertools.product(ind_tmp, repeat=D)))
    N_i = np.zeros(np.repeat(div_num, D))  # 度数分布表の配列

    print("ループ1")
    start1 = time.time()  # 下のループにかかる時間を測定する.

    def process(i):
        k = ind[i]
        range_min = bin_resolution * k + X_min
        range_max = bin_resolution * (k + 1) + X_min
        temp_bin_cond = sum(map(all, np.logical_and(range_min <= X, X < range_max)))
        return temp_bin_cond
        pass

    bin_conds = Parallel(n_jobs=-1)([delayed(process)(i) for i in range(len(ind))])
    end = time.time()
    print(f"ループ1の処理全体に掛かった時間:{(end - start1)}")
    print(f"ループ1の一周あたりの平均時間:{(end - start1) / len(ind)}")

    print("ループ2")
    start2 = time.time()  # 下のループにかかる時間を測定する.
    N_i = np.array(bin_conds).reshape(tuple(np.repeat(div_num, X.shape[1])))
    end = time.time()
    print(f"ループ2の処理全体に掛かった時間:{(end - start2)}")
    print(f"ループ2の一周あたりの平均時間:{(end - start2) / len(ind)}")
    print(f"ループ処理全体に掛かった時間:{(end - start1)}")
    print(f"ループ一周あたりの平均時間:{(end - start1) / len(ind)}")

    sns.heatmap(np.log10(N_i + 1e-10))  # ヒートマップを描画.値の大きさは対数にする.
    print(f"sum N_i:{np.sum(N_i)} (入力データのデータ数(次元数ではなく)に一致するはず)")
    return N_i  # 度数分布表を返す
    pass


if __name__ == "__main__":
    file_name = "RIFF wavファイルのパス"
    with wave.open(file_name, "r") as wf:
        fs = wf.getframerate()
        n = wf.getnframes()
        data = wf.readframes(wf.getnframes())
    X = fromstring(data, dtype=int16)

    # t = np.linspace(0, 10, 1000)
    # X = np.sin(2 * np.pi * np.sqrt(101) * t) * 100

    # 特徴量が2次元の時系列データを作成(X[:,0]は時系列データ,X[:,1]は時系列データの時間差分(時間微分))
    data = np.concatenate([np.array([X[0:-1]]), np.array([np.diff(X)])], axis=0).T

    # 上で作成したデータから多次元度数分布表を計算する.ビン分割数は2*3*5*7個としている.
    probability_distribution(X=data, div_num=2 * 3 * 5 * 7)
    plt.figure()
    plt.plot(data[:, 0], data[:, 1])
    plt.show()
    pass