# Source code for cerebralcortex.markers.mcontain.daily_encounter_stats
# Copyright (c) 2019, MD2K Center of Excellence
# - Md Azim Ullah <mullah@memphis.edu>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# Copyright (c) 2019, MD2K Center of Excellence
# - Md Azim Ullah <mullah@memphis.edu>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import time
from datetime import datetime
import numpy as np
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructField, StructType, DoubleType, StringType, TimestampType, IntegerType
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata
from cerebralcortex.markers.mcontain.hourly_encounters import *
def generate_metadata_dailystats():
    """Build the stream metadata for the daily website statistics stream.

    Returns:
        Metadata: the ``mcontain-md2k--daily-stats`` metadata object with its
        data descriptors and module entry attached.
    """
    # (column name, declared type, description), in registration order.
    descriptor_specs = [
        ("start_time", "timestamp", "Start time of the day in localtime"),
        ("end_time", "timestamp", "End time of the day in localtime"),
        ("number_of_app_users", "double", "Total number of app users"),
        ("encounter_per_user", "double", "Average encounter per user"),
        ("total_covid_encounters", "double", "Total covid encounters on the day"),
        ("maximum_concurrent_encounters", "double", "Maximum concurrent encounters"),
    ]

    stream_metadata = Metadata()
    # Keep calling each builder method on the previous call's return value,
    # exactly as a single fluent chain would.
    builder = stream_metadata.set_name('mcontain-md2k--daily-stats').set_description('Daily stats for website')
    for column, dtype, description in descriptor_specs:
        descriptor = DataDescriptor().set_name(column).set_type(dtype).set_attribute("description", description)
        builder = builder.add_dataDescriptor(descriptor)

    module = ModuleMetadata().set_name('Daily encounter stats for all the users to be shown in website')
    module = module.set_attribute("url", "https://mcontain.md2k.org")
    module = module.set_author("Md Azim Ullah", "mullah@memphis.edu")
    stream_metadata.add_module(module)
    return stream_metadata
def generate_metadata_notif():
    """Build the stream metadata for per-user Covid-19 encounter notifications.

    Returns:
        Metadata: the ``mcontain-md2k--user-notifications`` metadata object
        with its data descriptors and a version-1 module entry attached.
    """
    # (column name, declared type, description), in registration order.
    descriptor_specs = [
        ("user", "string", "user id"),
        ("timestamp", "timestamp", "Unix timestamp when the message was generated"),
        ("localtime", "timestamp", "Local timestamp when the message was generated."),
        ("message", "string", "Generated notification message"),
        ("day", "timestamp", "day of the encounter"),
        ("version", "int", "version"),
    ]

    stream_metadata = Metadata()
    # Keep calling each builder method on the previous call's return value,
    # exactly as a single fluent chain would.
    builder = stream_metadata.set_name('mcontain-md2k--user-notifications')
    builder = builder.set_description('Notification generated for the Covid-19 encountered users.')
    for column, dtype, description in descriptor_specs:
        descriptor = DataDescriptor().set_name(column).set_type(dtype).set_attribute("description", description)
        builder = builder.add_dataDescriptor(descriptor)

    module = ModuleMetadata().set_name('Generated notification for a user encountered with Covid-19 participant')
    module = module.set_attribute("url", "https://mcontain.md2k.org")
    module = module.set_author("Md Shiplu Hawlader", "shiplu.cse.du@gmail.com").set_version(1)
    stream_metadata.add_module(module)
    return stream_metadata
def generate_metadata_user_encounter_count():
    """Build the stream metadata for the per-user encounter count stream.

    Returns:
        Metadata: the ``mcontain-md2k--user--encounter-count`` metadata object
        with its data descriptors and a version-1 module entry attached.
    """
    # (column name, declared type, description), in registration order.
    descriptor_specs = [
        ("start_time", "timestamp", "Start time of the time window in localtime"),
        ("end_time", "timestamp", "End time of the time window in localtime"),
        ("encounter_count", "int", "Total number of encounter for the user in the given time window"),
    ]

    stream_metadata = Metadata()
    # Keep calling each builder method on the previous call's return value,
    # exactly as a single fluent chain would.
    builder = stream_metadata.set_name('mcontain-md2k--user--encounter-count')
    builder = builder.set_description('Number of encounter in a given time window')
    for column, dtype, description in descriptor_specs:
        descriptor = DataDescriptor().set_name(column).set_type(dtype).set_attribute("description", description)
        builder = builder.add_dataDescriptor(descriptor)

    module = ModuleMetadata().set_name('Total number of encounter for a user in a given time window')
    module = module.set_attribute("url", "https://mcontain.md2k.org")
    module = module.set_author(
        "Md Shiplu Hawlader, Md Azim Ullah", "shiplu.cse.du@gmail.com, mullah@memphis.edu").set_version(1)
    stream_metadata.add_module(module)
    return stream_metadata
