cerebralcortex.core.data_manager.raw package

Submodules

cerebralcortex.core.data_manager.raw.filebased_storage module

class FileBasedStorage[source]

Bases: object

get_stream_versions(stream_name: str) → list[source]

Returns a list of versions available for a stream

Parameters:stream_name (str) – name of a stream
Returns:list of int
Return type:list
Raises:ValueError – if stream_name is empty or None

Examples

>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> [1, 2, 4]
is_stream(stream_name: str) → bool[source]

Returns true if provided stream exists.

Parameters:stream_name (str) – name of a stream
Returns:True if stream_name exist False otherwise
Return type:bool

Examples

>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> True
is_study() → bool[source]

Returns true if study_name exists.

Returns:True if study_name exist False otherwise
Return type:bool

Examples

>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.is_study()
>>> True
list_streams() → List[str][source]

Get all the available stream names

Returns:list of available streams names
Return type:List[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.list_streams()
list_users(stream_name: str, version: int = 1) → List[str][source]

Get all the available stream names with metadata

stream_name (str): name of a stream version (int): version of a stream

Returns:list of available user-ids for a giving stream version
Return type:List[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.list_users()
read_file(stream_name: str, version: str = 'latest', user_id: str = None) → object[source]

Get stream data from storage system. Data would be return as pyspark DataFrame object :param stream_name: name of a stream :type stream_name: str :param version: version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”) :type version: str :param user_id: id of a user :type user_id: str

Note

Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.

Returns:pyspark DataFrame object
Return type:object
Raises:Exception – if stream name does not exist.
search_stream(stream_name) → List[str][source]

Find all the stream names similar to stream_name arg. For example, passing “location” argument will return all stream names that contain the word location

Returns:list of stream names similar to stream_name arg
Return type:List[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.search_stream("battery")
>>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
write_file(stream_name: str, data: <property object at 0x7f7d7f80e1d8>, file_mode: str) → bool[source]

Write pyspark DataFrame to a file storage system

Parameters:
  • stream_name (str) – name of the stream
  • data (object) – pyspark DataFrame object
  • file_mode (str) – write mode, append is currently supportes
Returns:

True if data is stored successfully or throws an Exception.

Return type:

bool

Raises:

Exception – if DataFrame write operation fails

write_pandas_to_parquet_file(df: <module 'pandas' from '/home/docs/checkouts/readthedocs.org/user_builds/cerebralcortex-kernel/envs/latest/lib/python3.6/site-packages/pandas/__init__.py'>, user_id: str, stream_name: str, stream_version: str) → str[source]

Convert pandas dataframe into pyarrow parquet format and store

Parameters:
  • df (pandas) – pandas dataframe
  • user_id (str) – user id
  • stream_name (str) – name of a stream
Returns:

file_name of newly create parquet file

Return type:

str

cerebralcortex.core.data_manager.raw.sample_code_for_soujanya module

cerebralcortex.core.data_manager.raw.storage_blueprint module

class BlueprintStorage(obj)[source]

Bases: object

This is a sample reference class. If you want to add another storage layer then the class must have following methods in it. read_file() write_file()

get_stream_metadata_hash(stream_name: str) → list[source]

Get all the metadata_hash associated with a stream name.

Parameters:stream_name (str) – name of a stream
Returns:list of all the metadata hashes
Return type:list[str]

Examples

>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
get_stream_name(metadata_hash: <module 'uuid' from '/home/docs/.pyenv/versions/3.6.8/lib/python3.6/uuid.py'>) → str[source]

metadata_hash are unique to each stream version. This reverse look can return the stream name of a metadata_hash.

Parameters:metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns:name of a stream
Return type:str

Examples

>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
>>> ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST
get_stream_versions(stream_name: str) → list[source]

Returns a list of versions available for a stream

Parameters:stream_name (str) – name of a stream
Returns:list of int
Return type:list
Raises:ValueError – if stream_name is empty or None

Examples

>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> [1, 2, 4]
is_stream(stream_name: str) → bool[source]

Returns true if provided stream exists.

Parameters:stream_name (str) – name of a stream
Returns:True if stream_name exist False otherwise
Return type:bool

Examples

>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> True
list_streams() → List[str][source]

Get all the available stream names with metadata

Returns:list of available streams metadata
Return type:List[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.list_streams()
read_file(stream_name: str, version: str = 'all') → object[source]

Get stream data from storage system. Data would be return as pyspark DataFrame object :param stream_name: name of a stream :type stream_name: str :param version: version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”) :type version: str

Returns:pyspark DataFrame object
Return type:object
Raises:Exception – if stream name does not exist.
search_stream(stream_name)[source]

Find all the stream names similar to stream_name arg. For example, passing “location” argument will return all stream names that contain the word location

Returns:list of stream names similar to stream_name arg
Return type:List[str]

Examples

>>> CC.search_stream("battery")
>>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
write_file(stream_name: str, data: cerebralcortex.core.datatypes.datastream.DataStream) → bool[source]

Write pyspark DataFrame to a data storage system :param stream_name: name of the stream :type stream_name: str :param data: pyspark DataFrame object :type data: object

Returns:True if data is stored successfully or throws an Exception.
Return type:bool
Raises:Exception – if DataFrame write operation fails

cerebralcortex.core.data_manager.raw.stream_handler module

class DataSet[source]

Bases: enum.Enum

An enumeration.

COMPLETE = (1,)
ONLY_DATA = (2,)
ONLY_METADATA = 3
class StreamHandler[source]

Bases: object

get_stream(stream_name: str, version: str = 'latest', user_id: str = None, data_type=<DataSet.COMPLETE: (1, )>) → cerebralcortex.core.datatypes.datastream.DataStream[source]

Retrieve a data-stream with it’s metadata.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are latest, or a specific version of a stream (e.g., 2)
  • user_id (str) – id of a user
  • data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns:

contains Data and/or metadata

Return type:

DataStream

Raises:

ValueError – if stream name is empty or None

Note

Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data # an object of a dataframe
>>> ds.metadata # an object of MetaData class
>>> ds.get_metadata(version=1) # get the specific version metadata of a stream
save_stream(datastream, overwrite=False) → bool[source]

Saves datastream raw data in selected NoSQL storage and metadata in MySQL.

Parameters:
  • datastream (DataStream) – a DataStream object
  • overwrite (bool) – if set to true, whole existing datastream data will be overwritten by new data
Returns:

True if stream is successfully stored or throws an exception

Return type:

bool

Todo

Add functionality to store data in influxdb.

Raises:Exception – log or throws exception if stream is not stored

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)

cerebralcortex.core.data_manager.raw.tedt module

cerebralcortex.core.data_manager.raw.util module

class filesystem_helper[source]

Bases: object

create_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]

