Source code for cerebralcortex.examples.brushing_detection

# TODO: incomplete code

import argparse

from cerebralcortex.kernel import Kernel
from cerebralcortex.markers.brushing.util import get_max_features, reorder_columns, classify_brushing
from cerebralcortex.markers.brushing.util import get_orientation_data, get_candidates, filter_candidates


def generate_candidates(CC, user_id, accel_stream_name, gyro_stream_name, output_stream_name, wrist="left"):
    """
    Build the brushing-candidate stream for one user and save it through ``CC``.

    The accelerometer and gyroscope streams are interpolated, their magnitudes
    are computed, and both are joined into one stream. Orientation data and a
    complementary filter are then applied before candidate groups are extracted
    and non-candidates are filtered out.

    Args:
        CC: CerebralCortex kernel object used to read and save streams.
        user_id: ID of the user whose data is processed.
        accel_stream_name: Name of the accelerometer input stream.
        gyro_stream_name: Name of the gyroscope input stream.
        output_stream_name: Name under which the candidate stream is saved.
        wrist: Wrist passed on to ``get_orientation_data``. Defaults to
            ``"left"``, the value that used to be hard-coded here, so existing
            callers keep their behaviour; pass ``"right"`` for right-wrist data.
    """
    ds_accel = CC.get_stream(accel_stream_name, user_id=user_id)
    ds_gyro = CC.get_stream(gyro_stream_name, user_id=user_id)

    # interpolation - default interpolation is linear
    ds_accel_interpolated = ds_accel.interpolate()
    ds_gyro_interpolated = ds_gyro.interpolate()

    # compute magnitude - xyz axis of accel and gyro
    ds_accel_magnitude = ds_accel_interpolated.compute_magnitude(
        col_names=["accelerometer_x", "accelerometer_y", "accelerometer_z"],
        magnitude_col_name="accel_magnitude")
    ds_gyro_magnitude = ds_gyro_interpolated.compute_magnitude(
        col_names=["gyroscope_x", "gyroscope_y", "gyroscope_z"],
        magnitude_col_name="gyro_magnitude")

    # join accel and gyro streams; rows containing nulls after the full join
    # are dropped
    ds_ag = ds_accel_magnitude.join(
        ds_gyro_magnitude,
        on=['user', 'timestamp', 'localtime', 'version'],
        how='full').dropna()

    # get wrist orientation - the wrist is supplied by the caller instead of
    # being fixed to the left wrist
    ds_ag_orientation = get_orientation_data(ds_ag, wrist=wrist)

    # apply complementary filter - roll, pitch, yaw
    ds_ag_complemtary_filtered = ds_ag_orientation.complementary_filter()

    # get brushing candidate groups
    ds_ag_candidates = get_candidates(ds_ag_complemtary_filtered)

    # remove where group==0 - non-candidates
    ds_ag_candidates = filter_candidates(ds_ag_candidates)

    # set metadata details for candidate stream
    ds_ag_candidates.metadata.name = output_stream_name
    ds_ag_candidates.metadata.description = "This stream contains the 'Brushing Candidates Based on Event'. group column contains the ids of each candidate group and candidate column represents whether a datapoint is a candidate or not."

    CC.save_stream(ds_ag_candidates, overwrite=True)
    print("Generated candidates stream.")
def generate_features(CC, user_id, candidate_stream_name, output_stream_name):
    """
    Compute per-group brushing features from a candidate stream and save them.

    FFT, statistical and accel/gyro correlation-MSE features are computed per
    ``group``, full-joined into one stream, extended with a ``duration``
    column, passed through ``get_max_features`` and saved through ``CC``.

    Args:
        CC: CerebralCortex kernel object used to read and save streams.
        user_id: ID of the user whose data is processed.
        candidate_stream_name: Name of the brushing-candidate input stream.
        output_stream_name: Name under which the feature stream is saved.
    """
    # Column names shared by the feature calls and the joins. A fresh list is
    # built for every call so no two calls ever share one list object.
    skipped_columns = ('group', 'candidate', "accel_magnitude", "gyro_magnitude")
    join_keys = ('user', 'timestamp', 'localtime', 'version', 'start_time', 'end_time')

    candidates = CC.get_stream(candidate_stream_name, user_id=user_id)

    # compute the three feature families, each grouped by candidate group
    fft_part = candidates.compute_FFT_features(
        exclude_col_names=list(skipped_columns),
        groupByColumnName=["group"])
    stats_part = candidates.compute_statistical_features(
        exclude_col_names=list(skipped_columns),
        groupByColumnName=["group"],
        feature_names=['mean', 'median', 'stddev', 'skew', 'kurt', 'sqr', 'zero_cross_rate'])
    corr_mse_part = candidates.compute_corr_mse_accel_gyro(
        exclude_col_names=list(skipped_columns),
        groupByColumnName=["group"])

    # merge the feature families on the shared key columns
    merged = fft_part.join(stats_part, on=list(join_keys), how='full')
    merged = merged.join(corr_mse_part, on=list(join_keys), how='full')

    # duration = end_time - start_time, both cast to long
    elapsed = merged.end_time.cast("long") - merged.start_time.cast("long")
    merged = merged.withColumn("duration", elapsed)

    feature_stream = get_max_features(merged)

    # set metadata details for the feature stream
    feature_stream.metadata.name = output_stream_name
    feature_stream.metadata.description = "This stream contains the brushing features computed for each brushing candidate window."

    CC.save_stream(feature_stream, overwrite=True)
    print("Generated brushing features.")
