# Copyright (c) 2020, MD2K Center of Excellence
# All rights reserved.
# Md Azim Ullah (mullah@memphis.edu)
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, ArrayType, TimestampType, IntegerType
from scipy import signal
from scipy.stats import iqr
from cerebralcortex.algorithms.utils.mprov_helper import CC_MProvAgg
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, \
ModuleMetadata
[docs]def get_hrv_features(rr_data,
acceptable_percentage=50,
window_length=60):
"""
Args:
rr_data (DataStream):
acceptable_percentage (int):
window_length (int):
Returns:
"""
stream_name = 'org.md2k.autosense.ecg.features'
def get_metadata():
stream_metadata = Metadata()
stream_metadata.set_name(stream_name).set_description("HRV Features from ECG RR interval") \
.add_input_stream(rr_data.metadata.get_name()) \
.add_dataDescriptor(
DataDescriptor()
.set_name("var")
.set_type("double")
.set_attribute("description","variance")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("iqr")
.set_type("double")
.set_attribute("description","Inter Quartile Range")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("mean")
.set_type("double")
.set_attribute("description","Mean RR Interval")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("median")
.set_type("double")
.set_attribute("description","Median RR Interval")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("80th")
.set_type("double")
.set_attribute("description","80th percentile RR Interval")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("20th")
.set_type("double")
.set_attribute("description","20th percentile RR Interval")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("heartrate")
.set_type("double")
.set_attribute("description","Heart Rate in BPM")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("vlf")
.set_type("double")
.set_attribute("description","Very Low Frequency Energy")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("lf")
.set_type("double")
.set_attribute("description","Low Frequency Energy")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("hf")
.set_type("double")
.set_attribute("description","High Frequency Energy")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("lfhf")
.set_type("double")
.set_attribute("description","Low frequency to High Frequency energy ratio")) \
.add_dataDescriptor(
DataDescriptor()
.set_name("window")
.set_type("struct")
.set_attribute("description","window start and end time in UTC")
.set_attribute('start','start of window')
.set_attribute('end','end of window')) \
.add_module(
ModuleMetadata().set_name("HRV Features from ECG RR Interval")
.set_attribute("url", "http://md2k.org/")
.set_attribute('algorithm','ecg feature computation')
.set_attribute('unit','ms')
.set_author("Md Azim Ullah", "mullah@memphis.edu"))
return stream_metadata
def get_rr_features(a):
return np.array([np.var(a),iqr(a),np.mean(a),np.median(a),np.percentile(a,80),np.percentile(a,20),60000/np.median(a)])
def frequencyDomain(RRints,
tmStamps,
band_type = None,
lf_bw = 0.11,
hf_bw = 0.1,
vlf= (0.003, 0.04),
lf = (0.04, 0.15),
hf = (0.15, 0.4)):
"""
Args:
RRints:
tmStamps:
band_type:
lf_bw:
hf_bw:
vlf:
lf:
hf:
Returns:
"""
NNs = RRints
tss = tmStamps
frequency_range = np.linspace(0.001, 1, 10000)
NNs = np.array(NNs)
NNs = NNs - np.mean(NNs)
result = signal.lombscargle(tss, NNs, frequency_range)
#Pwelch w/ zero pad
fxx = frequency_range
pxx = result
if band_type == 'adapted':
vlf_peak = fxx[np.where(pxx == np.max(pxx[np.logical_and(fxx >= vlf[0], fxx < vlf[1])]))[0][0]]
lf_peak = fxx[np.where(pxx == np.max(pxx[np.logical_and(fxx >= lf[0], fxx < lf[1])]))[0][0]]
hf_peak = fxx[np.where(pxx == np.max(pxx[np.logical_and(fxx >= hf[0], fxx < hf[1])]))[0][0]]
peak_freqs = (vlf_peak, lf_peak, hf_peak)
hf = (peak_freqs[2] - hf_bw/2, peak_freqs[2] + hf_bw/2)
lf = (peak_freqs[1] - lf_bw/2, peak_freqs[1] + lf_bw/2)
vlf = (0.003, lf[0])
if lf[0] < 0:
print('***Warning***: Adapted LF band lower bound spills into negative frequency range')
print('Lower thresold of LF band has been set to zero')
print('Adjust LF and HF bandwidths accordingly')
lf = (0, lf[1])
vlf = (0, 0)
elif hf[0] < 0:
print('***Warning***: Adapted HF band lower bound spills into negative frequency range')
print('Lower thresold of HF band has been set to zero')
print('Adjust LF and HF bandwidths accordingly')
hf = (0, hf[1])
lf = (0, 0)
vlf = (0, 0)
df = fxx[1] - fxx[0]
vlf_power = np.trapz(pxx[np.logical_and(fxx >= vlf[0], fxx < vlf[1])], dx = df)
lf_power = np.trapz(pxx[np.logical_and(fxx >= lf[0], fxx < lf[1])], dx = df)
hf_power = np.trapz(pxx[np.logical_and(fxx >= hf[0], fxx < hf[1])], dx = df)
totalPower = vlf_power + lf_power + hf_power
#Normalize and take log
vlf_NU_log = np.log((vlf_power / (totalPower - vlf_power)) + 1)
lf_NU_log = np.log((lf_power / (totalPower - vlf_power)) + 1)
hf_NU_log = np.log((hf_power / (totalPower - vlf_power)) + 1)
lfhfRation_log = np.log((lf_power / hf_power) + 1)
freqDomainFeats = {'VLF_Power': vlf_NU_log, 'LF_Power': lf_NU_log,
'HF_Power': hf_NU_log, 'LF/HF': lfhfRation_log}
return freqDomainFeats
schema = StructType([StructField("timestamp", TimestampType()),
StructField("start", TimestampType()),
StructField("end", TimestampType()),
StructField("localtime", TimestampType()),
StructField("version", IntegerType()),
StructField("user", StringType()),
StructField("features", ArrayType(DoubleType()))
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
@CC_MProvAgg('org.md2k.autosense.ecg.rr', 'get_hrv_features', stream_name, ['user', 'timestamp'], ['user', 'timestamp'])
def ecg_r_peak(key,data):
"""
Args:
key:
data:
Returns:
"""
if data.shape[0]>=acceptable_percentage*window_length/100:
data = data.sort_values('time')
data['time'] = 1000*data['time']
a = data['rr'].values
features = [np.double(np.array(list(get_rr_features(a))+list(frequencyDomain(np.array(a)/1000,np.cumsum(a)/1000).values())))]
data = data[:1]
data['features'] = features
data['start'] = [key[2]['start']]
data['end'] = [key[2]['end']]
data = data[['timestamp','localtime','version','user','start','end','features']]
return data
else:
return pd.DataFrame([],columns=['timestamp','localtime','version','user','features','start','end'])
rr_data = rr_data.withColumn('time',F.col('timestamp').cast('double'))
ecg_features = rr_data.compute(ecg_r_peak,windowDuration=window_length,startTime='0 seconds')
df = ecg_features.select('timestamp', F.struct('start', 'end').alias('window'), 'localtime','features','user','version')
df = df.withColumn('var', F.col('features').getItem(0))
df = df.withColumn('iqr', F.col('features').getItem(1))
df = df.withColumn('vlf', F.col('features').getItem(7))
df = df.withColumn('lf', F.col('features').getItem(8))
df = df.withColumn('hf', F.col('features').getItem(9))
df = df.withColumn('lfhf', F.col('features').getItem(10))
df = df.withColumn('mean', F.col('features').getItem(2))
df = df.withColumn('median', F.col('features').getItem(3))
df = df.withColumn('80th', F.col('features').getItem(4))
df = df.withColumn('20th', F.col('features').getItem(5))
ecg_features_final = df.withColumn('heartrate', F.col('features').getItem(6))
ecg_features_final = ecg_features_final.drop('features')
ecg_features_final.metadata = get_metadata()
feature_names = ['var','iqr','mean','median','80th','20th','heartrate','vlf','lf','hf','lfhf']
stress_features = ecg_features_final.withColumn('features',F.array([F.col(i) for i in feature_names]))
return stress_features