Source code for cerebralcortex.markers.brushing.features

from typing import List

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import (
    FloatType,
    IntegerType,
    StringType,
    StructField,
    StructType,
    TimestampType,
)

from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata


def compute_corr_mse_accel_gyro(self, exclude_col_names: list = [],
                                accel_column_names: list = ['accelerometer_x', 'accelerometer_y', 'accelerometer_z'],
                                gyro_column_names: list = ['gyroscope_x', 'gyroscope_y', 'gyroscope_z'],
                                windowDuration: int = None, slideDuration: int = None,
                                groupByColumnName: List[str] = [], startTime=None):
    """
    Compute the pairwise correlation and mean squared error (MSE) of the accelerometer and gyroscope axes.

    Args:
        exclude_col_names list(str): names of the columns on which features should not be computed
        accel_column_names list(str): names of the accelerometer data columns (x, y, z)
        gyro_column_names list(str): names of the gyroscope data columns (x, y, z)
        windowDuration (int): duration of a window in seconds
        slideDuration (int): slide duration of a window in seconds
        groupByColumnName List[str]: group-by column names, for example, group by user, col1, col2
        startTime (datetime): The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15..., provide startTime as 15 minutes. The first timestamp of the data is used as startTime if none is provided.

    Returns:
        DataStream: object with all the existing data columns and the correlation/MSE features
    """
    feature_names = ['ax_ay_corr', 'ax_az_corr', 'ay_az_corr',
                     'gx_gy_corr', 'gx_gz_corr', 'gy_gz_corr',
                     'ax_ay_mse', 'ax_az_mse', 'ay_az_mse',
                     'gx_gy_mse', 'gx_gz_mse', 'gy_gz_mse']

    # concatenate rather than extend() so the shared default list is not mutated across calls
    exclude_col_names = exclude_col_names + ["timestamp", "localtime", "user", "version"]

    data = self._data.drop(*exclude_col_names)

    basic_schema = StructType([
        StructField("timestamp", TimestampType()),
        StructField("localtime", TimestampType()),
        StructField("user", StringType()),
        StructField("version", IntegerType()),
        StructField("start_time", TimestampType()),
        StructField("end_time", TimestampType())
    ])

    features_list = []
    for fn in feature_names:
        features_list.append(StructField(fn, FloatType(), True))

    features_schema = StructType(basic_schema.fields + features_list)

    @pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
    def get_corr_mse_features_udf(df):
        timestamp = df['timestamp'].iloc[0]
        localtime = df['localtime'].iloc[0]
        user = df['user'].iloc[0]
        version = df['version'].iloc[0]
        start_time = timestamp
        end_time = df['timestamp'].iloc[-1]

        # pairwise Pearson correlations between the accelerometer axes
        ax_ay_corr = df[accel_column_names[0]].corr(df[accel_column_names[1]])
        ax_az_corr = df[accel_column_names[0]].corr(df[accel_column_names[2]])
        ay_az_corr = df[accel_column_names[1]].corr(df[accel_column_names[2]])

        # pairwise Pearson correlations between the gyroscope axes
        gx_gy_corr = df[gyro_column_names[0]].corr(df[gyro_column_names[1]])
        gx_gz_corr = df[gyro_column_names[0]].corr(df[gyro_column_names[2]])
        gy_gz_corr = df[gyro_column_names[1]].corr(df[gyro_column_names[2]])

        # mean squared error between the accelerometer axes
        ax_ay_mse = ((df[accel_column_names[0]] - df[accel_column_names[1]]) ** 2).mean()
        ax_az_mse = ((df[accel_column_names[0]] - df[accel_column_names[2]]) ** 2).mean()
        ay_az_mse = ((df[accel_column_names[1]] - df[accel_column_names[2]]) ** 2).mean()

        # mean squared error between the gyroscope axes
        gx_gy_mse = ((df[gyro_column_names[0]] - df[gyro_column_names[1]]) ** 2).mean()
        gx_gz_mse = ((df[gyro_column_names[0]] - df[gyro_column_names[2]]) ** 2).mean()
        gy_gz_mse = ((df[gyro_column_names[1]] - df[gyro_column_names[2]]) ** 2).mean()

        basic_df = pd.DataFrame([[timestamp, localtime, user, int(version), start_time, end_time,
                                  ax_ay_corr, ax_az_corr, ay_az_corr,
                                  gx_gy_corr, gx_gz_corr, gy_gz_corr,
                                  ax_ay_mse, ax_az_mse, ay_az_mse,
                                  gx_gy_mse, gx_gz_mse, gy_gz_mse]],
                                columns=['timestamp', 'localtime', 'user', 'version',
                                         'start_time', 'end_time'] + feature_names)
        return basic_df

    data = self.compute(get_corr_mse_features_udf, windowDuration=windowDuration,
                        slideDuration=slideDuration, groupByColumnName=groupByColumnName,
                        startTime=startTime)
    return DataStream(data=data._data, metadata=Metadata())
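

# Minimal usage sketch (illustrative only): the per-window math used by
# get_corr_mse_features_udf, reproduced on synthetic pandas data. The window
# length, seed, and column names below are assumptions for the example.
def _example_corr_mse_window():
    rng = np.random.default_rng(0)
    win = pd.DataFrame(rng.normal(size=(160, 3)),
                       columns=['accelerometer_x', 'accelerometer_y', 'accelerometer_z'])
    # Pearson correlation is symmetric, so axis order only affects the labels
    ax_ay_corr = win['accelerometer_x'].corr(win['accelerometer_y'])
    # "MSE" here is the mean squared difference between two axes of one window
    ax_ay_mse = ((win['accelerometer_x'] - win['accelerometer_y']) ** 2).mean()
    return ax_ay_corr, ax_ay_mse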


def compute_fourier_features(self, exclude_col_names: list = [],
                             feature_names=["fft_centroid", 'fft_spread', 'spectral_entropy',
                                            'spectral_entropy_old', 'fft_flux', 'spectral_falloff'],
                             windowDuration: int = None, slideDuration: int = None,
                             groupByColumnName: List[str] = [], startTime=None):
    """
    Transforms data from the time domain to the frequency domain and computes spectral features.

    Args:
        exclude_col_names list(str): names of the columns on which features should not be computed
        feature_names list(str): names of the features to compute. Supported features are fft_centroid, fft_spread, spectral_entropy, spectral_entropy_old, fft_flux, and spectral_falloff
        windowDuration (int): duration of a window in seconds
        slideDuration (int): slide duration of a window in seconds
        groupByColumnName List[str]: group-by column names, for example, group by user, col1, col2
        startTime (datetime): The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15..., provide startTime as 15 minutes. The first timestamp of the data is used as startTime if none is provided.

    Returns:
        DataStream: object with all the existing data columns and the FFT features
    """
    eps = 0.00000001

    # concatenate rather than extend() so the shared default list is not mutated across calls
    exclude_col_names = exclude_col_names + ["timestamp", "localtime", "user", "version"]

    data = self._data.drop(*exclude_col_names)
    df_column_names = data.columns

    basic_schema = StructType([
        StructField("timestamp", TimestampType()),
        StructField("localtime", TimestampType()),
        StructField("user", StringType()),
        StructField("version", IntegerType()),
        StructField("start_time", TimestampType()),
        StructField("end_time", TimestampType())
    ])

    features_list = []
    for cn in df_column_names:
        for sf in feature_names:
            features_list.append(StructField(cn + "_" + sf, FloatType(), True))

    features_schema = StructType(basic_schema.fields + features_list)

    def stSpectralCentroidAndSpread(X, fs):
        """Computes the spectral centroid and spread of a frame (given abs(FFT))."""
        ind = (np.arange(1, len(X) + 1)) * (fs / (2.0 * len(X)))

        Xt = X.copy()
        Xt = Xt / Xt.max()
        NUM = np.sum(ind * Xt)
        DEN = np.sum(Xt) + eps

        # centroid:
        C = (NUM / DEN)

        # spread:
        S = np.sqrt(np.sum(((ind - C) ** 2) * Xt) / DEN)

        # normalize by the Nyquist frequency:
        C = C / (fs / 2.0)
        S = S / (fs / 2.0)

        return (C, S)

    def stSpectralFlux(X, Xprev):
        """
        Computes the spectral flux feature of the current frame.

        ARGUMENTS:
            X:      the abs(fft) of the current frame
            Xprev:  the abs(fft) of the previous frame
        """
        # compute the spectral flux as the sum of square distances:
        sumX = np.sum(X + eps)
        sumPrevX = np.sum(Xprev + eps)
        F = np.sum((X / sumX - Xprev / sumPrevX) ** 2)

        return F

    def stSpectralRollOff(X, c, fs):
        """Computes the spectral roll-off."""
        totalEnergy = np.sum(X ** 2)
        fftLength = len(X)
        Thres = c * totalEnergy
        # find the spectral roll-off as the frequency position where the
        # cumulative spectral energy is equal to c * totalEnergy
        CumSum = np.cumsum(X ** 2) + eps
        [a, ] = np.nonzero(CumSum > Thres)
        if len(a) > 0:
            mC = np.float64(a[0]) / (float(fftLength))
        else:
            mC = 0.0
        return (mC)

    def stSpectralEntropy(X, numOfShortBlocks=10):
        """Computes the spectral entropy."""
        L = len(X)  # number of frame samples
        Eol = np.sum(X ** 2)  # total spectral energy

        subWinLength = int(np.floor(L / numOfShortBlocks))  # length of sub-frame
        if L != subWinLength * numOfShortBlocks:
            X = X[0:subWinLength * numOfShortBlocks]

        # define sub-frames (using matrix reshape)
        subWindows = X.reshape(subWinLength, numOfShortBlocks, order='F').copy()
        # compute spectral sub-energies
        s = np.sum(subWindows ** 2, axis=0) / (Eol + eps)
        # compute spectral entropy
        En = -np.sum(s * np.log2(s + eps))

        return En

    def spectral_entropy(data, sampling_freq, bands=None):
        """Band-based spectral entropy (the 'spectral_entropy_old' variant)."""
        psd = np.abs(np.fft.rfft(data)) ** 2
        psd /= np.sum(psd)  # psd as a pdf (normalised to one)

        if bands is None:
            power_per_band = psd[psd > 0]
        else:
            freqs = np.fft.rfftfreq(data.size, 1 / float(sampling_freq))
            bands = np.asarray(bands)

            freq_limits_low = np.concatenate([[0.0], bands])
            freq_limits_up = np.concatenate([bands, [np.inf]])

            power_per_band = np.asarray([np.sum(psd[np.bitwise_and(freqs >= low, freqs < up)])
                                         for low, up in zip(freq_limits_low, freq_limits_up)])
            power_per_band = power_per_band[power_per_band > 0]

        return -np.sum(power_per_band * np.log2(power_per_band))

    def fourier_features_pandas_udf(data, frequency: float = 16.0):
        Fs = frequency  # the sampling freq (in Hz)
        results = []

        # Fourier transform: keep the magnitudes of the non-negative frequencies
        X = abs(np.fft.fft(data))
        nFFT = int(len(X) / 2) + 1
        X = X[0:nFFT]

        # normalize fft
        X = X / len(X)

        if "fft_centroid" in feature_names or "fft_spread" in feature_names:
            C, S = stSpectralCentroidAndSpread(X, Fs)  # spectral centroid and spread
            if "fft_centroid" in feature_names:
                results.append(C)
            if "fft_spread" in feature_names:
                results.append(S)
        if "spectral_entropy" in feature_names:
            se = stSpectralEntropy(X)  # spectral entropy
            results.append(se)
        if "spectral_entropy_old" in feature_names:
            se_old = spectral_entropy(X, frequency)  # band-based spectral entropy
            results.append(se_old)
        if "fft_flux" in feature_names:
            flx = stSpectralFlux(X, X.copy())  # spectral flux
            results.append(flx)
        if "spectral_falloff" in feature_names:
            roff = stSpectralRollOff(X, 0.90, frequency)  # spectral roll-off
            results.append(roff)
        return pd.Series(results)

    @pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
    def get_fft_features(df):
        timestamp = df['timestamp'].iloc[0]
        localtime = df['localtime'].iloc[0]
        user = df['user'].iloc[0]
        version = df['version'].iloc[0]
        start_time = timestamp
        end_time = df['timestamp'].iloc[-1]

        df.drop(exclude_col_names, axis=1, inplace=True)

        # one pd.Series of spectral features per remaining sensor column
        df_ff = df.apply(fourier_features_pandas_udf)
        df3 = df_ff.T
        df3.columns = feature_names

        # flatten the (column, feature) matrix into a single row named <column>_<feature>
        output = df3.unstack().to_frame().sort_index(level=1).T
        output.columns = [f'{j}_{i}' for i, j in output.columns]

        basic_df = pd.DataFrame([[timestamp, localtime, user, int(version), start_time, end_time]],
                                columns=['timestamp', 'localtime', 'user', 'version', 'start_time', 'end_time'])
        return basic_df.assign(**output)

    return self.compute(get_fft_features, windowDuration=windowDuration, slideDuration=slideDuration,
                        groupByColumnName=groupByColumnName, startTime=startTime)
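

# Minimal usage sketch (illustrative only): the per-window FFT preprocessing
# performed by fourier_features_pandas_udf, reproduced on a synthetic 16 Hz
# signal. The signal, window length, and sampling rate are assumptions; a pure
# 3 Hz sine should place the (denormalized) spectral centroid near 3 Hz.
def _example_fourier_window():
    fs = 16.0
    t = np.arange(0, 10, 1.0 / fs)      # one 10-second window at 16 Hz
    x = np.sin(2 * np.pi * 3.0 * t)     # dominant 3 Hz component
    X = abs(np.fft.fft(x))
    X = X[0:int(len(X) / 2) + 1]        # keep the non-negative frequencies
    X = X / len(X)                      # normalize, as in the UDF above
    ind = np.arange(1, len(X) + 1) * (fs / (2.0 * len(X)))
    Xt = X / X.max()
    centroid_hz = np.sum(ind * Xt) / (np.sum(Xt) + 1e-8)
    return centroid_hz                  # ~3.0 Hz for this signal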