cerebralcortex.core.data_manager.raw package
Submodules
cerebralcortex.core.data_manager.raw.data module
cerebralcortex.core.data_manager.raw.filebased_storage module
-
class FileBasedStorage
Bases: object
-
get_stream_versions(stream_name: str) → list
Returns a list of the versions available for a stream.
Parameters: stream_name (str) – name of a stream
Returns: list of int
Return type: list
Raises: ValueError – if stream_name is empty or None
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[1, 2, 4]
-
is_stream(stream_name: str) → bool
Returns True if the given stream exists.
Parameters: stream_name (str) – name of a stream
Returns: True if stream_name exists, False otherwise
Return type: bool
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
True
-
is_study() → bool
Returns True if study_name exists.
Returns: True if study_name exists, False otherwise
Return type: bool
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.is_study()
True
-
list_streams() → List[str]
Get all the available stream names.
Returns: list of available stream names
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.list_streams()
-
list_users(stream_name: str, version: int = 1) → List[str]
Get all the user IDs available for a given stream version.
Parameters: - stream_name (str) – name of a stream
- version (int) – version of a stream (Default=1)
Returns: list of available user IDs for the given stream version
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.list_users("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
-
read_file(stream_name: str, version: str = 'latest', user_id: str = None) → object
Get stream data from the storage system. Data is returned as a pyspark DataFrame object.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default="latest")
- user_id (str) – id of a user
Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering by version is inefficient.
Returns: pyspark DataFrame object
Return type: object
Raises: Exception – if stream name does not exist.
-
search_stream(stream_name) → List[str]
Find all the stream names similar to the stream_name argument. For example, passing "location" returns all stream names that contain the word location.
Returns: list of stream names similar to the stream_name argument
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
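The meaning of "similar" is not specified beyond the example, in which the lowercase query "battery" matches names that start with BATTERY. A minimal sketch of that behavior, assuming a case-insensitive substring match over a list of stream names such as the one returned by list_streams(); the helper search_stream_names is hypothetical and not part of CerebralCortex:

```python
from typing import List


def search_stream_names(stream_names: List[str], query: str) -> List[str]:
    """Return the stream names that contain `query`, ignoring case.

    Hypothetical helper: assumes "similar" means a case-insensitive
    substring match, which is consistent with the documented example.
    """
    needle = query.lower()
    return [name for name in stream_names if needle in name.lower()]


streams = [
    "BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST",
    "BATTERY--org.md2k.phonesensor--PHONE",
    "ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST",
]
print(search_stream_names(streams, "battery"))
```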
-
write_file(stream_name: str, data: object, file_mode: str) → bool
Write a pyspark DataFrame to a file storage system.
Parameters: - stream_name (str) – name of the stream
- data (object) – pyspark DataFrame object
- file_mode (str) – write mode; currently, append is supported
Returns: True if data is stored successfully, otherwise throws an Exception.
Return type: bool
Raises: Exception – if DataFrame write operation fails
-
write_pandas_to_parquet_file(df: pandas.DataFrame, user_id: str, stream_name: str, stream_version: str) → str
Convert a pandas DataFrame to the Parquet format using pyarrow and store it.
Parameters: - df (pandas.DataFrame) – pandas DataFrame
- user_id (str) – user id
- stream_name (str) – name of a stream
- stream_version (str) – version of a stream
Returns: file name of the newly created parquet file
Return type: str
-
cerebralcortex.core.data_manager.raw.sample_code_for_soujanya module
cerebralcortex.core.data_manager.raw.storage_blueprint module
-
class BlueprintStorage(obj)
Bases: object
This is a sample reference class. To add another storage layer, the new class must implement the following methods: read_file() and write_file().
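A minimal sketch of a new storage layer that provides these two methods. To stay self-contained it keeps rows in an in-memory dictionary instead of reading and writing pyspark DataFrames; the class name InMemoryStorage, the row format, and the extra version argument on write_file are assumptions for illustration only:

```python
from typing import Dict, List, Tuple


class InMemoryStorage:
    """Hypothetical storage layer that provides read_file() and write_file().

    Rows are plain dicts kept in memory, keyed by (stream_name, version);
    a real layer would read and write pyspark DataFrames instead.
    """

    def __init__(self, obj=None):
        # Mirrors the BlueprintStorage(obj) constructor; obj is unused here.
        self.obj = obj
        self._store: Dict[Tuple[str, int], List[dict]] = {}

    def write_file(self, stream_name: str, data: List[dict], version: int = 1) -> bool:
        if not stream_name:
            raise ValueError("stream_name cannot be empty or None")
        # Append the new rows to any rows already stored for this version.
        self._store.setdefault((stream_name, version), []).extend(data)
        return True

    def read_file(self, stream_name: str, version: str = "all") -> List[dict]:
        versions = sorted(v for (name, v) in self._store if name == stream_name)
        if not versions:
            raise ValueError("stream name does not exist: %s" % stream_name)
        if version == "all":
            selected = versions
        elif version == "latest":
            selected = versions[-1:]
        else:
            # Accept a specific version given as e.g. "2" or "2.0".
            selected = [int(float(version))]
        rows: List[dict] = []
        for v in selected:
            rows.extend(self._store.get((stream_name, v), []))
        return rows
```

Selecting the requested version inside read_file, rather than returning every version for the caller to filter, keeps the amount of data handed back proportional to what was asked for.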
-
get_stream_metadata_hash(stream_name: str) → list
Get all the metadata hashes associated with a stream name.
Parameters: stream_name (str) – name of a stream
Returns: list of all the metadata hashes
Return type: list[str]
Examples
>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
-
get_stream_name(metadata_hash: uuid) → str
Each metadata_hash is unique to a stream version. This reverse lookup returns the stream name associated with a metadata_hash.
Parameters: metadata_hash (uuid) – either a uuid object or the string form of a uuid
Returns: name of a stream
Return type: str
Examples
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
"ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST"
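Because each metadata_hash identifies exactly one stream version, the reverse lookup can be a plain dictionary kept alongside the forward stream-to-hashes mapping. A minimal sketch, assuming an in-memory index (the class MetadataIndex is hypothetical); normalizing through uuid.UUID lets callers pass either a UUID object or its string form, as the parameter description allows:

```python
import uuid
from typing import Dict, List, Union


class MetadataIndex:
    """Hypothetical in-memory index for metadata_hash lookups in both directions."""

    def __init__(self) -> None:
        self._hash_to_stream: Dict[str, str] = {}
        self._stream_to_hashes: Dict[str, List[str]] = {}

    @staticmethod
    def _normalize(metadata_hash: Union[uuid.UUID, str]) -> str:
        # str(uuid.UUID(...)) yields the canonical lowercase, hyphenated form.
        if isinstance(metadata_hash, uuid.UUID):
            return str(metadata_hash)
        return str(uuid.UUID(metadata_hash))

    def add(self, stream_name: str, metadata_hash: Union[uuid.UUID, str]) -> None:
        key = self._normalize(metadata_hash)
        self._hash_to_stream[key] = stream_name
        self._stream_to_hashes.setdefault(stream_name, []).append(key)

    def get_stream_metadata_hash(self, stream_name: str) -> List[str]:
        # Forward lookup: every hash recorded for the stream.
        return list(self._stream_to_hashes.get(stream_name, []))

    def get_stream_name(self, metadata_hash: Union[uuid.UUID, str]) -> str:
        # Reverse lookup: raises KeyError if the hash is unknown.
        return self._hash_to_stream[self._normalize(metadata_hash)]
```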
-
get_stream_versions(stream_name: str) → list
Returns a list of the versions available for a stream.
Parameters: stream_name (str) – name of a stream
Returns: list of int
Return type: list
Raises: ValueError – if stream_name is empty or None
Examples
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[1, 2, 4]
-
is_stream(stream_name: str) → bool
Returns True if the given stream exists.
Parameters: stream_name (str) – name of a stream
Returns: True if stream_name exists, False otherwise
Return type: bool
Examples
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
True
-
list_streams() → List[str]
Get all the available stream names with metadata.
Returns: list of available stream metadata
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.list_streams()
-
read_file(stream_name: str, version: str = 'all') → object
Get stream data from the storage system. Data is returned as a pyspark DataFrame object.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default="all")
Returns: pyspark DataFrame object
Return type: object
Raises: Exception – if stream name does not exist.
-
search_stream(stream_name)
Find all the stream names similar to the stream_name argument. For example, passing "location" returns all stream names that contain the word location.
Returns: list of stream names similar to the stream_name argument
Return type: List[str]
Examples
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
-
write_file(stream_name: str, data: cerebralcortex.core.datatypes.datastream.DataStream) → bool
Write a pyspark DataFrame to a data storage system.
Parameters: - stream_name (str) – name of the stream
- data (object) – pyspark DataFrame object
Returns: True if data is stored successfully, otherwise throws an Exception.
Return type: bool
Raises: Exception – if DataFrame write operation fails
-
cerebralcortex.core.data_manager.raw.stream_handler module
-
class DataSet
Bases: enum.Enum
An enumeration of the parts of a stream to retrieve: COMPLETE (data and metadata), ONLY_DATA, or ONLY_METADATA.
-
COMPLETE = (1,)
-
ONLY_DATA = (2,)
-
ONLY_METADATA = 3
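The documented values are mixed: COMPLETE and ONLY_DATA are one-element tuples, while ONLY_METADATA is a plain integer. Python produces exactly this when an enum member is assigned with a trailing comma. The sketch below reproduces the documented values under that assumption; it is an illustration, not the library's source. Comparing members by identity (data_type is DataSet.COMPLETE) avoids depending on the raw values.

```python
from enum import Enum


class DataSet(Enum):
    # Assumption: a trailing comma turns the first two values into
    # one-element tuples; ONLY_METADATA has no comma, so it stays an int.
    COMPLETE = 1,
    ONLY_DATA = 2,
    ONLY_METADATA = 3


print(DataSet.COMPLETE.value)       # (1,)
print(DataSet.ONLY_METADATA.value)  # 3
```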
-
-
class StreamHandler
Bases: object
-
get_stream(stream_name: str, version: str = 'latest', user_id: str = None, data_type=<DataSet.COMPLETE: (1, )>) → cerebralcortex.core.datatypes.datastream.DataStream
Retrieve a data stream with its metadata.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are latest, or a specific version of a stream (e.g., 2)
- user_id (str) – id of a user
- data_type (DataSet) – DataSet.COMPLETE returns both data and metadata. DataSet.ONLY_DATA returns only data. DataSet.ONLY_METADATA returns only the metadata of a stream. (Default=DataSet.COMPLETE)
Returns: a DataStream containing data and/or metadata
Return type: cerebralcortex.core.datatypes.datastream.DataStream
Raises: ValueError – if stream name is empty or None
Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering by version is inefficient.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data  # an object of a dataframe
>>> ds.metadata  # an object of MetaData class
>>> ds.get_metadata(version=1)  # get the metadata of a specific stream version
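The version argument accepts latest or a specific version number, and get_stream_versions returns the versions that exist (e.g., [1, 2, 4]). A minimal sketch of how such an argument could be resolved to one concrete version, assuming latest means the highest available version number; resolve_version is a hypothetical helper, not a CerebralCortex function:

```python
from typing import List, Union


def resolve_version(requested: Union[str, int], available: List[int]) -> int:
    """Map a version argument ('latest' or a number) to a concrete version.

    Hypothetical helper: assumes 'latest' means the highest version number.
    """
    if not available:
        raise ValueError("stream has no versions")
    if requested == "latest":
        return max(available)
    version = int(requested)
    if version not in available:
        raise ValueError("version %s not found; available: %s" % (version, available))
    return version
```

Resolving to a single version up front follows the note above: reading one version is cheaper than loading every version and filtering afterwards.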
-
save_stream(datastream, overwrite=False) → bool
Saves the raw data of a datastream in the selected NoSQL storage and its metadata in MySQL.
Parameters: - datastream (DataStream) – a DataStream object
- overwrite (bool) – if True, all existing data of the datastream is overwritten by the new data
Returns: True if the stream is successfully stored, otherwise throws an exception
Return type: bool
Todo
Add functionality to store data in influxdb.
Raises: Exception – logs or throws an exception if the stream is not stored
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)
-
cerebralcortex.core.data_manager.raw.tedt module
cerebralcortex.core.data_manager.raw.util module
-
class filesystem_helper
Bases: object
-
create_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)
Creates a directory if it does not exist.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
-
ls_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)
List the contents of a directory.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: list of file and/or dir names
Return type: list[str]
-
path_exist(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)
Checks if a path exists.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: True if the path exists, False otherwise
Return type: bool
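These helpers take a base dirpath plus optional stream_name, version, and user_id, which suggests one nested directory level per stream, version, and user. The exact on-disk layout is not documented here; the sketch below assumes a hypothetical stream=<name>/version=<v>/user=<id> layout, uses only the standard library, and introduces build_path as a hypothetical helper:

```python
import os
import tempfile
from typing import List, Optional


def build_path(dirpath: str, stream_name: Optional[str] = None,
               version: Optional[int] = None, user_id: Optional[str] = None) -> str:
    """Join the optional parts onto dirpath (hypothetical layout)."""
    parts = [dirpath]
    if stream_name:
        parts.append("stream=" + stream_name)
        if version is not None:
            parts.append("version=" + str(version))
            if user_id:
                parts.append("user=" + user_id)
    return os.path.join(*parts)


def create_dir(dirpath: str, stream_name: Optional[str] = None,
               version: Optional[int] = None, user_id: Optional[str] = None) -> None:
    # exist_ok=True: creating a directory that already exists is not an error.
    os.makedirs(build_path(dirpath, stream_name, version, user_id), exist_ok=True)


def ls_dir(dirpath: str, stream_name: Optional[str] = None,
           version: Optional[int] = None, user_id: Optional[str] = None) -> List[str]:
    # Names of files and/or directories directly under the resolved path.
    return sorted(os.listdir(build_path(dirpath, stream_name, version, user_id)))


def path_exist(dirpath: str, stream_name: Optional[str] = None,
               version: Optional[int] = None, user_id: Optional[str] = None) -> bool:
    return os.path.exists(build_path(dirpath, stream_name, version, user_id))


# Usage: build a stream/version/user tree in a throwaway directory.
with tempfile.TemporaryDirectory() as base:
    create_dir(base, "BATTERY--org.md2k.phonesensor--PHONE", 1, "user-1")
    print(path_exist(base, "BATTERY--org.md2k.phonesensor--PHONE", 1, "user-1"))  # True
    print(ls_dir(base, "BATTERY--org.md2k.phonesensor--PHONE"))  # ['version=1']
```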
-
-
class hdfs_helper
Bases: object
-
create_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)
Creates a directory if it does not exist.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
-
hdfs_conn = ''
-
ls_dir(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)
List the contents of a directory.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: list of file and/or dir names
Return type: list[str]
-
path_exist(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)
Checks if a path exists.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: True if the path exists, False otherwise
Return type: bool
-