import pickle
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *
from pyspark.sql.types import StructType
from pyspark.sql.window import Window
# Re-orient the accelerometer/gyroscope signal to a canonical frame based on wrist and device orientation.
def get_orientation_data(ds, wrist, ori=1, is_new_device=False,
                         accelerometer_x="accelerometer_x", accelerometer_y="accelerometer_y",
                         accelerometer_z="accelerometer_z",
                         gyroscope_x="gyroscope_x", gyroscope_y="gyroscope_y", gyroscope_z="gyroscope_z"):
    """
    Re-orient accelerometer and gyroscope columns by flipping axis signs so
    that data from either wrist (and either watch hardware revision) ends up
    in one canonical orientation.

    Args:
        ds: DataStream object holding the six axis columns
        wrist (str): which wrist the smart watch was worn on, "left" or "right"
        ori (int): watch orientation code (0-4) selecting the sign factors
        is_new_device (bool): True for the newer MotionSense smart watch
            revision, which uses a different factor table
        accelerometer_x (str): name of the accelerometer x-axis column
        accelerometer_y (str): name of the accelerometer y-axis column
        accelerometer_z (str): name of the accelerometer z-axis column
        gyroscope_x (str): name of the gyroscope x-axis column
        gyroscope_y (str): name of the gyroscope y-axis column
        gyroscope_z (str): name of the gyroscope z-axis column

    Returns:
        DataStream object with each axis column multiplied by its +/-1 factor

    Raises:
        ValueError: if wrist is not "left"/"right" or ori is not a known code
    """
    # Sign factors [x, y, z] per orientation code, keyed by hardware revision.
    left_ori = {"old": {0: [1, 1, 1], 1: [1, 1, 1], 2: [-1, -1, 1], 3: [-1, 1, 1], 4: [1, -1, 1]},
                "new": {0: [-1, 1, 1], 1: [-1, 1, 1], 2: [1, -1, 1], 3: [1, 1, 1], 4: [-1, -1, 1]}}
    right_ori = {"old": {0: [1, -1, 1], 1: [1, -1, 1], 2: [-1, 1, 1], 3: [-1, -1, 1], 4: [1, 1, 1]},
                 "new": {0: [1, 1, 1], 1: [1, 1, 1], 2: [-1, -1, 1], 3: [-1, 1, 1], 4: [1, -1, 1]}}
    device = "new" if is_new_device else "old"
    if wrist == "left":
        fac = left_ori[device].get(ori)
    elif wrist == "right":
        fac = right_ori[device].get(ori)
    else:
        raise ValueError("wrist can only be left or right.")
    # Previously an unknown ori fell through to a confusing TypeError on
    # fac[0]; fail early with a clear message instead.
    if fac is None:
        raise ValueError("ori must be one of 0-4.")
    data = ds.withColumn(gyroscope_x, ds[gyroscope_x] * fac[0]) \
        .withColumn(gyroscope_y, ds[gyroscope_y] * fac[1]) \
        .withColumn(gyroscope_z, ds[gyroscope_z] * fac[2]) \
        .withColumn(accelerometer_x, ds[accelerometer_x] * fac[0]) \
        .withColumn(accelerometer_y, ds[accelerometer_y] * fac[1]) \
        .withColumn(accelerometer_z, ds[accelerometer_z] * fac[2])
    return data
def get_candidates(ds, uper_limit: float = 0.1, threshold: float = 0.5):
    """
    Flag rows that belong to potential brushing segments and give every
    contiguous candidate run a group id.

    A row is an initial candidate when accelerometer_y exceeds ``uper_limit``
    (used to detect how high the hand is). The flag is then smoothed over a
    7-row window (3 rows each side) per user/version: a row stays a candidate
    when at least ``threshold`` fraction of its window is flagged. Finally,
    consecutive rows sharing the same (user, candidate) pair receive the same
    ``group`` id via a cumulative sum over change indicators.

    Args:
        ds (DataStream): input with accelerometer_y, user, version and
            timestamp columns
        uper_limit (float): threshold on accelerometer_y marking a raised
            hand (parameter name kept as-is for backward compatibility)
        threshold (float): minimum fraction of flagged rows in the smoothing
            window for a row to remain a candidate

    Returns:
        DataStream with added ``candidate`` (0/1) and ``group`` (long) columns
    """
    # 7-row smoothing window per user/version, ordered by time.
    window = Window.partitionBy(["user", "version"]).orderBy("timestamp").rowsBetween(-3, 3)
    # Global time order used to detect user/candidate transitions.
    # NOTE(review): no partitionBy here, so this forces a single-partition
    # sort — presumably intentional to number groups globally; confirm.
    window2 = Window.orderBy("timestamp")
    df1 = ds.withColumn("candidate", F.when(F.col("accelerometer_y") > uper_limit, F.lit(1)).otherwise(F.lit(0)))
    df = df1.withColumn("candidate",
                        F.when(F.avg(df1.candidate).over(window) >= threshold, F.lit(1))
                        .otherwise(F.lit(0)))
    df2 = df.withColumn("userChange",
                        (F.col("user") != F.lag("user").over(window2)).cast("int")) \
        .withColumn("candidateChange",
                    (F.col("candidate") != F.lag("candidate").over(window2)).cast("int")) \
        .fillna(0, subset=["userChange", "candidateChange"]) \
        .withColumn("indicator",
                    # A new segment starts whenever the user or the candidate
                    # flag changes (first row of each: lag is null -> filled 0).
                    ((F.col("userChange") == 1) | (F.col("candidateChange") == 1)).cast("int")) \
        .withColumn("group",
                    # Running count of segment starts = stable id per segment.
                    F.sum(F.col("indicator")).over(window2.rangeBetween(Window.unboundedPreceding, 0))) \
        .drop("userChange", "candidateChange", "indicator")
    return df2
