# Source code for cerebralcortex.algorithms.gps.clustering

# Copyright (c) 2019, MD2K Center of Excellence
# - Nasir Ali <nasir.ali08@gmail.com>, Md Azim Ullah <mullah@memphis.edu>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import numpy as np
import pandas as pd
from geopy.distance import great_circle
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.group import GroupedData
from pyspark.sql.types import StructField, StructType, DoubleType, IntegerType
from scipy.spatial import ConvexHull
from shapely.geometry.multipoint import MultiPoint
from sklearn.cluster import DBSCAN

from cerebralcortex.algorithms.utils.mprov_helper import CC_MProvAgg
from cerebralcortex.algorithms.utils.util import update_metadata
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata


def impute_gps_data(ds, accuracy_threashold: int = 100):
    """
    Impute GPS data by masking low-accuracy fixes and forward-filling.

    Latitude/longitude values whose reported ``accuracy`` exceeds
    ``accuracy_threashold`` are treated as missing and replaced with the most
    recent acceptable fix (rows are ordered by ``localtime`` first); leading
    rows with no prior valid fix are dropped.

    Args:
        ds (DataStream): Windowed/grouped DataStream object
        accuracy_threashold (int): accuracy readings above this value are
            considered invalid (presumably meters — confirm with data source).
            Name is kept as-is (including spelling) for backward compatibility.

    Returns:
        DataStream: stream of imputed GPS samples

    Raises:
        Exception: if ``ds`` does not wrap grouped/windowed data
    """
    schema = ds._data.schema

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    def gps_imputer(data):
        data = data.sort_values('localtime').reset_index(drop=True)
        # Use .loc instead of chained indexing (data['latitude'][mask] = ...):
        # chained assignment may write to a temporary copy and silently do
        # nothing under copy-on-write pandas.
        low_accuracy = data.accuracy > accuracy_threashold
        data.loc[low_accuracy, 'latitude'] = np.nan
        data.loc[low_accuracy, 'longitude'] = np.nan
        # DataFrame.ffill() replaces the deprecated fillna(method='ffill').
        data = data.ffill().dropna()
        return data

    # The imputation is defined per window/group of the stream.
    if not isinstance(ds._data, GroupedData):
        raise Exception(
            "DataStream object is not grouped data type. Please use 'window' operation on datastream object before running this algorithm")

    data = ds._data.apply(gps_imputer)
    results = DataStream(data=data, metadata=Metadata())
    metadata = update_metadata(stream_metadata=results.metadata,
                               stream_name="gps--org.md2k.imputed",
                               stream_desc="impute GPS data",
                               module_name="cerebralcortex.algorithms.gps.clustering.impute_gps_data",
                               module_version="1.0.0",
                               authors=[{"Azim": "aungkonazim@gmail.com"}])
    results.metadata = metadata
    return results