def generate_metadata_encounter_daily():
    """Build the stream metadata for the daily unique-encounter stream.

    Returns:
        Metadata: the ``mcontain-md2k-encounter-daily--bluetooth-gps`` metadata
        object with its data descriptors and module entry attached.
    """
    # (column name, declared type, description), in registration order.
    # NOTE(review): the "durations" entry is declared as type "array" with a
    # mean-distance description; the text is reproduced verbatim here --
    # confirm it matches the column that is actually published.
    descriptor_specs = [
        ("start_time", "timestamp", "Start time of the encounter in localtime"),
        ("end_time", "timestamp", "End time of the encounter in localtime"),
        ("participant_identifier", "string", "Participant with whom encounter happened"),
        ("os", "string", "Operating system of the phone belonging to user"),
        ("latitude", "double", "Latitude of encounter location"),
        ("longitude", "double", "Longitude of encounter location"),
        ("durations", "array", "Mean distance between participants in encounter"),
        ("covid", "integer",
         "0, 1 or 2 indicating if this encounter contained a covid user -- 0 - no covid-19 affected, 1 - user is, 2 - participant identifier is"),
    ]

    stream_metadata = Metadata()
    # Keep calling each builder method on the previous call's return value,
    # exactly as a single fluent chain would.
    builder = stream_metadata.set_name('mcontain-md2k-encounter-daily--bluetooth-gps')
    builder = builder.set_description(
        'Contains each unique encounters between two persons along with the location of encounter')
    for column, dtype, description in descriptor_specs:
        descriptor = DataDescriptor().set_name(column).set_type(dtype).set_attribute("description", description)
        builder = builder.add_dataDescriptor(descriptor)

    module = ModuleMetadata().set_name(
        'Encounter computation after parsing raw bluetooth-gps data, clustering gps locations and removing double counting')
    module = module.set_attribute("url", "https://mcontain.md2k.org")
    module = module.set_author("Md Azim Ullah", "mullah@memphis.edu")
    stream_metadata.add_module(module)
    return stream_metadata
def generate_metadata_visualization_daily():
    """Build the stream metadata for the daily visualization statistics stream.

    Returns:
        Metadata: the ``mcontain-md2k--visualization-stats--daily`` metadata
        object with its data descriptors and module entry attached.
    """
    # (column name, declared type, description), in registration order.
    descriptor_specs = [
        ("start_time", "timestamp", "Start time of the time window localtime"),
        ("end_time", "timestamp", "End time of the time window in localtime"),
        ("latitude", "double",
         "Latitude of centroid location, a gps cluster output grouping encounters in similar location together"),
        ("longitude", "double",
         "Longitude of centroid location, a gps cluster output grouping encounters in similar location together"),
        ("n_users", "integer", "Number of unique users in that cluster centroid"),
        ("total_encounters", "double",
         "Total encounters happening in the time window in this specific location"),
        ("normalized_total_encounters", "double",
         "Total encounters normalized by the centroid area. (encounters per 10 square meter)"),
        ("avg_encounters", "double",
         "average encounter per participant(participants who had at least one encounter)"),
    ]

    stream_metadata = Metadata()
    # Keep calling each builder method on the previous call's return value,
    # exactly as a single fluent chain would.
    builder = stream_metadata.set_name('mcontain-md2k--visualization-stats--daily')
    builder = builder.set_description(
        'Computes visualization stats every time window defined by start time and end time')
    for column, dtype, description in descriptor_specs:
        descriptor = DataDescriptor().set_name(column).set_type(dtype).set_attribute("description", description)
        builder = builder.add_dataDescriptor(descriptor)

    module = ModuleMetadata().set_name('Visualization stats computation in a day between start time and end time')
    module = module.set_attribute("url", "https://mcontain.md2k.org")
    module = module.set_author("Md Azim Ullah", "mullah@memphis.edu")
    stream_metadata.add_module(module)
    return stream_metadata