def get_max_features(ds):
    """
    Append, for every accelerometer statistical/FFT feature, the row-wise
    maximum over the x/y/z axes (e.g. max_accl_mean is the max of
    accelerometer_{x,y,z}_mean).

    Args:
        ds (DataStream): windowed feature stream containing
            accelerometer_{x,y,z}_<feature> columns

    Returns:
        DataStream with one additional max_accl_<feature> FloatType column
        per feature
    """
    # Feature suffixes for which a per-row max over the three axes is added;
    # order defines the output schema order.
    feature_suffixes = ["mean", "median", "stddev", "skew", "kurt", "sqr",
                        "zero_cross_rate", "fft_centroid", "fft_spread",
                        "spectral_entropy", "spectral_entropy_old",
                        "fft_flux", "spectral_folloff"]
    max_feature_schema = [StructField("max_accl_" + suffix, FloatType())
                          for suffix in feature_suffixes]
    features_schema = StructType(ds.schema.fields + max_feature_schema)

    @pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
    def get_max_vals_features(df):
        # Vectorized row-wise max replaces the previous per-row iterrows loop
        # (which also built an unused `vals` list — dead code, removed).
        # NOTE: DataFrame.max(axis=1) skips NaNs, whereas Python's max() on a
        # row containing NaN gave order-dependent results.
        for suffix in feature_suffixes:
            axis_cols = ["accelerometer_%s_%s" % (axis, suffix)
                         for axis in ("x", "y", "z")]
            df["max_accl_" + suffix] = df[axis_cols].max(axis=1)
        return df

    return ds.compute(get_max_vals_features)
def filter_candidates(ds):
    """
    Keep only candidate segments whose duration is plausible for brushing.

    Each group (see get_candidates) is dropped unless its first row is a
    candidate and the segment lasts between 15 seconds and 5 minutes.

    Args:
        ds (DataStream): grouped stream with ``candidate``, ``group`` and
            ``timestamp`` columns

    Returns:
        DataStream containing only the rows of accepted segments
    """
    features_schema = ds.schema
    MIN_SEGMENT_DURATION = 15  # seconds
    MAX_SEGMENT_DURATION = 5 * 60  # seconds

    @pandas_udf(features_schema, PandasUDFType.GROUPED_MAP)
    def filter_segment(df):
        # Non-candidate groups are discarded outright.
        if df['candidate'].iloc[0] != 1:
            return pd.DataFrame(columns=df.columns)
        # BUGFIX: use total_seconds(), not .seconds — .seconds ignores the
        # days component, so a multi-day span (e.g. 1 day + 60 s) previously
        # reported 60 s and wrongly passed the duration check.
        duration = (df['timestamp'].iloc[-1] - df['timestamp'].iloc[0]).total_seconds()
        if MIN_SEGMENT_DURATION <= duration <= MAX_SEGMENT_DURATION:
            return df
        return pd.DataFrame(columns=df.columns)

    return ds.compute(filter_segment, groupByColumnName=["group"])
def reorder_columns(ds):
    """
    Select columns in the canonical order expected by the classifier and
    sort rows by timestamp.

    Layout: 7 metadata columns (timestamp, localtime, user, version,
    start_time, end_time, duration), then 10 signals x 13 statistics
    (e.g. accelerometer_x_mean), then 12 cross-axis correlation/MSE columns.

    Args:
        ds (DataStream): stream containing all expected feature columns

    Returns:
        DataStream with re-ordered columns, ordered by timestamp
    """
    feature_names = ['accelerometer_x', 'accelerometer_y', 'accelerometer_z', 'max_accl', 'gyroscope_y',
                     'gyroscope_x', 'gyroscope_z', 'roll', 'pitch', 'yaw']
    sensor_names = ['mean', 'median', 'stddev', 'skew', 'kurt', 'sqr', 'zero_cross_rate', "fft_centroid",
                    'fft_spread', 'spectral_entropy', 'spectral_entropy_old', 'fft_flux', 'spectral_folloff']
    extra_features = ["ax_ay_corr", 'ax_az_corr', 'ay_az_corr', 'gx_gy_corr', 'gx_gz_corr', 'gy_gz_corr',
                      'ax_ay_mse', 'ax_az_mse', 'ay_az_mse', 'gx_gy_mse', 'gx_gz_mse', 'gy_gz_mse']
    col_names = ["timestamp", "localtime", "user", "version", "start_time", "end_time", "duration"]
    # Cartesian product: feature order is the outer loop, statistic the inner.
    col_names += ["%s_%s" % (fn, sn) for fn in feature_names for sn in sensor_names]
    col_names += extra_features
    return ds.select(*col_names).orderBy("timestamp")
def classify_brushing(X: pd.DataFrame, model_file_name: str):
    """
    Predict brushing labels for candidate windows using a pre-trained model.

    Args:
        X (pd.DataFrame): feature frame laid out by reorder_columns; the
            first six columns (timestamp, localtime, user, version,
            start_time, end_time) are excluded from the model input, while
            everything from ``duration`` onward is fed to the classifier
        model_file_name (str): path to a pickled classifier object exposing
            a ``predict`` method

    Returns:
        Model predictions, one per row of X
    """
    # SECURITY: pickle.load executes arbitrary code embedded in the file.
    # Only load model files from trusted sources.
    with open(model_file_name, 'rb') as handle:
        clf = pickle.load(handle)
    # Slice the columns before converting so the resulting array keeps a
    # numeric dtype even if the dropped metadata columns are non-numeric
    # (X.values on mixed dtypes would produce an object array).
    features = X.iloc[:, 6:].to_numpy()
    return clf.predict(features)