Source code for cerebralcortex.markers.mcontain.assign_covid_user

import argparse

from pyspark.sql import functions as F

from cerebralcortex.kernel import Kernel


def assign_covid_user(data, covid_users):
    """Add a ``covid`` flag column to ``data`` based on a set of user ids.

    Args:
        data: Object exposing a Spark-style ``withColumn`` method. Its
            ``user`` and ``participant_identifier`` columns are read.
        covid_users: A single user id, or a list/tuple/set/frozenset of
            user ids. A lone id (including a plain string) is treated as
            a one-element collection.

    Returns:
        The result of ``data.withColumn('covid', ...)`` where ``covid`` is

        * ``1`` when ``user`` is one of ``covid_users``,
        * ``2`` when ``participant_identifier`` is one of ``covid_users``
          (and the first condition did not match),
        * ``0`` otherwise.
    """
    # Normalise to a list. Tuples and sets are real collections of ids and
    # must not be wrapped as a single element, otherwise ``isin`` would be
    # asked to compare a column against one tuple/set value.
    if isinstance(covid_users, (tuple, set, frozenset)):
        covid_users = list(covid_users)
    elif not isinstance(covid_users, list):
        covid_users = [covid_users]

    # Conditions are evaluated in order, so a match on ``user`` wins over a
    # match on ``participant_identifier``.
    covid_flag = (
        F.when(F.col('user').isin(covid_users), 1)
        .when(F.col('participant_identifier').isin(covid_users), 2)
        .otherwise(0)
    )
    return data.withColumn('covid', covid_flag)
def make_CC_object(config_dir="/home/jupyter/cc3_conf/", study_name='mcontain'):
    """Construct a ``Kernel`` for the given configuration and study.

    Args:
        config_dir: Path passed to ``Kernel`` as its first positional
            argument.
        study_name: Value forwarded to ``Kernel`` as ``study_name``.

    Returns:
        The newly created ``Kernel`` instance.
    """
    return Kernel(config_dir, study_name=study_name)
def save_data(CC, data_result, centroid_present=True, metadata=None):
    """Attach metadata to a stream and store it through ``CC``.

    Args:
        CC: Object exposing ``save_stream(stream, overwrite=...)``.
        data_result: Stream to store. Must expose ``columns``, ``drop`` and
            a writable ``metadata`` attribute.
        centroid_present: When true, the centroid columns
            (``centroid_id``, ``centroid_latitude``, ``centroid_longitude``,
            ``centroid_area``) are dropped if they exist.
        metadata: Value assigned to ``data_result.metadata`` before saving.

    Returns:
        Always ``True``.
    """
    if centroid_present:
        for centroid_column in (
            'centroid_id',
            'centroid_latitude',
            'centroid_longitude',
            'centroid_area',
        ):
            # Only drop columns that are actually present.
            if centroid_column not in data_result.columns:
                continue
            data_result = data_result.drop(centroid_column)

    data_result.metadata = metadata
    CC.save_stream(data_result, overwrite=False)
    return True
def build_arg_parser():
    """Create the command-line parser for this script.

    Every option has its own short flag. Registering the same short flag
    for several options makes argparse raise a conflicting-option error as
    soon as the parser is built.

    Returns:
        argparse.ArgumentParser: Parser with the required options
        ``-u/--users`` (one or more user ids), ``-i/--input_stream_name``
        and ``-c/--config_dir``.
    """
    parser = argparse.ArgumentParser(description="Assign users as Covid-19 affected")
    # nargs='+' so the option yields a list of ids, as the help text says.
    parser.add_argument('-u', '--users', nargs='+', help='list of user ids', required=True)
    parser.add_argument('-i', '--input_stream_name', help='Encounter stream name', required=True)
    parser.add_argument('-c', '--config_dir', help='CC Configuration directory path', required=True)
    return parser


def main(argv=None):
    """Read a stream, flag the given users in it and save the result.

    Args:
        argv: Optional list of command-line arguments. ``None`` lets
            argparse read ``sys.argv``.
    """
    args = vars(build_arg_parser().parse_args(argv))

    users = args["users"]
    config_dir = str(args["config_dir"]).strip()
    input_stream_name = str(args["input_stream_name"]).strip()

    CC = make_CC_object(config_dir)
    data = CC.get_stream(input_stream_name)
    # Keep the metadata of the input stream so it can be re-attached to the
    # stream produced by assign_covid_user before saving.
    metadata = data.metadata
    data = assign_covid_user(data, users)
    save_data(CC, data, centroid_present=False, metadata=metadata)


if __name__ == "__main__":
    main()