Source code for cerebralcortex.algorithms.stats.features

# Copyright (c) 2020, MD2K Center of Excellence
# - Nasir Ali <nasir.ali08@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.group import GroupedData
from pyspark.sql.types import *

from cerebralcortex.core.datatypes.datastream import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata


def magnitude(ds, col_names=[]):
    """
    Compute the magnitude of a set of columns.

    Args:
        ds (DataStream): Windowed/grouped DataStream object
        col_names (list[str]): column names

    Returns:
        DataStream: input data with an additional "magnitude" column
    """
    if len(col_names) < 1:
        raise Exception("col_names param cannot be empty list.")

    # Sum the squares of the requested columns, then take the square root.
    squared = [F.col(col_name) * F.col(col_name) for col_name in col_names]
    total = squared[0]
    for term in squared[1:]:
        total = total + term
    data = ds._data.withColumn("magnitude", F.sqrt(total))
    return DataStream(data=data, metadata=Metadata())
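
# Example usage (a minimal sketch, not part of the original module): `accel_ds`
# is a hypothetical DataStream whose data columns include accelerometer axes
# named "x", "y", and "z".
#
#     ds_with_mag = magnitude(accel_ds, col_names=["x", "y", "z"])
#     # ds_with_mag now carries a "magnitude" column equal to
#     # sqrt(x*x + y*y + z*z) for every row.
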
def interpolate(ds, freq=16, method='linear', axis=0, limit=None, inplace=False,
                limit_direction='forward', limit_area=None, downcast=None):
    """
    Interpolate values according to different methods. This method internally uses pandas interpolation.

    Args:
        ds (DataStream): Windowed/grouped DataStream object
        freq (int): Frequency of the signal
        method (str): default 'linear'
            - 'linear': Ignore the index and treat the values as equally spaced. This is the only method supported on MultiIndexes.
            - 'time': Works on daily and higher resolution data to interpolate given length of interval.
            - 'index', 'values': use the actual numerical values of the index.
            - 'pad': Fill in NaNs using existing values.
            - 'nearest', 'zero', 'slinear', 'quadratic', 'cubic', 'spline', 'barycentric', 'polynomial': Passed to scipy.interpolate.interp1d. These methods use the numerical values of the index. Both 'polynomial' and 'spline' require that you also specify an order (int), e.g. df.interpolate(method='polynomial', order=5).
            - 'krogh', 'piecewise_polynomial', 'spline', 'pchip', 'akima': Wrappers around the SciPy interpolation methods of similar names. See Notes.
            - 'from_derivatives': Refers to scipy.interpolate.BPoly.from_derivatives which replaces 'piecewise_polynomial' interpolation method in scipy 0.18.
        axis ({0 or 'index', 1 or 'columns', None}): default 0. Axis to interpolate along.
        limit (int): optional. Maximum number of consecutive NaNs to fill. Must be greater than 0.
        inplace (bool): default False. Update the data in place if possible.
        limit_direction ({'forward', 'backward', 'both'}): default 'forward'. If limit is specified, consecutive NaNs will be filled in this direction.
        limit_area ({None, 'inside', 'outside'}): default None. If limit is specified, consecutive NaNs will be filled with this restriction.
            - None: No fill restriction.
            - 'inside': Only fill NaNs surrounded by valid values (interpolate).
            - 'outside': Only fill NaNs outside valid values (extrapolate).
        downcast ('infer' or None): optional, defaults to None.

    Returns:
        DataStream: interpolated data
    """
    schema = ds._data.schema
    sample_freq = 1000 / freq  # sample period in milliseconds

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def interpolate_data(pdf):
        # Resample each group onto a regular grid derived from `freq`,
        # back-fill single-sample gaps, interpolate the remaining NaNs,
        # then forward-fill anything still missing at the end of the group.
        pdf.set_index("timestamp", inplace=True)
        pdf = pdf.resample(str(sample_freq) + "ms").bfill(limit=1).interpolate(
            method=method, axis=axis, limit=limit, inplace=inplace,
            limit_direction=limit_direction, limit_area=limit_area,
            downcast=downcast)
        pdf.ffill(inplace=True)
        pdf.reset_index(drop=False, inplace=True)
        pdf.sort_index(axis=1, inplace=True)
        return pdf

    data = ds._data.groupby(["user", "version"]).apply(interpolate_data)
    return DataStream(data=data, metadata=Metadata())
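
# Example usage (a minimal sketch; `accel_ds` is a hypothetical 16 Hz
# accelerometer DataStream with gaps in its samples). Interpolation runs per
# ("user", "version") group, so no explicit windowing is needed first.
#
#     filled_ds = interpolate(accel_ds, freq=16, method='linear')
#     # Each group is resampled onto a 62.5 ms grid (1000 / 16), short gaps
#     # are back-filled, and the remaining NaNs are interpolated linearly.
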
def statistical_features(ds, exclude_col_names: list = [],
                         feature_names=['mean', 'median', 'stddev', 'variance', 'max', 'min',
                                        'skew', 'kurt', 'sqr']):
    """
    Compute statistical features.

    Args:
        ds (DataStream): Windowed/grouped DataStream object
        exclude_col_names (list[str]): name of the columns on which features should not be computed
        feature_names (list[str]): names of the features. Supported features are ['mean', 'median', 'stddev', 'variance', 'max', 'min', 'skew', 'kurt', 'sqr', 'zero_cross_rate']

    Returns:
        DataStream: one row per window containing the computed statistical features
    """
    # Fail fast if the DataStream has not been windowed/grouped yet.
    if not isinstance(ds._data, GroupedData):
        raise Exception(
            "DataStream object is not grouped data type. Please use 'window' operation on datastream object before running this algorithm")

    # Copy before extending so the caller's list (and the mutable default
    # argument) are never modified in place.
    exclude_col_names = list(exclude_col_names) + ["timestamp", "localtime", "user", "version"]

    data = ds._data._df.drop(*exclude_col_names)
    df_column_names = data.columns

    basic_schema = StructType([
        StructField("timestamp", TimestampType()),
        StructField("localtime", TimestampType()),
        StructField("user", StringType()),
        StructField("version", IntegerType()),
        StructField("start_time", TimestampType()),
        StructField("end_time", TimestampType())
    ])

    features_list = []
    for cn in df_column_names:
        for sf in feature_names:
            features_list.append(StructField(cn + "_" + sf, FloatType(), True))

    features_schema = StructType(basic_schema.fields + features_list)

    def calculate_zero_cross_rate(series):
        """How often the signal changes sign (+/-)"""
        series_mean = np.mean(series)
        series = [v - series_mean for v in series]
        zero_cross_count = (np.diff(np.sign(series)) != 0).sum()
        return zero_cross_count / len(series)

    def get_sqr(series):
        """Mean of the squared values of the series."""
        sqr = np.mean([v * v for v in series])
        return sqr

    @pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
    def get_stats_features_udf(df):
        results = []
        timestamp = df['timestamp'].iloc[0]
        localtime = df['localtime'].iloc[0]
        user = df['user'].iloc[0]
        version = df['version'].iloc[0]
        start_time = timestamp
        end_time = df['timestamp'].iloc[-1]
        df.drop(exclude_col_names, axis=1, inplace=True)

        if "mean" in feature_names:
            df_mean = df.mean()
            df_mean.index += '_mean'
            results.append(df_mean)

        if "median" in feature_names:
            df_median = df.median()
            df_median.index += '_median'
            results.append(df_median)

        if "stddev" in feature_names:
            df_stddev = df.std()
            df_stddev.index += '_stddev'
            results.append(df_stddev)

        if "variance" in feature_names:
            df_var = df.var()
            df_var.index += '_variance'
            results.append(df_var)

        if "max" in feature_names:
            df_max = df.max()
            df_max.index += '_max'
            results.append(df_max)

        if "min" in feature_names:
            df_min = df.min()
            df_min.index += '_min'
            results.append(df_min)

        if "skew" in feature_names:
            df_skew = df.skew()
            df_skew.index += '_skew'
            results.append(df_skew)

        if "kurt" in feature_names:
            df_kurt = df.kurt()
            df_kurt.index += '_kurt'
            results.append(df_kurt)

        if "sqr" in feature_names:
            df_sqr = df.apply(get_sqr)
            df_sqr.index += '_sqr'
            results.append(df_sqr)

        if "zero_cross_rate" in feature_names:
            df_zcr = df.apply(calculate_zero_cross_rate)
            df_zcr.index += '_zero_cross_rate'
            results.append(df_zcr)

        output = pd.DataFrame(pd.concat(results)).T

        basic_df = pd.DataFrame([[timestamp, localtime, user, int(version), start_time, end_time]],
                                columns=['timestamp', 'localtime', 'user', 'version', 'start_time', 'end_time'])
        return basic_df.assign(**output)

    data = ds._data.apply(get_stats_features_udf)
    return DataStream(data=data, metadata=Metadata())
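
# Example usage (a minimal sketch; `accel_ds` and the `window()` arguments are
# hypothetical, assuming the DataStream's window operation produces the grouped
# data this function expects):
#
#     windowed_ds = accel_ds.window(windowDuration=60)
#     features_ds = statistical_features(windowed_ds,
#                                        feature_names=['mean', 'stddev', 'sqr'])
#     # Yields one row per window with columns such as x_mean, x_stddev, and
#     # x_sqr, plus the window's start_time and end_time.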