def predict_brushing(CC, user_id, features_stream_name,
                     model_file_name="./model/AB_model_brushing_all_features.model"):
    """
    Classify brushing candidates and print the predicted brushing episodes.

    The feature stream is read through ``CC``, its columns are reordered with
    ``reorder_columns``, and it is converted to a pandas DataFrame and passed
    to ``classify_brushing``. Rows whose prediction equals 1 are printed with
    their ``start_time`` and ``end_time``.

    Args:
        CC: CerebralCortex kernel object used to read the feature stream.
        user_id: ID of the user whose features are classified.
        features_stream_name: Name of the brushing-feature input stream.
        model_file_name: Path of the model file handed to
            ``classify_brushing``. Defaults to the path that used to be
            hard-coded here. The default is relative, so it presumably
            resolves against the current working directory - confirm against
            ``classify_brushing``.
    """
    features = CC.get_stream(features_stream_name, user_id=user_id)
    features = reorder_columns(features)

    # classification runs on a pandas DataFrame, not on the stream itself
    pdf_features = features.toPandas()
    pdf_predictions = classify_brushing(pdf_features, model_file_name=model_file_name)
    pdf_features['predictions'] = pdf_predictions

    # keep only the rows labelled 1 by the classifier
    detected_episodes = pdf_features.loc[pdf_features['predictions'] == 1]

    print("PREDICTED BRUSHING ESPISODES\n")
    print(detected_episodes[["start_time", "end_time"]])
if __name__ == "__main__":
    # Command-line entry point: generate candidates, compute features and
    # print predicted brushing episodes for one user and one wrist.
    parser = argparse.ArgumentParser(description="Brushing episode detection")
    parser.add_argument('-c', '--config_dir', help='CC Configuration directory path', required=True)
    parser.add_argument('-a', '--accel_stream_name', help='Accel stream name', required=True)
    parser.add_argument('-g', '--gyro_stream_name', help='Gyro stream name', required=True)
    parser.add_argument('-w', '--wrist', help="wrist ('left', 'right')", required=True)
    # The argument is required, so the help text no longer calls it optional.
    parser.add_argument('-u', '--user_id', help='User ID of the user whose data is processed', required=True)

    args = vars(parser.parse_args())
    config_dir = str(args["config_dir"]).strip()
    accel_stream_name = str(args["accel_stream_name"]).strip()
    gyro_stream_name = str(args["gyro_stream_name"]).strip()
    wrist = str(args["wrist"]).strip()
    user_id = str(args["user_id"]).strip()

    # The wrist is embedded in the stream names below, so reject anything but
    # the two documented values instead of silently creating streams for a
    # mistyped wrist. Checked after strip() so padded values still work.
    if wrist not in ("left", "right"):
        parser.error("--wrist must be 'left' or 'right', got {!r}".format(wrist))

    CC = Kernel(config_dir, study_name="moral")

    candidate_stream_name = "brushing-candidates--org.md2k.motionsense--motion_sense--" + wrist + "_wrist"
    features_stream_name = "brushing-features--org.md2k.motionsense--motion_sense--" + wrist + "_wrist"

    generate_candidates(CC, user_id=user_id, accel_stream_name=accel_stream_name,
                        gyro_stream_name=gyro_stream_name,
                        output_stream_name=candidate_stream_name)
    generate_features(CC, user_id=user_id, candidate_stream_name=candidate_stream_name,
                      output_stream_name=features_stream_name)
    predict_brushing(CC, user_id=user_id, features_stream_name=features_stream_name)