def cluster_gps(ds: DataStream,
                epsilon_constant: int = 1000,
                km_per_radian: int = 6371.0088,
                geo_fence_distance: int = 30,
                minimum_points_in_cluster: int = 1,
                latitude_column_name: str = 'latitude',
                longitude_column_name: str = 'longitude'):
    """
    Cluster GPS data using DBSCAN over haversine distances.

    Each window/group of GPS points is clustered independently; every row is
    annotated with the id of its cluster, the cluster's centermost point
    (``centroid_latitude``/``centroid_longitude``) and the area of the
    cluster's convex hull (``centroid_area``, 1 for degenerate clusters).

    Args:
        ds (DataStream): Windowed/grouped DataStream object
        epsilon_constant (int): scale factor converting geo_fence_distance
            into a radian epsilon for DBSCAN
        km_per_radian (int): mean Earth radius in km (haversine conversion)
        geo_fence_distance (int): DBSCAN neighborhood radius — presumably
            meters, TODO confirm with callers
        minimum_points_in_cluster (int): DBSCAN ``min_samples``
        latitude_column_name (str): name of the latitude column
        longitude_column_name (str): name of the longitude column

    Returns:
        DataStream object with centroid columns appended

    Raises:
        Exception: if ``ds`` does not wrap grouped/windowed data
    """
    centroid_id_name = 'centroid_id'
    features_list = [StructField('centroid_longitude', DoubleType()),
                     StructField('centroid_latitude', DoubleType()),
                     StructField('centroid_id', IntegerType()),
                     StructField('centroid_area', DoubleType())]
    schema = StructType(ds._data._df.schema.fields + features_list)
    column_names = [a.name for a in schema.fields]

    def reproject(latitude, longitude):
        """Project latitude/longitude degrees to approximate planar meters."""
        from math import pi, cos, radians
        earth_radius = 6371009  # in meters
        lat_dist = pi * earth_radius / 180.0
        y = [lat * lat_dist for lat in latitude]
        x = [long * lat_dist * cos(radians(lat))
             for lat, long in zip(latitude, longitude)]
        return np.column_stack((x, y))

    def get_centermost_point(cluster: np.ndarray) -> object:
        """
        Return ``[lat, lon, area]``: the cluster point closest to the
        cluster centroid plus the convex-hull area (1 when the cluster has
        fewer than 3 points or is geometrically degenerate).

        Args:
            cluster (np.ndarray): (n, 2) array of lat/lon pairs
        """
        try:
            if cluster.shape[0] >= 3:
                points_project = reproject(cluster[:, 0], cluster[:, 1])
                hull = ConvexHull(points_project)
                # For 2-D input, scipy's ConvexHull.volume is the enclosed
                # area; ConvexHull.area (used previously) is the perimeter.
                area = hull.volume
            else:
                area = 1
        except Exception:
            # ConvexHull raises QhullError for degenerate (e.g. collinear)
            # input; fall back to a nominal area. Narrowed from a bare
            # `except:` so SystemExit/KeyboardInterrupt are not swallowed.
            area = 1
        centroid = (MultiPoint(cluster).centroid.x,
                    MultiPoint(cluster).centroid.y)
        centermost_point = min(
            cluster, key=lambda point: great_circle(point, centroid).m)
        return list(centermost_point) + [area]

    @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
    @CC_MProvAgg('gps--org.md2k.phonesensor--phone', 'gps_clustering', 'gps--org.md2k.clusters', ['user', 'timestamp'], ['user', 'timestamp'])
    def gps_clustering(data):
        if data.shape[0] < minimum_points_in_cluster:
            # Too few points to cluster: emit an empty frame with the
            # expected output schema.
            return pd.DataFrame([], columns=column_names)
        elif data.shape[0] < 2:
            # A single point is its own cluster.
            data['centroid_area'] = 1
            data['centroid_id'] = 0
            data['centroid_latitude'] = data[latitude_column_name].values[0]
            data['centroid_longitude'] = data[longitude_column_name].values[0]
            return data

        coords = np.float64(
            data[[latitude_column_name, longitude_column_name]].values)
        # eps is expressed in radians for the haversine metric.
        epsilon = geo_fence_distance / (epsilon_constant * km_per_radian)
        db = DBSCAN(eps=epsilon, min_samples=minimum_points_in_cluster,
                    algorithm='ball_tree', metric='haversine').fit(
            np.radians(coords))
        data[centroid_id_name] = db.labels_
        cluster_labels = db.labels_
        clusters = pd.Series(
            [coords[cluster_labels == n] for n in np.unique(cluster_labels)])
        cluster_names = np.array([n for n in np.unique(cluster_labels)])
        centermost_points = clusters.map(get_centermost_point)
        centermost_points = np.array(centermost_points)
        all_dict = []
        for i, col in enumerate(cluster_names):
            cols = np.array(centermost_points[i])
            all_dict.append([col, cols[0], cols[1], cols[2]])
        temp_df = pd.DataFrame(all_dict,
                               columns=[centroid_id_name, 'centroid_latitude',
                                        'centroid_longitude', 'centroid_area'])
        # Left-join so rows keep their original order and unclustered noise
        # (label -1) still receives its centroid columns.
        data = pd.merge(data, temp_df, how='left',
                        left_on=[centroid_id_name],
                        right_on=[centroid_id_name])
        return data

    # Clustering is defined per window/group of the stream.
    if not isinstance(ds._data, GroupedData):
        raise Exception(
            "DataStream object is not grouped data type. Please use 'window' operation on datastream object before running this algorithm")

    data = ds._data.apply(gps_clustering)
    results = DataStream(data=data, metadata=Metadata())
    metadata = update_metadata(stream_metadata=results.metadata,
                               stream_name="gps--org.md2k.clusters",
                               stream_desc="GPS clusters computed using DBSCAN algorithm.",
                               module_name="cerebralcortex.algorithms.gps.clustering.cluster_gps",
                               module_version="1.0.0",
                               authors=[{"Azim": "aungkonazim@gmail.com"}])
    results.metadata = metadata
    return results