def drop_centroid_columns(data_result, centroid_present=True):
    """Remove the centroid bookkeeping columns from a dataframe.

    Args:
        data_result: dataframe-like object exposing ``columns`` and ``drop``.
        centroid_present: when falsy, the input is returned untouched.

    Returns:
        The dataframe without whichever of ``centroid_id``,
        ``centroid_latitude``, ``centroid_longitude`` and ``centroid_area``
        it contained.
    """
    if not centroid_present:
        return data_result

    centroid_columns = ('centroid_id', 'centroid_latitude', 'centroid_longitude', 'centroid_area')
    for name in centroid_columns:
        # Only drop columns that actually exist on the current frame.
        if name not in data_result.columns:
            continue
        data_result = data_result.drop(name)
    return data_result
def assign_covid_user(data, covid_users):
    """Add a ``covid`` flag column marking encounters that involve a covid user.

    Args:
        data: dataframe-like object with ``user`` and
            ``participant_identifier`` columns.
        covid_users: a single user id, or a list/tuple/set of user ids.

    Returns:
        ``data`` with an added ``covid`` column: 1 when ``user`` is in
        ``covid_users``, otherwise 2 when ``participant_identifier`` is,
        otherwise 0.
    """
    # Accept any of the common collection types; anything else is treated as
    # one single id. (Previously only ``list`` was recognised, so a tuple or
    # set of ids was wrapped as a single element.)
    if isinstance(covid_users, (list, tuple, set, frozenset)):
        covid_users = list(covid_users)
    else:
        covid_users = [covid_users]

    covid_flag = (
        F.when(F.col('user').isin(covid_users), 1)
        .when(F.col('participant_identifier').isin(covid_users), 2)
        .otherwise(0)
    )
    return data.withColumn('covid', covid_flag)
def remove_duplicate_encounters_day(data):
    """Collapse repeated encounters so each pair of people is kept once.

    Two grouped pandas UDF passes are applied:

    1. Per ``(version, user)``: rows are visited in ascending
       ``distance_mean`` order. For each user/participant pair the first row
       is kept, its latitude/longitude are replaced by the pair's medians, and
       ``durations`` is set to the summed ``end_time - start_time`` divided by
       3600. The raw per-encounter columns are then dropped.
    2. Per ``version``: the same collapse runs on the output of pass 1, with
       rows visited in ascending ``durations`` order and ``durations`` summed
       over the rows of a pair.

    A pair matches in either direction (user/participant swapped).

    Args:
        data: dataframe-like object carrying the schema columns declared below
            (except ``durations``, which is computed here) plus
            ``start_time``, ``end_time``, ``distances``, ``average_count``,
            ``distance_mean``, ``distance_std`` and ``distance_count``.

    Returns:
        DataStream: wraps the de-duplicated dataframe with an empty Metadata.
    """
    schema = StructType([
        StructField('timestamp', TimestampType()),
        StructField('localtime', TimestampType()),
        StructField('user', StringType()),
        StructField('participant_identifier', StringType()),
        StructField('version', IntegerType()),
        StructField('os', StringType()),
        StructField('latitude', DoubleType()),
        StructField('longitude', DoubleType()),
        StructField('durations', DoubleType()),
        StructField('covid', IntegerType()),
        StructField('centroid_id', IntegerType()),
        StructField('centroid_latitude', DoubleType()),
        StructField('centroid_longitude', DoubleType()),
        StructField('centroid_area', DoubleType()),
    ])
    columns = [field.name for field in schema]
    # Per-encounter columns that have no place in the collapsed output.
    raw_columns = ['start_time', 'end_time', 'distances', 'average_count',
                   'distance_mean', 'distance_std', 'distance_count']

    def _collapse_pairs(frame, sort_column, finalize):
        """Keep one merged row per user/participant pair of ``frame``."""
        frame = frame.sort_values(sort_column).reset_index(drop=True)
        kept_rows = []
        kept_pairs = []
        for _, row in frame.iterrows():
            pair = [row['user'], row['participant_identifier']]
            # Skip rows whose pair (in either direction) was already emitted.
            if any(user in pair and other in pair for user, other in kept_pairs):
                continue
            same_pair = frame[frame['participant_identifier'].isin(pair) & frame['user'].isin(pair)]
            merged = same_pair[:1].reset_index(drop=True)
            # Write through .loc on the frame itself: chained
            # ``merged[col].iloc[0] = ...`` is not guaranteed to modify the
            # frame (and never does under pandas Copy-on-Write).
            merged.loc[0, 'latitude'] = same_pair['latitude'].median()
            merged.loc[0, 'longitude'] = same_pair['longitude'].median()
            kept_rows.append(finalize(merged, same_pair))
            kept_pairs.append((row['user'], row['participant_identifier']))
        if not kept_rows:
            return pd.DataFrame([], columns=columns)
        # Single concat instead of growing a frame inside the loop; reindex
        # pins the schema column order and fills absent columns with NaN.
        return pd.concat(kept_rows, ignore_index=True).reindex(columns=columns)

    def _sum_interval_hours(merged, same_pair):
        """Set ``durations`` from the pair's start/end times, drop raw columns."""
        merged['durations'] = (same_pair['end_time'] - same_pair['start_time']).sum() / 3600
        return merged.drop(columns=raw_columns)

    def _sum_durations(merged, same_pair):
        """Set ``durations`` to the pair's summed ``durations``."""
        merged.loc[0, 'durations'] = same_pair['durations'].sum()
        return merged

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def compute_daily_encounter_stream(data):
        return _collapse_pairs(data, 'distance_mean', _sum_interval_hours)

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def compute_daily_encounter_stream_v2(data):
        return _collapse_pairs(data, 'durations', _sum_durations)

    # The UDF subtracts the times arithmetically, so hand them over as doubles.
    data_gps = data.withColumn('start_time', F.col('start_time').cast('double')) \
        .withColumn('end_time', F.col('end_time').cast('double'))
    data_final = data_gps.groupBy(['version', 'user']).apply(compute_daily_encounter_stream)
    data_final = data_final.groupBy(['version']).apply(compute_daily_encounter_stream_v2)
    return DataStream(data=data_final, metadata=Metadata())
