# Copyright (c) 2020, MD2K Center of Excellence
# - Nasir Ali <nasir.ali08@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import math
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.group import GroupedData
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType
from pyspark.sql.window import Window
from cerebralcortex.core.datatypes.datastream import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata
def complementary_filter(ds, freq: int = 16, accelerometer_x: str = "accelerometer_x",
                         accelerometer_y: str = "accelerometer_y", accelerometer_z: str = "accelerometer_z",
                         gyroscope_x: str = "gyroscope_x", gyroscope_y: str = "gyroscope_y",
                         gyroscope_z: str = "gyroscope_z"):
"""
Compute complementary filter on gyro and accel data.
Args:
ds (DataStream ): Non-Windowed/grouped dataframe
freq (int): frequency of accel/gryo. Assumption is that frequency is equal for both gyro and accel.
accelerometer_x (str): name of the column
accelerometer_y (str): name of the column
accelerometer_z (str): name of the column
gyroscope_x (str): name of the column
gyroscope_y (str): name of the column
gyroscope_z (str): name of the column
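
    Examples:
        A minimal usage sketch, assuming ``ds`` already contains the default
        accelerometer/gyroscope columns and that the resulting DataStream
        exposes its Spark dataframe as ``.data``:

        >>> fused = complementary_filter(ds, freq=16)
        >>> fused.data.select("user", "roll", "pitch", "yaw").show(3)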
"""
    dt = 1.0 / freq  # sampling interval in seconds (e.g., 1/16.0 at 16 Hz)
    M_PI = math.pi
    hpf = 0.90  # high-pass (gyroscope) weight
    lpf = 0.10  # low-pass (accelerometer) weight
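    # Per axis, the low-pass accelerometer angle (weight ``lpf``) is blended with
    # the high-pass gyroscope term (weight ``hpf``):
    #   angle = hpf * (lag(lpf * accel_angle) + gyro * dt) + lpf * accel_angle
    # Note the lagged term below is the previous *accelerometer-derived* angle
    # (a window-friendly approximation of the classic recursive complementary
    # filter, which would feed back the previous fused angle instead).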
    window = Window.partitionBy(ds._data['user']).orderBy(ds._data['timestamp'])
    data = ds._data \
        .withColumn("thetaX_accel",
                    (F.atan2(-F.col(accelerometer_z), F.col(accelerometer_y)) * 180 / M_PI) * lpf) \
        .withColumn("roll",
                    (F.lag("thetaX_accel").over(window) + F.col(gyroscope_x) * dt) * hpf + F.col("thetaX_accel")) \
        .drop("thetaX_accel") \
        .withColumn("thetaY_accel",
                    (F.atan2(-F.col(accelerometer_x), F.col(accelerometer_z)) * 180 / M_PI) * lpf) \
        .withColumn("pitch",
                    (F.lag("thetaY_accel").over(window) + F.col(gyroscope_y) * dt) * hpf + F.col("thetaY_accel")) \
        .drop("thetaY_accel") \
        .withColumn("thetaZ_accel",
                    (F.atan2(-F.col(accelerometer_y), F.col(accelerometer_x)) * 180 / M_PI) * lpf) \
        .withColumn("yaw",
                    (F.lag("thetaZ_accel").over(window) + F.col(gyroscope_z) * dt) * hpf + F.col("thetaZ_accel")) \
        .drop("thetaZ_accel")
return DataStream(data=data.dropna(), metadata=Metadata())
def compute_zero_cross_rate(ds, exclude_col_names: list = None,
                            feature_names=['zero_cross_rate']):
"""
Compute statistical features.
Args:
ds (DataStream ): Windowed/grouped dataframe
exclude_col_names list(str): name of the columns on which features should not be computed
feature_names list(str): names of the features. Supported features are ['mean', 'median', 'stddev', 'variance', 'max', 'min', 'skew',
'kurt', 'sqr', 'zero_cross_rate'
windowDuration (int): duration of a window in seconds
slideDuration (int): slide duration of a window
groupByColumnName List[str]: groupby column names, for example, groupby user, col1, col2
startTime (datetime): The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns:
DataStream object
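
    Examples:
        A minimal sketch, assuming ``ds`` has been windowed first with the
        DataStream ``window`` operation (parameter name taken from the
        windowing API this module refers to):

        >>> windowed = ds.window(windowDuration=60)
        >>> zcr = compute_zero_cross_rate(windowed)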
"""
    exclude_col_names = list(exclude_col_names or []) + ["timestamp", "localtime", "user", "version"]
data = ds._data.drop(*exclude_col_names)
df_column_names = data.columns
basic_schema = StructType([
StructField("timestamp", TimestampType()),
StructField("localtime", TimestampType()),
StructField("user", StringType()),
StructField("version", IntegerType()),
StructField("start_time", TimestampType()),
StructField("end_time", TimestampType())
])
features_list = []
for cn in df_column_names:
for sf in feature_names:
features_list.append(StructField(cn + "_" + sf, FloatType(), True))
features_schema = StructType(basic_schema.fields + features_list)
def calculate_zero_cross_rate(series):
"""
How often the signal changes sign (+/-)
"""
series_mean = np.mean(series)
series = [v - series_mean for v in series]
zero_cross_count = (np.diff(np.sign(series)) != 0).sum()
return zero_cross_count / len(series)
@pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
def get_features_udf(df):
results = []
timestamp = df['timestamp'].iloc[0]
localtime = df['localtime'].iloc[0]
user = df['user'].iloc[0]
version = df['version'].iloc[0]
start_time = timestamp
end_time = df['timestamp'].iloc[-1]
df.drop(exclude_col_names, axis=1, inplace=True)
if "zero_cross_rate" in feature_names:
df_zero_cross_rate = df.apply(calculate_zero_cross_rate)
df_zero_cross_rate.index += '_zero_cross_rate'
results.append(df_zero_cross_rate)
output = pd.DataFrame(pd.concat(results)).T
basic_df = pd.DataFrame([[timestamp, localtime, user, int(version), start_time, end_time]],
columns=['timestamp', 'localtime', 'user', 'version', 'start_time', 'end_time'])
return basic_df.assign(**output)
# check if datastream object contains grouped type of DataFrame
if not isinstance(ds._data, GroupedData):
        raise Exception(
            "DataStream object is not a grouped data type. Please use the 'window' operation on the DataStream object before running this algorithm")
data = ds._data.apply(get_features_udf)
return DataStream(data=data, metadata=Metadata())
def compute_FFT_features(ds, exclude_col_names: list = None,
                         feature_names=["fft_centroid", "fft_spread", "spectral_entropy", "fft_flux",
                                        "spectral_falloff"]):
"""
Transforms data from time domain to frequency domain.
Args:
exclude_col_names list(str): name of the columns on which features should not be computed
feature_names list(str): names of the features. Supported features are fft_centroid, fft_spread, spectral_entropy, spectral_entropy_old, fft_flux, spectral_falloff
windowDuration (int): duration of a window in seconds
slideDuration (int): slide duration of a window
groupByColumnName List[str]: groupby column names, for example, groupby user, col1, col2
startTime (datetime): The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15... provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns:
DataStream object with all the existing data columns and FFT features
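
    Examples:
        A minimal sketch, assuming ``ds`` has been windowed first (parameter
        name taken from the windowing API this module refers to):

        >>> windowed = ds.window(windowDuration=60)
        >>> fft_feats = compute_FFT_features(windowed, feature_names=["fft_centroid", "fft_spread"])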
"""
    eps = 1e-8  # small constant guarding divisions and logs against zero
    exclude_col_names = list(exclude_col_names or []) + ["timestamp", "localtime", "user", "version"]
data = ds._data.drop(*exclude_col_names)
df_column_names = data.columns
basic_schema = StructType([
StructField("timestamp", TimestampType()),
StructField("localtime", TimestampType()),
StructField("user", StringType()),
StructField("version", IntegerType()),
StructField("start_time", TimestampType()),
StructField("end_time", TimestampType())
])
features_list = []
for cn in df_column_names:
for sf in feature_names:
features_list.append(StructField(cn + "_" + sf, FloatType(), True))
features_schema = StructType(basic_schema.fields + features_list)
def stSpectralCentroidAndSpread(X, fs):
"""Computes spectral centroid of frame (given abs(FFT))"""
ind = (np.arange(1, len(X) + 1)) * (fs / (2.0 * len(X)))
Xt = X.copy()
Xt = Xt / Xt.max()
NUM = np.sum(ind * Xt)
DEN = np.sum(Xt) + eps
# Centroid:
C = (NUM / DEN)
# Spread:
S = np.sqrt(np.sum(((ind - C) ** 2) * Xt) / DEN)
# Normalize:
C = C / (fs / 2.0)
S = S / (fs / 2.0)
return (C, S)
def stSpectralFlux(X, Xprev):
"""
Computes the spectral flux feature of the current frame
ARGUMENTS:
X: the abs(fft) of the current frame
            Xprev: the abs(fft) of the previous frame
"""
# compute the spectral flux as the sum of square distances:
sumX = np.sum(X + eps)
sumPrevX = np.sum(Xprev + eps)
F = np.sum((X / sumX - Xprev / sumPrevX) ** 2)
return F
def stSpectralRollOff(X, c, fs):
"""Computes spectral roll-off"""
totalEnergy = np.sum(X ** 2)
fftLength = len(X)
Thres = c * totalEnergy
        # Find the spectral rolloff as the frequency position where the cumulative spectral energy first exceeds c * totalEnergy
CumSum = np.cumsum(X ** 2) + eps
[a, ] = np.nonzero(CumSum > Thres)
if len(a) > 0:
mC = np.float64(a[0]) / (float(fftLength))
else:
mC = 0.0
return (mC)
def stSpectralEntropy(X, numOfShortBlocks=10):
"""Computes the spectral entropy"""
L = len(X) # number of frame samples
Eol = np.sum(X ** 2) # total spectral energy
subWinLength = int(np.floor(L / numOfShortBlocks)) # length of sub-frame
if L != subWinLength * numOfShortBlocks:
X = X[0:subWinLength * numOfShortBlocks]
subWindows = X.reshape(subWinLength, numOfShortBlocks,
order='F').copy() # define sub-frames (using matrix reshape)
s = np.sum(subWindows ** 2, axis=0) / (Eol + eps) # compute spectral sub-energies
En = -np.sum(s * np.log2(s + eps)) # compute spectral entropy
return En
def spectral_entropy(data, sampling_freq, bands=None):
psd = np.abs(np.fft.rfft(data)) ** 2
psd /= np.sum(psd) # psd as a pdf (normalised to one)
if bands is None:
power_per_band = psd[psd > 0]
else:
freqs = np.fft.rfftfreq(data.size, 1 / float(sampling_freq))
bands = np.asarray(bands)
freq_limits_low = np.concatenate([[0.0], bands])
            freq_limits_up = np.concatenate([bands, [np.inf]])
            power_per_band = np.asarray([np.sum(psd[np.bitwise_and(freqs >= low, freqs < up)])
                                         for low, up in zip(freq_limits_low, freq_limits_up)])
        power_per_band = power_per_band[power_per_band > 0]
return -np.sum(power_per_band * np.log2(power_per_band))
def fourier_features_pandas_udf(data, frequency: float = 16.0):
Fs = frequency # the sampling freq (in Hz)
results = []
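        # NOTE: features are appended in the fixed order checked below; the caller
        # (get_fft_features) later assigns ``feature_names`` as column labels, so a
        # custom ``feature_names`` list must follow that same order.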
        # FFT magnitude spectrum; rfft on the raw signal would be equivalent for
        # real input (data_fft = abs(np.fft.rfft(data)))
        X = abs(np.fft.fft(data))
        nFFT = int(len(X) / 2) + 1
        X = X[0:nFFT]  # keep the non-negative frequency half
        X = X / len(X)  # normalize the magnitude spectrum
if "fft_centroid" or "fft_spread" in feature_names:
C, S = stSpectralCentroidAndSpread(X, Fs) # spectral centroid and spread
if "fft_centroid" in feature_names:
results.append(C)
if "fft_spread" in feature_names:
results.append(S)
if "spectral_entropy" in feature_names:
se = stSpectralEntropy(X) # spectral entropy
results.append(se)
if "spectral_entropy_old" in feature_names:
            se_old = spectral_entropy(X, frequency)  # spectral entropy (alternative implementation)
results.append(se_old)
if "fft_flux" in feature_names:
            flx = stSpectralFlux(X, X.copy())  # spectral flux (no previous frame available, so the frame is compared with itself)
results.append(flx)
if "spectral_folloff" in feature_names:
roff = stSpectralRollOff(X, 0.90, frequency) # spectral rolloff
results.append(roff)
return pd.Series(results)
@pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
def get_fft_features(df):
timestamp = df['timestamp'].iloc[0]
localtime = df['localtime'].iloc[0]
user = df['user'].iloc[0]
version = df['version'].iloc[0]
start_time = timestamp
end_time = df['timestamp'].iloc[-1]
df.drop(exclude_col_names, axis=1, inplace=True)
        df_ff = df.apply(fourier_features_pandas_udf)
        df3 = df_ff.T
        df3.columns = feature_names
        # flatten the per-column feature matrix into a single row
        output = df3.unstack().to_frame().sort_index(level=1).T
        output.columns = [f'{j}_{i}' for i, j in output.columns]
basic_df = pd.DataFrame([[timestamp, localtime, user, int(version), start_time, end_time]],
columns=['timestamp', 'localtime', 'user', 'version', 'start_time', 'end_time'])
return basic_df.assign(**output)
# check if datastream object contains grouped type of DataFrame
if not isinstance(ds._data, GroupedData):
        raise Exception(
            "DataStream object is not a grouped data type. Please use the 'window' operation on the DataStream object before running this algorithm")
data = ds._data.apply(get_fft_features)
return DataStream(data=data, metadata=Metadata())