Creates a directory if it does not exist.

Parameters:
  • dirpath (str) – base storage dir path
  • stream_name (str) – name of a stream
  • version (int) – version number of stream data
  • user_id (str) – uuid of a user
ls_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]

List the contents of a directory

Parameters:
  • dirpath (str) – base storage dir path
  • stream_name (str) – name of a stream
  • version (int) – version number of stream data
  • user_id (str) – uuid of a user
Returns:

list of file and/or dir names

Return type:

list[str]

path_exist(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]

Checks if a path exist

Parameters:
  • dirpath (str) – base storage dir path
  • stream_name (str) – name of a stream
  • version (int) – version number of stream data
  • user_id (str) – uuid of a user
Returns:

true if path exist, false otherwise

Return type:

bool

class hdfs_helper[source]

Bases: object

create_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]

Creates a directory if it does not exist.

Parameters:
  • dirpath (str) – base storage dir path
  • stream_name (str) – name of a stream
  • version (int) – version number of stream data
  • user_id (str) – uuid of a user
hdfs_conn = ''
ls_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]

List the contents of a directory

Parameters:
  • dirpath (str) – base storage dir path
  • stream_name (str) – name of a stream
  • version (int) – version number of stream data
  • user_id (str) – uuid of a user
Returns:

list of file and/or dir names

Return type:

list[str]

path_exist(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]

Checks if a path exist

Parameters:
  • dirpath (str) – base storage dir path
  • stream_name (str) – name of a stream
  • version (int) – version number of stream data
  • user_id (str) – uuid of a user
Returns:

true if path exist, false otherwise

Return type:

bool

class tmp[source]

Bases: object

get_storage_path(dirpath, stream_name, version, user_id)[source]

Module contents