def get_notifications(encounter_final_data_with_gps, day, multiplier=10, column_name='total_encounters', metric_threshold=1):
    """Compute, per user and centroid, the crowd metrics and time spent there.

    Steps:

    1. Keep encounters whose ``centroid_area`` is greater than 1 and compute
       cluster metrics per ``(centroid_id, version, hour)``: unique users,
       total encounters, average encounters, and total encounters scaled by
       ``multiplier`` and divided by the centroid area.
    2. Keep rows where ``column_name`` is at least ``metric_threshold``.
    3. Duplicate every row with ``user`` and ``participant_identifier``
       swapped, so both sides of an encounter are represented.
    4. Per ``(version, user, hour, centroid_id)``, take the maximum of each
       metric and of the encounter duration in hours.
    5. Per ``(version, user, centroid_id)``, keep the first row and set
       ``durations`` to the sum over the group, formatted to two decimals.

    Args:
        encounter_final_data_with_gps: dataframe-like object with the columns
            of the first schema below, minus the four computed metrics.
        day: not used by this function; kept for interface compatibility.
        multiplier: scale factor applied to the area-normalised count.
        column_name: metric column used for thresholding.
        metric_threshold: minimum value of ``column_name`` for a row to be kept.

    Returns:
        DataStream: wraps the resulting dataframe with an empty Metadata.
    """
    # ArrayType is not among this file's explicit pyspark.sql.types imports,
    # so bring it into scope here rather than rely on a wildcard import.
    from pyspark.sql.types import ArrayType

    cluster_schema = StructType([
        StructField('timestamp', TimestampType()),
        StructField('localtime', TimestampType()),
        StructField('start_time', TimestampType()),
        StructField('end_time', TimestampType()),
        StructField('participant_identifier', StringType()),
        StructField('os', StringType()),
        StructField('latitude', DoubleType()),
        StructField('distances', ArrayType(DoubleType())),
        StructField('longitude', DoubleType()),
        StructField('average_count', DoubleType()),
        StructField('distance_mean', DoubleType()),
        StructField('distance_std', DoubleType()),
        StructField('distance_count', DoubleType()),
        StructField('covid', IntegerType()),
        StructField('version', IntegerType()),
        StructField('avg_encounters', DoubleType()),
        StructField('unique_users', IntegerType()),
        StructField('total_encounters', DoubleType()),
        StructField('normalized_total_encounters', DoubleType()),
        StructField('user', StringType()),
        StructField('centroid_longitude', DoubleType()),
        StructField('centroid_latitude', DoubleType()),
        StructField('centroid_id', IntegerType()),
        StructField('centroid_area', DoubleType()),
    ])
    # Each UDF gets its own name list so neither closure depends on a variable
    # that is rebound later in this function.
    cluster_columns = [field.name for field in cluster_schema.fields]

    @pandas_udf(cluster_schema, PandasUDFType.GROUPED_MAP)
    def compute_cluster_metrics(data):
        if data.shape[0] == 0:
            return pd.DataFrame([], columns=cluster_columns)
        data = data.sort_values('start_time').reset_index(drop=True)
        unique_users = np.unique(list(data['user'].unique()) + list(data['participant_identifier'].unique()))
        total_encounters = data.shape[0]
        data['unique_users'] = unique_users.shape[0]
        # Each encounter involves two people, hence the factor of two.
        data['avg_encounters'] = (total_encounters * 2) / len(unique_users)
        data['total_encounters'] = total_encounters
        data['normalized_total_encounters'] = total_encounters * multiplier / data['centroid_area'].iloc[0]
        return data[cluster_columns]

    crowded = encounter_final_data_with_gps.filter(F.col('centroid_area') > 1)
    crowded = crowded.withColumn('hour', F.hour('start_time'))
    per_cluster = crowded.groupBy(['centroid_id', 'version', 'hour']).apply(compute_cluster_metrics)

    drop_columns = ['os', 'latitude', 'distances',
                    'longitude', 'average_count',
                    'distance_mean', 'distance_std',
                    'distance_count', 'covid']
    filtered = per_cluster.filter(F.col(column_name) >= metric_threshold).drop(*drop_columns)

    # Mirror each encounter so the participant also appears in the 'user' column.
    swapped = filtered.withColumn('user_temp', F.col('user')) \
        .withColumn('user', F.col('participant_identifier')) \
        .withColumn('participant_identifier', F.col('user_temp')) \
        .drop('user_temp')
    both_directions = filtered.unionByName(swapped)

    with_durations = both_directions.withColumn(
        'durations',
        (F.col('end_time').cast('double') - F.col('start_time').cast('double')).cast('double') / 3600)
    # 'hour' is not part of the UDF output schema, so it is derived again.
    with_durations = with_durations.withColumn('hour', F.hour('start_time'))

    group_columns = ['version', 'user', 'hour', 'centroid_id']
    value_columns = ['centroid_latitude', 'centroid_longitude', 'centroid_area', 'durations',
                     'total_encounters', 'normalized_total_encounters', 'unique_users', 'avg_encounters']
    hourly_max = with_durations.groupBy(group_columns).max()
    # Rename the generated 'max(<col>)' columns back to '<col>' in one select.
    hourly_max = hourly_max.select(
        *group_columns,
        *[F.col('max(' + name + ')').alias(name) for name in value_columns])

    duration_schema = StructType([
        StructField('version', IntegerType()),
        StructField('avg_encounters', DoubleType()),
        StructField('total_encounters', DoubleType()),
        StructField('normalized_total_encounters', DoubleType()),
        StructField('user', StringType()),
        StructField('centroid_longitude', DoubleType()),
        StructField('centroid_latitude', DoubleType()),
        StructField('centroid_area', DoubleType()),
        StructField('durations', DoubleType()),
        StructField('unique_users', IntegerType()),
    ])
    duration_columns = [field.name for field in duration_schema.fields]

    @pandas_udf(duration_schema, PandasUDFType.GROUPED_MAP)
    def get_final_durations(data):
        if data.shape[0] == 0:
            return pd.DataFrame([], columns=duration_columns)
        summary = data[:1].reset_index(drop=True)
        # Assign through .loc on the frame: chained ``summary[col].iloc[0] = ...``
        # is not guaranteed to write back (and never does under Copy-on-Write).
        summary.loc[0, 'durations'] = np.double('{:.2f}'.format(data['durations'].sum()))
        return summary[duration_columns]

    final_data = hourly_max.groupBy(['version', 'user', 'centroid_id']).apply(get_final_durations)
    return DataStream(data=final_data, metadata=Metadata())
