Source code for cerebralcortex.core.data_manager.raw.filebased_storage

# Copyright (c) 2019, MD2K Center of Excellence
# - Nasir Ali <nasir.ali08@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.


import os
from typing import List
from uuid import uuid4

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from pyspark.sql import functions as F

from cerebralcortex.core.datatypes import DataStream


[docs]class FileBasedStorage(): def __init__(self): """ Constructor Args: obj (object): Object of Data class """ if self.new_study or self.study_name=="default": self._create_dir() def _get_storage_path(self, stream_name:str=None, version:int=None, user_id:str=None): """ Construct storage path for data Args: stream_name (str): name of a stream version (int): version number of stream data user_id (str): uuid of a user Returns: str: path where data shall be stored/searched """ dirpath = self.data_path+"study="+self.study_name+"/" if stream_name: dirpath += "stream={0}/".format(stream_name) if version: if "stream=" not in dirpath: raise ValueError("stream_name argument is missing.") else: dirpath += "version={0}/".format(str(version)) if user_id: if "stream=" not in dirpath or "version=" not in dirpath: raise ValueError("stream_name and/or version arguments are missing.") else: dirpath += "user={0}/".format(user_id) return dirpath def _path_exist(self, stream_name:str=None, version:int=None, user_id:str=None): """ Checks if a path exist Args: stream_name (str): name of a stream version (int): version number of stream data user_id (str): uuid of a user Returns: bool: true if path exist, false otherwise """ storage_path = self._get_storage_path(stream_name=stream_name, version=version, user_id=user_id) if self.nosql_store == "hdfs": status = self.fs.exists(storage_path) elif self.nosql_store=="filesystem": status = self.fs.path.exists(storage_path) else: raise Exception("Not supported File system") if status: return True else: return False def _ls_dir(self, stream_name:str=None, version:int=None, user_id:str=None): """ List the contents of a directory Args: stream_name (str): name of a stream version (int): version number of stream data user_id (str): uuid of a user Returns: list[str]: list of file and/or dir names """ storage_path = self._get_storage_path(stream_name=stream_name, version=version, user_id=user_id) if self.nosql_store == "hdfs": return self.fs.ls(storage_path) elif self.nosql_store=="filesystem": return self.fs.listdir(storage_path) else: raise Exception("Not supported File system") def _create_dir(self, stream_name:str=None, version:int=None, user_id:str=None): """ Creates a directory if it does not exist. Args: stream_name (str): name of a stream version (int): version number of stream data user_id (str): uuid of a user """ storage_path = self._get_storage_path(stream_name=stream_name, version=version, user_id=user_id) if self.nosql_store == "hdfs": if not self.fs.exists(storage_path): self.fs.mkdir(storage_path) return storage_path elif self.nosql_store=="filesystem": if not os.path.exists(storage_path): self.fs.makedirs(storage_path) return storage_path return None
[docs] def read_file(self, stream_name:str, version:str="latest", user_id:str=None)->object: """ Get stream data from storage system. Data would be return as pyspark DataFrame object Args: stream_name (str): name of a stream version (str): version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all") user_id (str): id of a user Note: Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won't be efficient. Returns: object: pyspark DataFrame object Raises: Exception: if stream name does not exist. """ if user_id is not None and version=="all": raise Exception("Not supported, yet. You can only request only one version of a stream associated with a user.") if stream_name!="" and stream_name is not None: if not self._path_exist(stream_name=stream_name): raise Exception(stream_name+" does not exist.") if version is not None and version!="all": data_path = self._get_storage_path(stream_name=stream_name, version=version, user_id=user_id) else: data_path = self._get_storage_path(stream_name=stream_name) df = self.sparkSession.read.load(data_path) if version is not None and version!="all": df = df.withColumn('version', F.lit(int(version))) if user_id is not None: df = df.withColumn('user', F.lit(str(user_id))) return df
[docs] def write_file(self, stream_name:str, data:DataStream.data, file_mode:str) -> bool: """ Write pyspark DataFrame to a file storage system Args: stream_name (str): name of the stream data (object): pyspark DataFrame object file_mode (str): write mode, append is currently supportes Returns: bool: True if data is stored successfully or throws an Exception. Raises: Exception: if DataFrame write operation fails """ data_path = self._get_storage_path(stream_name=stream_name) if isinstance(data, pd.DataFrame): try: table = pa.Table.from_pandas(data, preserve_index=False) pq.write_to_dataset(table, root_path=data_path, partition_cols=["version", "user"]) return True except Exception as e: raise Exception("Cannot store pandas dataframe: "+str(e)) else: try: data.write.partitionBy(["version","user"]).format('parquet').mode(file_mode).save(data_path) return True except Exception as e: raise Exception("Cannot store spark dataframe: "+str(e))
[docs] def write_pandas_to_parquet_file(self, df: pd, user_id: str, stream_name: str, stream_version:str) -> str: """ Convert pandas dataframe into pyarrow parquet format and store Args: df (pandas): pandas dataframe user_id (str): user id stream_name (str): name of a stream Returns: str: file_name of newly create parquet file """ table = pa.Table.from_pandas(df, preserve_index=False) file_id = str(uuid4().hex) + ".parquet" data_path = self._create_dir(stream_name=stream_name, version=stream_version, user_id=user_id) data_path = data_path+file_id pq.write_table(table, data_path) return data_path
#################################################################################################################
[docs] def is_study(self) -> bool: """ Returns true if study_name exists. Returns: bool: True if study_name exist False otherwise Examples: >>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.is_study() >>> True """ return self._path_exist()
[docs] def is_stream(self, stream_name: str) -> bool: """ Returns true if provided stream exists. Args: stream_name (str): name of a stream Returns: bool: True if stream_name exist False otherwise Examples: >>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> True """ return self._path_exist(stream_name=stream_name)
[docs] def get_stream_versions(self, stream_name: str) -> list: """ Returns a list of versions available for a stream Args: stream_name (str): name of a stream Returns: list: list of int Raises: ValueError: if stream_name is empty or None Examples: >>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [1, 2, 4] """ stream_path = self._get_storage_path(stream_name=stream_name) stream_versions = [] if self.is_stream(stream_name): all_streams = self._ls_dir(stream_name=stream_name) for strm in all_streams: stream_versions.append(strm.replace(stream_path,"").replace("version=","")) return stream_versions else: raise Exception(stream_name+" does not exist")
[docs] def list_streams(self)->List[str]: """ Get all the available stream names Returns: List[str]: list of available streams names Examples: >>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.list_streams() """ stream_path = self._get_storage_path() stream_names = [] all_streams = self._ls_dir() for strm in all_streams: stream_names.append(strm.replace(stream_path,"").replace("stream=","").replace("study="+self.study_name, "")) return stream_names
[docs] def list_users(self, stream_name:str, version:int=1)->List[str]: """ Get all the available stream names with metadata stream_name (str): name of a stream version (int): version of a stream Returns: List[str]: list of available user-ids for a giving stream version Examples: >>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.list_users() """ stream_path = self._get_storage_path(stream_name=stream_name, version=version) all_users = self._ls_dir(stream_name=stream_name, version=version) user_ids = [] for usr in all_users: user_ids.append(usr.replace(stream_path,"").replace("user=","").replace("study="+self.study_name, "")) return user_ids
[docs] def search_stream(self, stream_name)->List[str]: """ Find all the stream names similar to stream_name arg. For example, passing "location" argument will return all stream names that contain the word location Returns: List[str]: list of stream names similar to stream_name arg Examples: >>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.search_stream("battery") >>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....] """ stream_path = self._get_storage_path() all_streams = self._ls_dir() stream_names = [] for strm in all_streams: if stream_name in strm: stream_names.append(strm.replace(stream_path,"").replace("stream=","")) return stream_names