Source code for cerebralcortex.core.data_manager.sql.stream_handler

# Copyright (c) 2019, MD2K Center of Excellence
# - Nasir Ali <nasir.ali08@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import uuid
from typing import List

from cerebralcortex.core.data_manager.sql.orm_models import Stream
from cerebralcortex.core.metadata_manager.stream.metadata import Metadata


[docs]class StreamHandler: ################################################################### ################## STORE DATA METHODS ############################# ###################################################################
[docs] def save_stream_metadata(self, metadata_obj) -> dict: """ Update a record if stream already exists or insert a new record otherwise. Args: metadata_obj (Metadata): stream metadata Returns: dict: {"status": True/False,"verion":version} Raises: Exception: if fail to insert/update record in MySQL. Exceptions are logged in a log file """ isQueryReady = 0 if isinstance(metadata_obj, Metadata): metadata_hash = metadata_obj.get_hash() metadata_obj = metadata_obj.to_json() else: raise Exception("Metadata is not type of MetaData object class.") stream_name = metadata_obj.get("name") is_metadata_changed = self._is_metadata_changed(stream_name, metadata_hash) status = is_metadata_changed.get("status") version = is_metadata_changed.get("version") if (status == "exist"): return {"status": True, "version": version, "record_type": "exist"} if (status == "new"): stream = Stream(name=stream_name.lower(), version=version, study_name=self.study_name, metadata_hash=str(metadata_hash), stream_metadata=metadata_obj) isQueryReady = 1 # if nothing is changed then isQueryReady would be 0 and no database transaction would be performed if isQueryReady == 1: try: self.session.add(stream) self.session.commit() return {"status": True, "version": version, "record_type": "new"} except Exception as e: raise Exception(e)
def _is_metadata_changed(self, stream_name, metadata_hash) -> dict: """ Checks whether metadata_hash already exist in the system . Args: stream_name (str): name of a stream metadata_hash (str): hashed form of stream metadata Raises: Exception: if MySQL query fails Returns: dict: {"version": "version_number", "status":"new" OR "exist"} """ rows = self.session.query(Stream).filter(Stream.metadata_hash==metadata_hash).first() if rows: return {"version": int(rows.version), "status": "exist"} else: stream_versions = self.get_stream_versions(stream_name) if bool(stream_versions): version = max(stream_versions) + 1 return {"version": version, "status": "new"} else: return {"version": 1, "status": "new"} ################################################################### ################## GET DATA METHODS ############################### ###################################################################
[docs] def get_stream_metadata_by_name(self, stream_name: str, version:int) -> Metadata: """ Get a list of metadata for all versions available for a stream. Args: stream_name (str): name of a stream version (int): version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all") Returns: Metadata: Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise. Raises: ValueError: stream_name cannot be None or empty. Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.list_users("mperf") >>> [Metadata] # list of MetaData class objects """ if stream_name is None or stream_name=="": raise ValueError("stream_name cannot be None or empty.") rows = self.session.query(Stream.stream_metadata).filter((Stream.name == stream_name) & (Stream.version==version) & (Stream.study_name==self.study_name)).first() if rows: return Metadata().from_json_file(rows.stream_metadata) else: return None
[docs] def list_streams(self)->List[Metadata]: """ Get all the available stream names with metadata Returns: List[Metadata]: list of available streams metadata [{name:"", metadata:""}...] Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.list_streams() """ rows = self.session.query(Stream.stream_metadata).filter(Stream.study_name == self.study_name).all() results = [] if rows: for row in rows: results.append(Metadata().from_json_file(row.stream_metadata)) return results else: return results
[docs] def search_stream(self, stream_name): """ Find all the stream names similar to stream_name arg. For example, passing "location" argument will return all stream names that contain the word location Returns: List[str]: list of stream names similar to stream_name arg Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.search_stream("battery") >>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....] """ rows = self.session.query(Stream.name).filter(Stream.name.ilike('%'+stream_name+'%')).all() if rows: return rows else: return []
[docs] def get_stream_versions(self, stream_name: str) -> list: """ Returns a list of versions available for a stream Args: stream_name (str): name of a stream Returns: list: list of int Raises: ValueError: if stream_name is empty or None Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [1, 2, 4] """ if not stream_name: raise ValueError("Stream_name is a required field.") rows = self.session.query(Stream.version).filter((Stream.name==stream_name) & (Stream.study_name==self.study_name)).all() results = [] if rows: for row in rows: results.append(row.version) return results else: return results
[docs] def get_stream_metadata_hash(self, stream_name: str) -> List: """ Get all the metadata_hash associated with a stream name. Args: stream_name (str): name of a stream Returns: list: list of all the metadata hashes with name and versions Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [["stream_name", "version", "metadata_hash"]] """ if not stream_name: raise ValueError("stream_name are required field.") rows = self.session.query(Stream.name, Stream.version, Stream.metadata_hash).filter((Stream.name == stream_name) & (Stream.study_name==self.study_name)).all() if rows: return rows else: return []
[docs] def get_stream_name(self, metadata_hash: uuid) -> str: """ metadata_hash are unique to each stream version. This reverse look can return the stream name of a metadata_hash. Args: metadata_hash (uuid): This could be an actual uuid object or a string form of uuid. Returns: str: name of a stream Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST """ if not metadata_hash: raise ValueError("metadata_hash is a required field.") rows = self.session.query(Stream.name).filter((Stream.metadata_hash == metadata_hash) & (Stream.study_name==self.study_name)).first() if rows: return rows.name else: raise Exception(str(metadata_hash)+ " does not exist.")
[docs] def get_stream_metadata_by_hash(self, metadata_hash: uuid) -> List: """ metadata_hash are unique to each stream version. This reverse look can return the stream name of a metadata_hash. Args: metadata_hash (uuid): This could be an actual uuid object or a string form of uuid. Returns: List: [stream_name, metadata] Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ["name" .....] # stream metadata and other information """ if not metadata_hash: raise ValueError("metadata_hash is a required field.") rows = self.session.query(Stream.stream_metadata).filter( (Stream.metadata_hash == metadata_hash) & (Stream.study_name == self.study_name)).first() if rows: return rows.stream_metadata else: raise Exception(str(metadata_hash)+ " does not exist.")
[docs] def is_stream(self, stream_name: str) -> bool: """ Returns true if provided stream exists. Args: stream_name (str): name of a stream Returns: bool: True if stream_name exist False otherwise Examples: >>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> True """ rows = self.session.query(Stream.name).filter( (Stream.name == stream_name) & (Stream.study_name == self.study_name)).first() if rows: return True else: return False