def generate_metadata_notification_daily():
    """Build the stream metadata for the daily crowd-notification stream.

    Returns:
        Metadata: the ``mcontain-md2k--crowd--notification--daily`` metadata
        object with its data descriptors and module entry attached.
    """
    # (column name, declared type, description), in registration order.
    descriptor_specs = [
        ("start_time", "timestamp", "Start time of the time window localtime"),
        ("end_time", "timestamp", "End time of the time window in localtime"),
        ("centroid_latitude", "double",
         "Latitude of centroid location, a gps cluster output grouping encounters in similar location together"),
        ("centroid_longitude", "double",
         "Longitude of centroid location, a gps cluster output grouping encounters in similar location together"),
        ("centroid_area", "double", "area of centroid"),
        ("durations", "double", "duration of stay in the centroid in hours"),
        ("unique_users", "integer", "Number of unique users in that cluster centroid"),
        ("total_encounters", "double",
         "Total encounters happening in the time window in this specific location"),
        ("normalized_total_encounters", "double",
         "Total encounters normalized by the centroid area. (encounters per 10 square meter)"),
        ("avg_encounters", "double",
         "average encounter per participant(participants who had at least one encounter)"),
    ]

    stream_metadata = Metadata()
    # Keep calling each builder method on the previous call's return value,
    # exactly as a single fluent chain would.
    builder = stream_metadata.set_name('mcontain-md2k--crowd--notification--daily')
    builder = builder.set_description('Computes notifications for each user who dwelled in a crowded hotspot')
    for column, dtype, description in descriptor_specs:
        descriptor = DataDescriptor().set_name(column).set_type(dtype).set_attribute("description", description)
        builder = builder.add_dataDescriptor(descriptor)

    module = ModuleMetadata().set_name('Notification messages to be shown to each user')
    module = module.set_attribute("url", "https://mcontain.md2k.org")
    module = module.set_author("Md Azim Ullah", "mullah@memphis.edu")
    stream_metadata.add_module(module)
    return stream_metadata
