# Source code for cerebralcortex.algorithms.ecg.autosense_data_quality

# Copyright (c) 2020, MD2K Center of Excellence
# All rights reserved.
# Md Azim Ullah (mullah@memphis.edu)
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType, IntegerType

from cerebralcortex.algorithms.utils.mprov_helper import CC_MProvAgg
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata, DataDescriptor, \
    ModuleMetadata


def ecg_autosense_data_quality(ecg, Fs=64, sensor_name='autosense',
                               outlier_threshold_high=4000,
                               outlier_threshold_low=20,
                               slope_threshold=100,
                               range_threshold=50,
                               eck_threshold_band_loose=400,
                               window_size=3,
                               acceptable_outlier_percent=34):
    """
    Label every fixed-length window of an ECG stream with a data-quality class.

    The stream is split into windows of ``window_size`` seconds and each
    window is classified from its ``ecg`` samples; the first matching rule
    wins:

    1. no samples                                  -> 'battery down/disconnected'
    2. sample count <= window_size * acceptable_outlier_percent * Fs / 100
                                                   -> 'intermittent data loss'
    3. (max - min) <= range_threshold              -> 'sensor off body'
    4. (max - min) <= eck_threshold_band_loose     -> 'loose/improper attachment'
    5. more than acceptable_outlier_percent % of the samples are outliers
                                                   -> 'loose/improper attachment'
    6. otherwise                                   -> 'acceptable'

    A sample is an outlier when it differs from both neighbours by more than
    ``slope_threshold``, equals both neighbours, differs from either
    neighbour by more than ``outlier_threshold_high``, or lies at/above
    ``outlier_threshold_high`` or at/below ``outlier_threshold_low``.
    Neighbours wrap around at the window edges.

    Args:
        ecg (DataStream): input stream. The windows handed to the UDF must
            expose ``timestamp`` and ``ecg`` columns; the output schema also
            carries ``localtime``, ``version`` and ``user``.
        Fs (int): sampling frequency used to derive the minimum number of
            samples a window must contain (presumably in Hz - confirm).
        sensor_name (str): only ``'autosense'`` is classified; for any other
            value the ``quality`` column is left as an empty string.
        outlier_threshold_high (int): upper amplitude bound, also used as the
            neighbour-jump bound.
        outlier_threshold_low (int): lower amplitude bound.
        slope_threshold (int): bound on the difference to both neighbours.
        range_threshold (int): window range at/below which the sensor is
            reported as off body.
        eck_threshold_band_loose (int): window range at/below which the
            attachment is reported as loose.
        window_size (int): window length in seconds; used both for the
            windowing itself and for the minimum-sample rule.
        acceptable_outlier_percent (int): maximum tolerated percentage of
            outliers; also the minimum percentage of expected samples.

    Returns:
        DataStream: rows of the input with columns
        [timestamp, localtime, version, user, quality, ecg].
    """
    data_quality_band_loose = 'loose/improper attachment'
    data_quality_not_worn = 'sensor off body'
    data_quality_band_off = 'battery down/disconnected'
    data_quality_missing = 'intermittent data loss'
    data_quality_good = 'acceptable'

    stream_name = 'org.md2k.autosense.ecg.quality'

    def get_metadata():
        """Build the metadata describing the generated quality stream."""
        quality_descriptor = (
            DataDescriptor().set_name("quality").set_type("string")
            .set_attribute("description", "ECG data quality")
            .set_attribute('Loose/Improper Attachment', 'Electrode Displacement')
            .set_attribute('Sensor off Body', 'Autosense not worn')
            .set_attribute('Battery down/Disconnected',
                           'No data is present - Can be due to battery down or sensor disconnection')
            .set_attribute('Intermittent Data Loss', 'Not enough samples are present')
            .set_attribute('Acceptable', 'Good Quality')
        )
        ecg_descriptor = (
            DataDescriptor().set_name("ecg").set_type("double")
            .set_attribute("description", "ecg sample value")
        )
        module = (
            ModuleMetadata().set_name("ecg data quality")
            .set_attribute("url", "http://md2k.org/")
            .set_author("Md Azim Ullah", "mullah@memphis.edu")
        )

        stream_metadata = Metadata()
        # The description follows the configured window length; it reads
        # "Chest ECG quality 3 seconds" for the default window_size.
        (stream_metadata.set_name(stream_name)
         .set_description("Chest ECG quality {} seconds".format(window_size))
         .add_input_stream(ecg.metadata.get_name())
         .add_dataDescriptor(DataDescriptor().set_name("timestamp").set_type("datetime"))
         .add_dataDescriptor(DataDescriptor().set_name("localtime").set_type("datetime"))
         .add_dataDescriptor(DataDescriptor().set_name("version").set_type("int"))
         .add_dataDescriptor(DataDescriptor().set_name("user").set_type("string"))
         .add_dataDescriptor(quality_descriptor)
         .add_dataDescriptor(ecg_descriptor)
         .add_module(module))
        return stream_metadata

    def get_quality_autosense(data):
        """
        Classify one window of ECG samples.

        Args:
            data (list): ECG sample values of a single window, in time order.

        Returns:
            str: one of the data-quality labels defined in the enclosing
            function.
        """
        sample_count = len(data)
        minimum_expected_samples = window_size * acceptable_outlier_percent * Fs / 100
        if sample_count == 0:
            return data_quality_band_off
        if sample_count <= minimum_expected_samples:
            return data_quality_missing

        range_data = max(data) - min(data)
        if range_data <= range_threshold:
            return data_quality_not_worn
        if range_data <= eck_threshold_band_loose:
            return data_quality_band_loose

        # Loop-invariant conversions, hoisted out of the per-sample loop.
        jump_limit = int(outlier_threshold_high)
        slope_limit = int(slope_threshold)

        outlier_counts = 0
        for i, value in enumerate(data):
            # Neighbours are circular: the first sample's predecessor is the
            # last sample (index -1) and the last sample's successor is the
            # first one.
            previous = data[i - 1]
            following = data[(i + 1) % sample_count]
            diff_prev = abs(value - previous)
            diff_next = abs(value - following)

            disc = diff_prev > slope_limit and diff_next > slope_limit
            stuck = value == previous and value == following
            flip = diff_prev > jump_limit or diff_next > jump_limit
            if (disc or stuck or flip
                    or value >= outlier_threshold_high
                    or value <= outlier_threshold_low):
                outlier_counts += 1

        if 100 * outlier_counts > acceptable_outlier_percent * sample_count:
            return data_quality_band_loose
        return data_quality_good

    schema = StructType([
        StructField("timestamp", TimestampType()),
        StructField("localtime", TimestampType()),
        StructField("version", IntegerType()),
        StructField("user", StringType()),
        StructField("quality", StringType()),
        StructField("ecg", DoubleType())
    ])

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    @CC_MProvAgg('ecg--org.md2k.autosense--autosense_chest--chest', 'ecg_autosense_data_quality', stream_name, ['user', 'timestamp'], ['user', 'timestamp'])
    def data_quality(data):
        """
        Attach the window's quality label to every row of the window.

        Args:
            data (pandas.DataFrame): rows of one window.

        Returns:
            pandas.DataFrame: the same rows, sorted by timestamp when
            non-empty, with a ``quality`` column added.
        """
        data['quality'] = ''
        if data.shape[0] > 0:
            data = data.sort_values('timestamp')
            # Unsupported sensors keep the empty-string quality.
            if sensor_name in ['autosense']:
                data['quality'] = get_quality_autosense(list(data['ecg']))
        return data

    # Window by the configured window_size so the windowing agrees with the
    # minimum-sample computation in get_quality_autosense (previously the
    # duration was hard-coded to 3 regardless of window_size).
    ecg_quality_stream = ecg.compute(data_quality, windowDuration=window_size, startTime='0 seconds')
    data = ecg_quality_stream._data
    ds = DataStream(data=data, metadata=get_metadata())
    return ds