cerebralcortex.core.data_manager.raw package

Submodules

cerebralcortex.core.data_manager.raw.storage_blueprint module

class BlueprintStorage(obj)[source]

Bases: object

This is a sample reference class. If you want to add another storage layer, the class must implement the following methods: read_file() and write_file().

read_file(stream_name: str, version: str = 'all') → object[source]

Get stream data from the storage system. Data is returned as a pyspark DataFrame object.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")

Returns:pyspark DataFrame object
Return type:object
Raises:Exception – if stream name does not exist.
write_file(stream_name: str, data: cerebralcortex.core.datatypes.datastream.DataStream) → bool[source]

Write a pyspark DataFrame to a data storage system.

Parameters:
  • stream_name (str) – name of the stream
  • data (object) – pyspark DataFrame object

Returns:True if data is stored successfully or throws an Exception.
Return type:bool
Raises:Exception – if DataFrame write operation fails

cerebralcortex.core.data_manager.raw.storage_filesystem module

class FileSystemStorage(obj)[source]

Bases: object

read_file(stream_name: str, version: str = 'all', user_id: str = None) → object[source]

Get stream data from the storage system. Data is returned as a pyspark DataFrame object.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")
  • user_id (str) – id of a user

Note

Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.

Returns:pyspark DataFrame object
Return type:object
Raises:Exception – if stream name does not exist.
write_file(stream_name: str, data: cerebralcortex.core.datatypes.datastream.DataStream, file_mode) → bool[source]

Write a pyspark DataFrame to a file storage system.

Parameters:
  • stream_name (str) – name of the stream
  • data (object) – pyspark DataFrame object
  • file_mode (str) – file writing mode (e.g., append)
Returns:

True if data is stored successfully or throws an Exception.

Return type:

bool

Raises:

Exception – if DataFrame write operation fails

write_pandas_dataframe(stream_name, data)[source]
write_spark_dataframe(stream_name, data, file_mode)[source]
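One plausible way write_file() can relate to the two helpers above is to route by DataFrame type. The sketch below is an assumption made for illustration, not the actual CerebralCortex implementation: the routing rule (Spark DataFrames expose a `.write` attribute, pandas DataFrames do not) and the recording of calls are both hypothetical, and the real helpers would write parquet files rather than append to a list.

```python
class FileSystemWriter:
    """Hypothetical sketch of routing a DataFrame to
    write_spark_dataframe() or write_pandas_dataframe()."""

    def __init__(self, data_dir="/tmp/cc_data"):
        self.data_dir = data_dir  # illustrative only; real code builds paths here
        self.calls = []           # records which writer ran, for the sketch

    def write_file(self, stream_name, data, file_mode="append"):
        # Spark DataFrames expose a .write attribute; use it to dispatch
        if hasattr(data, "write"):
            return self.write_spark_dataframe(stream_name, data, file_mode)
        return self.write_pandas_dataframe(stream_name, data)

    def write_spark_dataframe(self, stream_name, data, file_mode):
        # real code would do: data.write.mode(file_mode).parquet(path)
        self.calls.append(("spark", stream_name, file_mode))
        return True

    def write_pandas_dataframe(self, stream_name, data):
        # real code would do something like: data.to_parquet(path)
        self.calls.append(("pandas", stream_name))
        return True
```

Note that only the Spark path takes file_mode, matching the two signatures documented above.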

cerebralcortex.core.data_manager.raw.storage_hdfs module

class HDFSStorage(obj)[source]

Bases: object

read_file(stream_name: str, version: str = 'all', user_id: str = None) → object[source]

Get stream data from the storage system. Data is returned as a pyspark DataFrame object.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")
  • user_id (str) – id of a user

Note

Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.

Returns:pyspark DataFrame object
Return type:object
Raises:Exception – if stream name does not exist.
write_file(stream_name: str, data: cerebralcortex.core.datatypes.datastream.DataStream) → bool[source]

Write a pyspark DataFrame to HDFS.

Parameters:
  • stream_name (str) – name of the stream
  • data (object) – pyspark DataFrame object
Returns:

True if data is stored successfully or throws an Exception.

Return type:

bool

Raises:

Exception – if DataFrame write operation fails

write_pandas_dataframe(stream_name, data)[source]
write_spark_dataframe(stream_name, data)[source]

cerebralcortex.core.data_manager.raw.stream_handler module

class DataSet[source]

Bases: enum.Enum

An enumeration.

COMPLETE = (1,)
ONLY_DATA = (2,)
ONLY_METADATA = 3
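The enum values drive what get_stream() returns. The following sketch reproduces the documented enum and shows one hypothetical way a handler could branch on data_type; the helper `select_parts` is an illustration, not a CerebralCortex API.

```python
from enum import Enum

class DataSet(Enum):
    # values mirror the documented enum
    # (note COMPLETE and ONLY_DATA are one-element tuples in the source)
    COMPLETE = (1,)
    ONLY_DATA = (2,)
    ONLY_METADATA = 3

def select_parts(data, metadata, data_type=DataSet.COMPLETE):
    """Hypothetical helper showing how a stream handler might
    use data_type to decide which parts of a stream to return."""
    if data_type == DataSet.COMPLETE:
        return data, metadata
    if data_type == DataSet.ONLY_DATA:
        return data, None
    if data_type == DataSet.ONLY_METADATA:
        return None, metadata
    raise ValueError("unknown data_type")
```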
class StreamHandler[source]

Bases: object

get_stream(stream_name: str, version: str, user_id: str = None, data_type=DataSet.COMPLETE) → cerebralcortex.core.datatypes.datastream.DataStream[source]

Retrieve a data stream with its metadata.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
  • user_id (str) – id of a user
  • data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns:

contains Data and/or metadata

Return type:

DataStream

Raises:

ValueError – if stream name is empty or None

Note

Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data # an object of a dataframe
>>> ds.metadata # an object of MetaData class
>>> ds.get_metadata(version=1) # get the specific version metadata of a stream
save_stream(datastream, file_mode='append', ingestInfluxDB=False, publishOnKafka=False) → bool[source]

Saves a datastream's raw data in the selected NoSQL storage and its metadata in MySQL.

Parameters:
  • datastream (DataStream) – a DataStream object
  • file_mode (str) – file writing mode (Default="append")
  • ingestInfluxDB (bool) – setting this to True will also ingest the raw data into InfluxDB, which can be used to visualize data in Grafana
  • publishOnKafka (bool) – setting this to True will publish the stream on Kafka
Returns:

True if stream is successfully stored or throws an exception

Return type:

bool

Todo

Add functionality to store data in influxdb.

Raises:Exception – logs or throws an exception if the stream is not stored

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)

Module contents