cerebralcortex.core.data_manager.raw package
Submodules
cerebralcortex.core.data_manager.raw.data module
cerebralcortex.core.data_manager.raw.storage_blueprint module
class BlueprintStorage(obj)
Bases: object

This is a sample reference class. If you want to add another storage layer, the new class must implement the following methods: read_file() and write_file().
read_file(stream_name: str, version: str = 'all') → object

Get stream data from the storage system. Data is returned as a pyspark DataFrame object.

Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")

Returns: pyspark DataFrame object
Return type: object
Raises: Exception – if stream name does not exist.
write_file(stream_name: str, data: DataStream) → bool

Write pyspark DataFrame to a data storage system.

Parameters: - stream_name (str) – name of the stream
- data (DataStream) – DataStream object containing a pyspark DataFrame

Returns: True if data is stored successfully; otherwise an Exception is raised.
Return type: bool
Raises: Exception – if the DataFrame write operation fails
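The blueprint above can be sketched as a minimal custom storage layer. The class below is purely illustrative and not part of CerebralCortex: it is backed by a plain dict instead of pyspark, and the `obj` constructor argument and the version-selection behavior are assumptions modeled on the docstrings above.

```python
class InMemoryStorage:
    """Illustrative storage layer following the BlueprintStorage interface.

    Dict-backed instead of pyspark-backed, purely to show the
    read_file()/write_file() contract described above.
    """

    def __init__(self, obj=None):
        self.obj = obj      # assumed to be a context/config object
        self._store = {}    # {(stream_name, version): data}

    def read_file(self, stream_name: str, version: str = "all") -> object:
        """Return stored data; raise if the stream name does not exist."""
        matches = {k: v for k, v in self._store.items() if k[0] == stream_name}
        if not matches:
            raise Exception("stream name does not exist: " + stream_name)
        if version == "all":
            return list(matches.values())
        if version == "latest":
            latest = max(matches, key=lambda k: float(k[1]))
            return matches[latest]
        return matches[(stream_name, version)]

    def write_file(self, stream_name: str, data) -> bool:
        """Store data under (stream_name, version); True on success."""
        # A real implementation would read the version from the stream's
        # metadata; "1.0" is a placeholder default for this sketch.
        version = getattr(data, "version", "1.0")
        self._store[(stream_name, str(version))] = data
        return True
```

A real storage layer would return a pyspark DataFrame from read_file() and write one in write_file(); the dict stands in only to keep the sketch self-contained.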
cerebralcortex.core.data_manager.raw.storage_filesystem module
class FileSystemStorage(obj)
Bases: object
read_file(stream_name: str, version: str = 'all', user_id: str = None) → object

Get stream data from the storage system. Data is returned as a pyspark DataFrame object.

Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")
- user_id (str) – id of a user

Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won't be efficient.

Returns: pyspark DataFrame object
Return type: object
Raises: Exception – if stream name does not exist.
write_file(stream_name: str, data: DataStream, file_mode) → bool

Write pyspark DataFrame to a file storage system.

Parameters: - stream_name (str) – name of the stream
- data (DataStream) – pyspark DataFrame object
- file_mode (str) – file write mode (e.g., "append")

Returns: True if data is stored successfully; otherwise an Exception is raised.
Return type: bool
Raises: Exception – if the DataFrame write operation fails
cerebralcortex.core.data_manager.raw.storage_hdfs module
class HDFSStorage(obj)
Bases: object
read_file(stream_name: str, version: str = 'all', user_id: str = None) → object

Get stream data from the storage system. Data is returned as a pyspark DataFrame object.

Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")
- user_id (str) – id of a user

Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won't be efficient.

Returns: pyspark DataFrame object
Return type: object
Raises: Exception – if stream name does not exist.
write_file(stream_name: str, data: DataStream) → bool

Write pyspark DataFrame to HDFS.

Parameters: - stream_name (str) – name of the stream
- data (DataStream) – pyspark DataFrame object

Returns: True if data is stored successfully; otherwise an Exception is raised.
Return type: bool
Raises: Exception – if the DataFrame write operation fails
cerebralcortex.core.data_manager.raw.stream_handler module
class DataSet
Bases: enum.Enum

An enumeration.

COMPLETE = (1,)
ONLY_DATA = (2,)
ONLY_METADATA = 3
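The DataSet members above select how much of a stream get_stream() returns. A minimal sketch reproducing the enum as documented (note that the first two values are one-element tuples while ONLY_METADATA is a plain int, exactly as shown above):

```python
import enum

class DataSet(enum.Enum):
    # Values reproduced from the documented members above.
    COMPLETE = (1,)       # return both data and metadata
    ONLY_DATA = (2,)      # return only data
    ONLY_METADATA = 3     # return only metadata

# Typical use: pass a member as get_stream()'s data_type argument, e.g.
# CC.get_stream(stream_name, version="latest", data_type=DataSet.ONLY_METADATA)
```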
class StreamHandler
Bases: object
get_stream(stream_name: str, version: str, user_id: str = None, data_type=DataSet.COMPLETE) → DataStream

Retrieve a data stream along with its metadata.

Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")
- user_id (str) – id of a user
- data_type (DataSet) – DataSet.COMPLETE returns both data and metadata. DataSet.ONLY_DATA returns only data. DataSet.ONLY_METADATA returns only the metadata of a stream. (Default=DataSet.COMPLETE)

Returns: contains data and/or metadata
Return type: DataStream
Raises: ValueError – if stream name is empty or None

Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won't be efficient.

Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data # an object of a dataframe
>>> ds.metadata # an object of MetaData class
>>> ds.get_metadata(version=1) # get the metadata of a specific version of a stream
save_stream(datastream, file_mode='append', ingestInfluxDB=False, publishOnKafka=False) → bool

Saves datastream raw data in the selected NoSQL storage and metadata in MySQL.

Parameters: - datastream (DataStream) – a DataStream object
- file_mode (str) – file write mode (Default="append")
- ingestInfluxDB (bool) – setting this to True will also ingest the raw data into InfluxDB, which could be used to visualize data in Grafana
- publishOnKafka (bool) – setting this to True will publish the stream on Kafka

Returns: True if the stream is successfully stored; otherwise an exception is thrown.
Return type: bool

Todo
Add functionality to store data in InfluxDB.

Raises: Exception – logs or throws an exception if the stream is not stored

Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)