def get_utcoffset():
    """Return how far UTC is ahead of the machine's local time, in hours.

    The value is ``UTC wall clock - local wall clock`` at the moment of the
    call, so it is positive when local time is behind UTC and negative when
    local time is ahead of it.

    Returns:
        float: the offset in hours (fractional for non-whole-hour zones).
    """
    # Function-scope import: the file only imports ``datetime`` from the module.
    from datetime import timezone

    ts = time.time()
    # ``datetime.utcfromtimestamp`` is deprecated since Python 3.12; build the
    # same naive UTC wall-clock value from an aware conversion instead.
    utc_naive = datetime.fromtimestamp(ts, tz=timezone.utc).replace(tzinfo=None)
    local_naive = datetime.fromtimestamp(ts)
    return (utc_naive - local_naive).total_seconds() / 3600
def get_time_columns(encounter_final_data, start_time, end_time, utc_offset):
    """Stamp every row with the window's time columns.

    Adds ``localtime`` and ``start_time`` (both set to ``start_time``),
    ``end_time`` (set to ``end_time``), and ``timestamp`` (``localtime``
    shifted by ``utc_offset``).

    Args:
        encounter_final_data: dataframe-like object to add the columns to.
        start_time: value castable to a timestamp; start of the window.
        end_time: value castable to a timestamp; end of the window.
        utc_offset: hours to add to ``localtime`` to obtain ``timestamp``.
            May be fractional (e.g. 5.5).

    Returns:
        The dataframe with the four time columns added or overwritten.
    """
    # Express the shift in whole minutes so fractional-hour offsets are not
    # truncated to whole hours; for whole-hour offsets the result is identical.
    offset_minutes = int(round(float(utc_offset) * 60))

    stamped = encounter_final_data.withColumn('localtime', F.lit(start_time).cast('timestamp'))
    stamped = stamped.withColumn('start_time', F.lit(start_time).cast('timestamp'))
    stamped = stamped.withColumn('end_time', F.lit(end_time).cast('timestamp'))
    return stamped.withColumn(
        'timestamp',
        F.col('localtime') + F.expr("INTERVAL " + str(offset_minutes) + " MINUTES"))