Source code for cerebralcortex.algorithms.stress_prediction.stress_episodes

# Copyright (c) 2017, MD2K Center of Excellence
# All rights reserved.
# Anand
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import numpy as np
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, StringType, FloatType, TimestampType, IntegerType

from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.algorithms.utils.mprov_helper import CC_MProvAgg
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, \
    ModuleMetadata


[docs]def compute_stress_episodes(ecg_stress_probability, macd_param_fast = 7, macd_param_slow = 19, macd_param_signal = 2, threshold_stressed = 0.36, threshold_not_stressed= 0.36): """ Compute stress episodes using MACD Args: ecg_stress_probability (DataStream): macd_param_fast (int): macd_param_slow (int): macd_param_signal (into): threshold_stressed (float): threshold_not_stressed (float): Returns: DataStream: with a column stress_episodes """ NOTSTRESS = "NOTSTRESSED" UNSURE = 'UNSURE' YESSTRESS = 'STRESSED' UNKNOWN = 'UNKNOWN' UNCLASSIFIED = 'UNCLASSIFIED' stream_name = "org.md2k.autosense.ecg.stress_episodes" def get_metadata(): stream_metadata = Metadata() stream_metadata.set_name(stream_name).set_description("Stress episodes computed using MACD formula.") \ .add_input_stream(ecg_stress_probability.metadata.get_name()) \ .add_dataDescriptor(DataDescriptor().set_name("timestamp").set_type("datetime")) \ .add_dataDescriptor(DataDescriptor().set_name("localtime").set_type("datetime")) \ .add_dataDescriptor(DataDescriptor().set_name("version").set_type("int")) \ .add_dataDescriptor(DataDescriptor().set_name("user").set_type("string")) \ .add_dataDescriptor( DataDescriptor().set_name("stress_probability").set_type("float")) \ .add_dataDescriptor( DataDescriptor().set_name("stress_episode").set_type("string").set_attribute("description", \ "stress episodes calculated using MACD")) \ .add_module( ModuleMetadata().set_name("cerebralcortex.algorithm.stress_prediction.stress_episodes.compute_stress_episodes") .set_attribute("url", "http://md2k.org/").set_author( "Anandatirtha Nandugudi", "dev@md2k.org")) return stream_metadata schema = StructType([ StructField("timestamp", TimestampType()), StructField("localtime", TimestampType()), StructField("stress_probability", FloatType()), StructField("stress_episode", StringType()), StructField("user", StringType()), StructField("version", IntegerType()) ]) @pandas_udf(schema, PandasUDFType.GROUPED_MAP) @CC_MProvAgg('org.md2k.autosense.ecg.stress.probability.imputed', 'stress_episodes_estimation', stream_name, ['user', 'timestamp'], ['user', 'timestamp']) def stress_episodes_estimation(stress_data: object) -> object: """ smooth stress probabilities and use MACD to label stress episodes Args: stress_data (pandas.df): Returns: pandas.df """ smoothing_window = 3 # FIXME - 3 minutes user_id = stress_data.iloc[0].user # Smooth the stress values stress_smoothed_list = [] for indx in range(len(stress_data['stress_probability'].values)): if indx<2: smoothed_stress = stress_data['stress_probability'].values[indx] else: smoothed_stress = (stress_data['stress_probability'].values[indx] + \ stress_data['stress_probability'].values[indx-1] + \ stress_data['stress_probability'].values[indx-2]) / smoothing_window stress_smoothed_list.append((stress_data['timestamp'].values[indx], smoothed_stress, stress_data['localtime'].values[indx])) # EMA = Exponential Moving Average ema_fast_list = [] ema_fast_list.append(stress_smoothed_list[0]) ema_slow_list = [] ema_slow_list.append(stress_smoothed_list[0]) ema_signal_list = [] ema_signal_list.append((0,0)) histogram_list = [] stress_episode_start = [] stress_episode_peak = [] stress_episode_classification = [] stress_episode_intervals = [] for indx in range(len(stress_smoothed_list)): ema_fast_prev = ema_fast_list[-1][1] ema_fast_current = stress_smoothed_list[indx][1] ema_fast = ewma(ema_fast_current, ema_fast_prev, 2.0/(macd_param_fast + 1)) ema_fast_list.append((stress_smoothed_list[indx][0], ema_fast)) ema_slow_prev = ema_slow_list[-1][1] ema_slow_current = stress_smoothed_list[indx][1] ema_slow = ewma(ema_slow_current, ema_slow_prev, 2.0/(macd_param_slow + 1)) ema_slow_list.append((stress_smoothed_list[indx][0], ema_slow)) macd_prev = ema_fast_prev - ema_slow_prev macd_current = ema_fast_current - ema_slow_current ema_signal_prev = ema_signal_list[-1][1] ema_signal = ewma(macd_current, macd_prev, 2.0/(macd_param_signal + 1)) ema_signal_list.append((stress_smoothed_list[indx][0], ema_signal)) histogram_prev = macd_prev - ema_signal_prev histogram = macd_current - ema_signal histogram_list.append((stress_smoothed_list[indx][0], histogram)) if histogram_prev <=0 and histogram > 0: # Episode has ended, started increasing again start_timestamp = None; peak_timestamp = None; end_timestamp = stress_smoothed_list[indx][0] stress_class = None if len(stress_episode_start): start_timestamp = stress_episode_start[-1][0] if len(stress_episode_classification): peak_timestamp = stress_episode_classification[-1][0] stress_class = stress_episode_classification[-1][1] if stress_class: #TODO - Handle this????? stress_episode_timestamps = [] stress_episode_timestamps.append(start_timestamp) stress_episode_timestamps.append(peak_timestamp) stress_episode_timestamps.append(end_timestamp) stress_episode_timestamps.append(stress_class) stress_episode_intervals.append(stress_episode_timestamps) if histogram_prev >=0 and histogram < 0: # Episode is in the middle, started decreasing episode_start_timestamp = get_episode_start_timestamp(stress_episode_classification, histogram_list, stress_smoothed_list[indx][0]) if episode_start_timestamp is None: stress_episode_start.append((episode_start_timestamp, UNCLASSIFIED)) stress_episode_peak.append((stress_smoothed_list[indx][0], UNCLASSIFIED)) stress_episode_classification.append([stress_smoothed_list[indx][0], stress_smoothed_list[indx][2], stress_smoothed_list[indx][1], UNCLASSIFIED]) else: proportion_available = get_proportion_available(stress_data, episode_start_timestamp, stress_smoothed_list[indx][0]) if proportion_available < 0.5: stress_episode_start.append((episode_start_timestamp, UNKNOWN)) stress_episode_peak.append((stress_smoothed_list[indx][0], UNKNOWN)) stress_episode_classification.append([stress_smoothed_list[indx][0], stress_smoothed_list[indx][2], stress_smoothed_list[indx][1], UNKNOWN]) else: historical_stress = get_historical_values_timestamp_based(stress_smoothed_list, episode_start_timestamp, stress_smoothed_list[indx][0]) if not len(historical_stress): stress_episode_start.append((episode_start_timestamp, UNKNOWN)) stress_episode_peak.append((stress_smoothed_list[indx][0], UNKNOWN)) stress_episode_classification.append([stress_smoothed_list[indx][0], stress_smoothed_list[indx][2], stress_smoothed_list[indx][1], UNKNOWN]) else: cumu_sum = 0.0 for hs in historical_stress: cumu_sum += hs[1] stress_density = cumu_sum / len(historical_stress) if stress_density >= threshold_stressed: stress_episode_start.append((episode_start_timestamp, YESSTRESS)) stress_episode_peak.append((stress_smoothed_list[indx][0], YESSTRESS)) stress_episode_classification.append([stress_smoothed_list[indx][0], stress_smoothed_list[indx][2], stress_smoothed_list[indx][1], YESSTRESS]) elif stress_density <= threshold_not_stressed: stress_episode_start.append((episode_start_timestamp, NOTSTRESS)) stress_episode_peak.append((stress_smoothed_list[indx][0], NOTSTRESS)) stress_episode_classification.append([stress_smoothed_list[indx][0], stress_smoothed_list[indx][2], stress_smoothed_list[indx][1], NOTSTRESS]) else: stress_episode_start.append((episode_start_timestamp, UNSURE)) stress_episode_peak.append((stress_smoothed_list[indx][0], UNSURE)) stress_episode_classification.append([stress_smoothed_list[indx][0], stress_smoothed_list[indx][2], stress_smoothed_list[indx][1], UNSURE]) df = pd.DataFrame(stress_episode_classification, columns=['timestamp', 'localtime', 'stress_probability', 'stress_episode']) df['user'] = user_id df['version'] = 1 return df def ewma(current:int, previous:int, alpha:int)->float: """ compute exponential weighted moving average Args: current (int): current stress probability value previous (int): previous stress probability value alpha (int): Returns: float """ return alpha * current + (1 - alpha) * previous def get_episode_start_timestamp(stress_episode_classification, histogram_list, currenttime): """ Get start time of a stress episode Args: stress_episode_classification: histogram_list: currenttime: Returns: """ timestamp_prev = None if len(stress_episode_classification) >= 3: timestamp_prev = stress_episode_classification[-3][0] elif len(stress_episode_classification) == 2: timestamp_prev = stress_episode_classification[-2][0] elif len(stress_episode_classification) == 1: timestamp_prev = stress_episode_classification[-1][0] histogram_history = get_historical_values_timestamp_based(histogram_list, timestamp_prev, currenttime) if len(histogram_history) <= 1: return None for x in range(len(histogram_history)-2 , -1, -1): if histogram_history[x][1] <= 0: return histogram_history[x+1][0] return histogram_history[0][0] def get_historical_values_timestamp_based(data, start_timestamp, currenttime): """ Args: data: start_timestamp: currenttime: Returns: """ toreturn = [] starttime = start_timestamp if starttime == None: starttime = currenttime - np.timedelta64(100*365*24*3600, 's') # approx 100 year to approximate -1 for c in data: if c[0] >= starttime and c[0] <= currenttime: toreturn.append(c) if c[0] > currenttime: break return toreturn def get_proportion_available(data, st, current_timestamp): """ compute the ratio of (detected + imputed stress episodes)/detected stress episodes Args: data: st: current_timestamp: Returns: """ count = 0 available = 0 start_timestamp = st if start_timestamp is None: start_timestamp = current_timestamp for x in range(len(data)): row_time = data.iloc[x]['timestamp'] if row_time >= start_timestamp and row_time <= current_timestamp: available += data.iloc[x]['imputed'] count +=1 if row_time > current_timestamp: break if count: return available/count return 0 data = ecg_stress_probability._data.groupby(['user', 'version']).apply(stress_episodes_estimation) return DataStream(data=data, metadata=get_metadata())