CerebralCortex Core
Subpackages
- cerebralcortex.core package
- Subpackages
- cerebralcortex.core.config_manager package
- cerebralcortex.core.data_manager package
- Subpackages
- cerebralcortex.core.data_manager.object package
- cerebralcortex.core.data_manager.raw package
- Submodules
- cerebralcortex.core.data_manager.raw.data module
- cerebralcortex.core.data_manager.raw.storage_blueprint module
- cerebralcortex.core.data_manager.raw.storage_filesystem module
- cerebralcortex.core.data_manager.raw.storage_hdfs module
- cerebralcortex.core.data_manager.raw.stream_handler module
- Module contents
- cerebralcortex.core.data_manager.sql package
- Submodules
- cerebralcortex.core.data_manager.sql.cache_handler module
- cerebralcortex.core.data_manager.sql.data module
- cerebralcortex.core.data_manager.sql.data_ingestion_handler module
- cerebralcortex.core.data_manager.sql.kafka_offsets_handler module
- cerebralcortex.core.data_manager.sql.stream_handler module
- cerebralcortex.core.data_manager.sql.users_handler module
- Module contents
- cerebralcortex.core.data_manager.time_series package
- Module contents
- Subpackages
- cerebralcortex.core.datatypes package
- cerebralcortex.core.log_manager package
- cerebralcortex.core.messaging_manager package
- cerebralcortex.core.metadata_manager package
- cerebralcortex.core.util package
- Module contents
- Subpackages
Submodules
cerebralcortex.kernel module
class Kernel(configs_dir_path: str = None, auto_offset_reset: str = 'largest', enable_spark: bool = True, enable_spark_ui=False)
Bases: object

connect(username: str, password: str, encrypted_password: bool = False) → dict
Authenticate a user by username and password and return an auth token.
Parameters:
- username (str) – username of a user
- password (str) – password of a user
- encrypted_password (bool) – whether the password is already encrypted; mCerebrum sends encrypted passwords
Returns: {"status": bool, "auth_token": str, "msg": str}
Return type: dict
Raises: ValueError – User name and password cannot be empty/None.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> result = CC.connect("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True)
>>> result["status"]
True
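Because connect() returns a status flag alongside the token, a caller would typically check that flag before using the token. The sketch below relies only on the documented response shape {"status": bool, "auth_token": str, "msg": str}; auth_token_or_raise is a hypothetical helper, not part of the Kernel API.

```python
from typing import Any, Mapping


def auth_token_or_raise(response: Mapping[str, Any]) -> str:
    """Return the auth token from a connect()-style response dict.

    Hypothetical helper: assumes the documented shape
    {"status": bool, "auth_token": str, "msg": str}.
    """
    if not response.get("status"):
        # Surface the server-provided message when authentication fails.
        raise PermissionError(response.get("msg") or "authentication failed")
    return response["auth_token"]


# Usage with a stand-in response (not real server output):
token = auth_token_or_raise({"status": True, "auth_token": "abc123", "msg": "ok"})
```

Raising on a false status keeps a failed login from silently propagating an empty token into later calls.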

create_bucket(bucket_name: str) → bool
Create a bucket (aka folder) in the object storage system.
Parameters: bucket_name (str) – name of the bucket
Returns: True if the bucket was successfully created. On failure, returns an error dict {"error": "error-message"}.
Return type: bool
Raises: ValueError – Bucket name cannot be empty/None.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.create_bucket("live_data_folder")
True
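create_bucket() is documented to return True on success but an {"error": ...} dict on failure, and a non-empty dict is truthy in Python, so a bare `if CC.create_bucket(...):` would treat an error as success. The same True-or-error-dict convention is documented for is_bucket, is_object and upload_object. A minimal sketch of an explicit check, using hypothetical helper names:

```python
from typing import Any, Optional


def storage_call_succeeded(result: Any) -> bool:
    """True only for a literal True; an {"error": ...} dict is truthy but is a failure."""
    return result is True


def storage_error_message(result: Any) -> Optional[str]:
    """Return the error text from an {"error": str} result, else None."""
    if isinstance(result, dict):
        return result.get("error")
    return None


failure = {"error": "bucket already exists"}  # illustrative value only
```

Comparing with `is True` is deliberate: `== True` would also accept the integer 1.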

create_user(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict) → bool
Create a user in SQL storage if the user does not already exist.
Parameters:
- username (str) – only alphanumeric usernames are allowed, with a maximum length of 25 characters
- user_password (str) – no size limit on the password
- user_role (str) – role of a user
- user_metadata (dict) – metadata of a user
- user_settings (dict) – user settings (mCerebrum configurations of a user)
Returns: True if the user is successfully registered; raises an error otherwise.
Return type: bool
Raises:
- ValueError – if the selected username is not available
- Exception – if the SQL query fails
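The documented username rule (alphanumeric only, at most 25 characters) can be checked client-side before calling create_user(). This sketch applies the rule literally with ASCII letters and digits; is_valid_username is hypothetical, and since the examples on this page use "nasir_ali" (which contains an underscore), the server's actual check may be more permissive.

```python
import re

# Literal reading of the documented rule: ASCII letters/digits, 1 to 25 characters.
_USERNAME_RE = re.compile(r"[A-Za-z0-9]{1,25}")


def is_valid_username(username: str) -> bool:
    """Hypothetical pre-check mirroring the documented create_user() username rule."""
    return _USERNAME_RE.fullmatch(username) is not None
```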

delete_user(username: str) → bool
Delete a user record from the SQL table.
Parameters: username (str) – username of the user to be deleted
Returns: True if the user is successfully removed
Return type: bool
Raises:
- ValueError – if the username param is empty or None
- Exception – if the SQL query fails

encrypt_user_password(user_password: str) → str
Encrypt a password.
Parameters: user_password (str) – unencrypted password
Returns: encrypted password
Return type: str
Raises: ValueError – password cannot be None or empty.

gen_random_pass(string_type: str = 'varchar', size: int = 8) → str
Generate a random password.
Parameters:
- string_type (str) – accepted values are "varchar" and "char" (default="varchar")
- size (int) – password length (default=8)
Returns: random password
Return type: str
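A sketch of a password generator with the same parameters, using Python's secrets module for cryptographically strong randomness. This page does not define what "varchar" and "char" select, so the character sets below are assumptions, and gen_random_pass_sketch is not the library's implementation.

```python
import secrets
import string

# Assumed character sets: the documentation does not define "varchar" or "char".
_CHARSETS = {
    "varchar": string.ascii_letters + string.digits,
    "char": string.ascii_letters,
}


def gen_random_pass_sketch(string_type: str = "varchar", size: int = 8) -> str:
    """Hypothetical generator: draw `size` characters with secrets.choice."""
    try:
        alphabet = _CHARSETS[string_type]
    except KeyError:
        raise ValueError(f"unsupported string_type: {string_type!r}") from None
    if size < 1:
        raise ValueError("size must be positive")
    return "".join(secrets.choice(alphabet) for _ in range(size))
```

secrets is preferred over random here because random is not designed for security-sensitive values.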

get_all_users(study_name: str) → List[dict]
Get a list of all users who are part of a study.
Parameters: study_name (str) – name of a study
Returns: a list of {user_id: user_name} dicts; an empty list if no user is associated with study_name and/or study_name does not exist.
Return type: list[dict]
Raises: ValueError – Study name is a required field.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_all_users("mperf")
[{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}]  # [{user_id: user_name}]
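get_all_users() returns one single-entry dict per user, keyed by user_id. When the result is used for lookups by name, it can be flattened into a username-to-id mapping; users_by_name is a hypothetical helper that relies only on the documented [{user_id: user_name}] shape.

```python
from typing import Dict, List


def users_by_name(users: List[Dict[str, str]]) -> Dict[str, str]:
    """Build a username -> user_id map from get_all_users()-style output."""
    mapping: Dict[str, str] = {}
    for entry in users:
        # Each entry is documented as {user_id: user_name}.
        for user_id, user_name in entry.items():
            mapping[user_name] = user_id
    return mapping
```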

get_bucket_objects(bucket_name: str) → dict
Return a list of all objects stored in the specified Minio bucket.
Parameters: bucket_name (str) – name of the bucket aka folder
Returns: {bucket-objects: [{"object_name": "", "metadata": {}}, ...]}; in case of an error, {"error": str}
Return type: dict

get_buckets() → dict
Return all available buckets in the object storage.
Returns: {bucket-name: str, [{"key": "value"}]}; in case of an error, {"error": str}
Return type: dict

get_cache_value(key: str) → str
Retrieve the value stored in the cache for the given key.
Parameters: key (str) – key in the cache
Returns: the value in the cache
Return type: str
Raises: ValueError – if key is None or empty

get_kafka_offsets(topic: str) → dict
Get the last stored Kafka offsets for a topic.
Parameters: topic (str) – Kafka topic name
Returns: list of Kafka offsets; an empty list if the topic does not exist and/or no offset is stored for it.
Return type: list[dict]
Raises: ValueError – Topic name cannot be empty/None.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_kafka_offsets("live-data")
[{"id": ..., "topic": ..., "topic_partition": ..., "offset_start": ..., "offset_until": ..., "offset_update_time": ...}]
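Stored offsets let a consumer resume where it stopped. The sketch below reduces get_kafka_offsets()-style records to the next offset to read per partition. It assumes offset_until holds the last processed offset as a numeric string (as the store_or_update_Kafka_offset parameters describe), so reading resumes at offset_until + 1; resume_offsets is a hypothetical helper.

```python
from typing import Dict, Iterable, Mapping


def resume_offsets(records: Iterable[Mapping[str, str]]) -> Dict[str, int]:
    """Map each topic_partition to the next offset to consume.

    Assumes offset_until is the last processed offset, stored as a numeric string.
    """
    resume: Dict[str, int] = {}
    for record in records:
        partition = record["topic_partition"]
        next_offset = int(record["offset_until"]) + 1
        # Keep the furthest position seen for each partition.
        if next_offset > resume.get(partition, -1):
            resume[partition] = next_offset
    return resume
```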

get_object(bucket_name: str, object_name: str) → dict
Return a stored object (HttpResponse).
Parameters:
- bucket_name (str) – name of a bucket aka folder
- object_name (str) – name of the object to be downloaded
Returns: the requested object. If the file does not exist, returns the error {"error": "File does not exist."}
Return type: file-object
Raises:
- ValueError – Missing bucket_name and object_name params.
- Exception – {"error": "error-message"}

get_object_stats(bucket_name: str, object_name: str) → dict
Return the properties (e.g., object type, last modified time) of an object stored in the specified bucket.
Parameters:
- bucket_name (str) – name of a bucket aka folder
- object_name (str) – name of an object
Returns: information about the object (e.g., creation_date, object_size); in case of an error, {"error": str}
Return type: dict
Raises:
- ValueError – Missing bucket_name and object_name params.
- Exception – {"error": "error-message"}

get_stream(stream_name: str, version: str = 'all', data_type=DataSet.COMPLETE) → cerebralcortex.core.datatypes.datastream.DataStream
Retrieve a data stream with its metadata.
Parameters:
- stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are "all", "latest", or a specific version of a stream (e.g., 2.0) (default="all")
- data_type (DataSet) – DataSet.COMPLETE returns both data and metadata, DataSet.ONLY_DATA returns only data, and DataSet.ONLY_METADATA returns only the metadata of a stream (default=DataSet.COMPLETE)
Returns: a DataStream containing data and/or metadata
Return type: DataStream
Raises: ValueError – if stream name is empty or None
Note
Specify a version if you know the exact version of a stream; fetching all versions of the data and then filtering by version is inefficient.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data  # an object of a dataframe
>>> ds.metadata  # an object of MetaData class
>>> ds.get_metadata(version=1)  # get the specific version metadata of a stream
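The version argument accepts "all", "latest", or a specific version. The sketch below shows one way to resolve that argument against a list of available versions such as the [1, 2, 4] returned by get_stream_versions(); resolve_versions is hypothetical, and treating "2.0" and "2" as the same version is an assumption made to reconcile the string examples with the integer version list.

```python
from typing import List, Sequence


def resolve_versions(version: str, available: Sequence[int]) -> List[int]:
    """Resolve "all", "latest", or a specific version against available versions."""
    if not available:
        return []
    if version == "all":
        return sorted(available)
    if version == "latest":
        return [max(available)]
    # Assumption: "2" and "2.0" both name integer version 2.
    requested = float(version)
    matches = [v for v in available if v == requested]
    if not matches:
        raise ValueError(f"version {version!r} not in {sorted(available)}")
    return matches
```

Resolving a concrete version first and passing it to get_stream() follows the note above about not fetching all versions and filtering afterwards.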

get_stream_info_by_hash(metadata_hash: uuid) → str
A metadata_hash is unique to each stream version. This reverse lookup returns the stream metadata and other information associated with a metadata_hash.
Parameters: metadata_hash (uuid) – either a uuid object or the string form of a uuid
Returns: stream metadata and other information related to a stream
Return type: dict
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_info_by_hash("00ab666c-afb8-476e-9872-6472b4e66b68")
{"name": .....}  # stream metadata and other information

get_stream_metadata(stream_name: str, version: str = 'all') → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata]
Get a list of metadata for all versions available for a stream.
Parameters:
- stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are "all", "latest", or a specific version of a stream (e.g., 2.0) (default="all")
Returns: a list of metadata, or an empty list if no metadata is available for stream_name
Return type: list[Metadata]
Raises: ValueError – stream_name cannot be None or empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_metadata("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[Metadata]  # list of MetaData class objects

get_stream_metadata_hash(stream_name: str) → list
Get all the metadata_hash values associated with a stream name.
Parameters: stream_name (str) – name of a stream
Returns: list of all the metadata hashes
Return type: list[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]

get_stream_name(metadata_hash: uuid) → str
A metadata_hash is unique to each stream version. This reverse lookup returns the stream name associated with a metadata_hash.
Parameters: metadata_hash (uuid) – either a uuid object or the string form of a uuid
Returns: name of a stream
Return type: str
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
'ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST'
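Both get_stream_name() and get_stream_info_by_hash() accept a metadata_hash as either a uuid object or its string form. A caller that wants consistent keys (for example, when caching lookups) can normalize both forms with the standard uuid module; normalize_metadata_hash is a hypothetical helper.

```python
import uuid
from typing import Union


def normalize_metadata_hash(metadata_hash: Union[uuid.UUID, str]) -> str:
    """Return the canonical lowercase, hyphenated string form of a uuid."""
    if isinstance(metadata_hash, uuid.UUID):
        return str(metadata_hash)
    # uuid.UUID() validates the string and raises ValueError if it is malformed.
    return str(uuid.UUID(str(metadata_hash)))
```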

get_stream_versions(stream_name: str) → list
Return a list of versions available for a stream.
Parameters: stream_name (str) – name of a stream
Returns: list of int
Return type: list
Raises: ValueError – if stream_name is empty or None
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[1, 2, 4]

get_user_id(user_name: str) → str
Get the user id linked to user_name.
Parameters: user_name (str) – username of a user
Returns: user id associated with user_name
Return type: str
Raises: ValueError – User name is a required field.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_id("nasir_ali")
'76cc444c-4fb8-776e-2872-9472b4e66b16'

get_user_metadata(user_id: str = None, username: str = None) → dict
Get user metadata by user_id or by username.
Parameters:
- user_id (str) – id (uuid) of a user
- username (str) – username of a user
Returns: user metadata
Return type: dict
Todo
Return list of User class object
Raises: ValueError – User ID/name cannot be empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_metadata(username="nasir_ali")
{"study_name": "mperf", ...}

get_user_name(user_id: str) → str
Get the user name linked to a user id.
Parameters: user_id (str) – id (uuid) of a user
Returns: user name associated with user_id
Return type: str
Raises: ValueError – User ID is a required field.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16")
'nasir_ali'

get_user_settings(username: str = None, auth_token: str = None) → dict
Get user settings by auth token or by username. These are the user's mCerebrum settings.
Parameters:
- username (str) – username of a user
- auth_token (str) – auth token
Returns: list of dictionaries of user settings
Return type: list[dict]
Todo
Return list of User class object
Raises: ValueError – User ID/name cannot be empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_settings(username="nasir_ali")
[{"mcerebrum": "some-conf", ...}]

is_auth_token_valid(username: str, auth_token: str, checktime: bool = False) → bool
Validate whether a token is valid or expired, based on the token expiry datetime stored in SQL.
Parameters:
- username (str) – username of a user
- auth_token (str) – token generated by the API server
- checktime (bool) – if False, only check whether the token exists in the system; if True, also check whether the token has expired based on its expiry date
Returns: True if the token is valid, False otherwise
Return type: bool
Raises: ValueError – Auth token and auth-token expiry time cannot be null/empty.
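The checktime flag separates two checks: whether the presented token matches the stored one, and whether it has passed its stored expiry time. A minimal sketch of that logic, assuming the stored token and expiry datetime have already been loaded; token_is_valid is hypothetical, and the constant-time comparison via hmac.compare_digest is a choice made here, not something this page documents.

```python
import hmac
from datetime import datetime
from typing import Optional


def token_is_valid(stored_token: Optional[str], presented_token: str,
                   expiry: Optional[datetime], checktime: bool = False,
                   now: Optional[datetime] = None) -> bool:
    """Hypothetical mirror of the documented checktime behaviour."""
    if not stored_token or not presented_token:
        return False
    # Constant-time comparison avoids leaking how many leading characters matched.
    if not hmac.compare_digest(stored_token.encode(), presented_token.encode()):
        return False
    if not checktime:
        return True  # match check only, expiry ignored
    if expiry is None:
        return False
    current = now if now is not None else datetime.now()
    return current < expiry
```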

is_bucket(bucket_name: str) → bool
Check whether a bucket exists.
Parameters: bucket_name (str) – name of the bucket aka folder
Returns: True if the bucket exists, False otherwise; in case of an error, {"error": str}
Return type: bool
Raises: ValueError – bucket_name cannot be None or empty.

is_object(bucket_name: str, object_name: str) → bool
Check whether an object exists in a bucket.
Parameters:
- bucket_name (str) – name of the bucket aka folder
- object_name (str) – name of the object
Returns: True if the object exists, False otherwise; in case of an error, {"error": str}
Return type: bool
Raises: Exception – if bucket_name and/or object_name is empty or None

is_stream(stream_name: str) → bool
Return True if the provided stream exists.
Parameters: stream_name (str) – name of a stream
Returns: True if stream_name exists, False otherwise
Return type: bool
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
True

is_user(user_id: str = None, user_name: str = None) → bool
Check whether a user exists in the system. Either parameter can be set to perform the check.
Parameters:
- user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: True if the user exists in the system, False otherwise
Return type: bool
Raises: ValueError – user_id and user_name cannot both be None or empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16")
True

kafka_produce_message(topic: str, msg: dict)
Publish a message on the Kafka message queue.
Parameters:
- topic (str) – name of the Kafka topic
- msg (dict) – message to be published on Kafka
Returns: True if successful; in case of failure, an Exception message
Return type: bool
Raises:
- ValueError – topic and message parameters cannot be empty or None.
- Exception – Error publishing message. Topic: topic_name - error-message

kafka_subscribe_to_topic(topic: str)
Subscribe to a Kafka topic as a consumer.
Parameters: topic (str) – name of the Kafka topic
Yields: dict – Kafka message
Raises: ValueError – Topic parameter is missing.
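kafka_subscribe_to_topic() is documented as a generator that yields one dict per Kafka message, and a consumer generator may never be exhausted. The standard itertools.islice is one way to take a bounded number of messages, for example in a test or a batch job; take_messages and the stand-in generator are hypothetical.

```python
from itertools import count, islice
from typing import Dict, Iterator, List


def take_messages(messages: Iterator[Dict], limit: int) -> List[Dict]:
    """Consume at most `limit` messages from a (possibly endless) generator."""
    return list(islice(messages, limit))


def fake_subscription() -> Iterator[Dict]:
    """Stand-in for kafka_subscribe_to_topic(): yields dicts forever."""
    for offset in count():
        yield {"offset": offset, "value": f"msg-{offset}"}
```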

list_streams() → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata]
Get all the available stream names with their metadata.
Returns: list of metadata of the available streams
Return type: List[Metadata]
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.list_streams()

save_data_to_influxdb(datastream: cerebralcortex.core.datatypes.datastream.DataStream)
Save a data stream to InfluxDB, for visualization purposes only.
Parameters: datastream (DataStream) – a DataStream object
Returns: True if the data is ingested successfully, False otherwise
Return type: bool
Todo
This needs to be updated with the new structure. Should metadata be stored or not?
Example
>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_data_to_influxdb(ds)

save_stream(datastream: cerebralcortex.core.datatypes.datastream.DataStream, ingestInfluxDB: bool = False) → bool
Save the raw data of a datastream in the selected NoSQL storage and its metadata in MySQL.
Parameters:
- datastream (DataStream) – a DataStream object
- ingestInfluxDB (bool) – if True, also ingest the raw data into InfluxDB so it can be visualized in Grafana
Returns: True if the stream is successfully stored; raises an exception otherwise
Return type: bool
Raises: Exception – logs or raises an exception if the stream is not stored
Todo
Add functionality to store data in influxdb.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)

search_stream(stream_name)
Find all stream names similar to the stream_name argument. For example, passing "location" returns all stream names that contain the word location.
Parameters: stream_name (str) – partial stream name to search for
Returns: list of stream names similar to the stream_name argument
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
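The example above matches "battery" against "BATTERY--..." names, so the search appears to be a case-insensitive substring match. A local sketch of that behaviour over a list of names, for instance names collected from list_streams(); search_stream_names is hypothetical and the real query may differ.

```python
from typing import Iterable, List


def search_stream_names(names: Iterable[str], query: str) -> List[str]:
    """Return names containing `query`, ignoring case (assumed matching rule)."""
    needle = query.lower()
    return [name for name in names if needle in name.lower()]
```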

set_cache_value(key: str, value: str) → bool
Create a new entry in the cache. Values of existing keys are overwritten.
Parameters:
- key (str) – key in the cache
- value (str) – value associated with the key
Returns: True on a successful insert, False otherwise
Return type: bool
Raises: ValueError – if key is None or empty
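set_cache_value() and get_cache_value() follow plain key-value semantics: empty keys are rejected and an existing key is overwritten. The in-memory class below mirrors that contract for unit tests that should not depend on the real cache backend; InMemoryCache is hypothetical, and raising KeyError for a missing key is an assumption because this page does not say what get_cache_value() does in that case.

```python
from typing import Dict


class InMemoryCache:
    """Hypothetical test double for the Kernel cache methods."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    def set_cache_value(self, key: str, value: str) -> bool:
        if not key:
            raise ValueError("key cannot be None or empty")
        self._store[key] = value  # existing keys are overwritten
        return True

    def get_cache_value(self, key: str) -> str:
        if not key:
            raise ValueError("key cannot be None or empty")
        return self._store[key]  # assumption: KeyError for a missing key
```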

store_or_update_Kafka_offset(topic: str, topic_partition: str, offset_start: str, offset_until: str) → bool
Store or update Kafka topic offsets. Offsets are used to track which messages have been processed.
Parameters:
- topic (str) – name of the Kafka topic
- topic_partition (str) – partition number
- offset_start (str) – starting offset
- offset_until (str) – last processed offset
Returns: True if the offsets are added/updated; raises an exception otherwise
Return type: bool
Raises:
- ValueError – All params are required.
- Exception – Cannot add/update kafka offsets because ERROR-MESSAGE

update_auth_token(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool
Update a user's auth token in the SQL database to keep the user logged in. The auth token's validity duration can be changed in the configuration files.
Notes
This method is used by the API server to store a newly created auth token.
Parameters:
- username (str) – username of a user
- auth_token (str) – newly issued auth token
- auth_token_issued_time (datetime) – datetime when the auth token was issued
- auth_token_expiry_time (datetime) – datetime when the token expires
Returns: True if the new auth token is set, False otherwise
Return type: bool
Raises: ValueError – Auth token and auth-token issue/expiry time cannot be None/empty.
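update_auth_token() stores a token together with its issue and expiry times, where the validity period comes from the configuration files. A minimal sketch of producing those three values before the call; issue_token, the token format from secrets.token_urlsafe, and the example duration are assumptions, not the API server's actual scheme.

```python
import secrets
from datetime import datetime, timedelta
from typing import NamedTuple, Optional


class IssuedToken(NamedTuple):
    auth_token: str
    auth_token_issued_time: datetime
    auth_token_expiry_time: datetime


def issue_token(valid_for: timedelta, now: Optional[datetime] = None) -> IssuedToken:
    """Create a random token plus issue/expiry times (hypothetical helper)."""
    issued = now if now is not None else datetime.now()
    return IssuedToken(secrets.token_urlsafe(32), issued, issued + valid_for)


# The fields line up with update_auth_token(username, auth_token,
# auth_token_issued_time, auth_token_expiry_time).
tok = issue_token(timedelta(hours=24), now=datetime(2024, 1, 1, 12, 0))
```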

upload_object(bucket_name: str, object_name: str, object_filepath: str) → bool
Upload an object to a bucket (aka folder) of the object storage system.
Parameters:
- bucket_name (str) – name of the bucket
- object_name (str) – name of the object to be uploaded
- object_filepath (str) – full path of the file, including the file name (e.g., /home/nasir/obj.zip)
Returns: True if the object is successfully uploaded. On failure, returns an error dict {"error": "error-message"}
Return type: bool
Raises: ValueError – Bucket name cannot be empty/None.
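Module contents
The cerebralcortex package exposes the same Kernel class that is documented under cerebralcortex.kernel module above.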
-
class
Kernel
(configs_dir_path: str = None, auto_offset_reset: str = 'largest', enable_spark: bool = True, enable_spark_ui=False)[source]¶ Bases:
object
-
connect
(username: str, password: str, encrypted_password: bool = False) → dict[source]¶ Authenticate a user based on username and password and return an auth token
Parameters: - username (str) – username of a user
- password (str) – password of a user
- encrypted_password (str) – is password encrypted or not. mCerebrum sends encrypted passwords
Raises: ValueError
– User name and password cannot be empty/None.Returns: return eturn {“status”:bool, “auth_token”: str, “msg”: str}
Return type: dict
Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.connect("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True) >>> True
-
create_bucket
(bucket_name: str) → bool[source]¶ creates a bucket aka folder in object storage system.
Parameters: bucket_name (str) – name of the bucket Returns: True if bucket was successfully created. On failure, returns an error with dict {“error”:”error-message”} Return type: bool Raises: ValueError
– Bucket name cannot be empty/None.Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.create_bucket("live_data_folder") >>> True
-
create_user
(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict) → bool[source]¶ Create a user in SQL storage if it doesn’t exist
Parameters: - username (str) – Only alphanumeric usernames are allowed with the max length of 25 chars.
- user_password (str) – no size limit on password
- user_role (str) – role of a user
- user_metadata (dict) – metadata of a user
- user_settings (dict) – user settings, mCerebrum configurations of a user
Returns: True if user is successfully registered or throws any error in case of failure
Return type: bool
Raises: ValueError
– if selected username is not availableException
– if sql query fails
-
delete_user
(username: str) → bool[source]¶ Delete a user record in SQL table
Parameters: username – username of a user that needs to be deleted
Returns: if user is successfully removed
Return type: bool
Raises: ValueError
– if username param is empty or NoneException
– if sql query fails
-
encrypt_user_password
(user_password: str) → str[source]¶ Encrypt password
Parameters: user_password (str) – unencrypted password Raises: ValueError
– password cannot be None or empty.Returns: encrypted password Return type: str
-
gen_random_pass
(string_type: str = 'varchar', size: int = 8) → str[source]¶ Generate a random password
Parameters: - string_type – Accepted parameters are “varchar” and “char”. (Default=”varchar”)
- size – password length (default=8)
Returns: random password
Return type: str
-
get_all_users
(study_name: str) → List[dict][source]¶ Get a list of all users part of a study.
Parameters: study_name (str) – name of a study Raises: ValueError
– Study name is a requied field.Returns: Returns empty list if there is no user associated to the study_name and/or study_name does not exist. Return type: list[dict] Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_all_users("mperf") >>> [{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}] # [{user_id, user_name}]
-
get_bucket_objects
(bucket_name: str) → dict[source]¶ returns a list of all objects stored in the specified Minio bucket
Parameters: bucket_name (str) – name of the bucket aka folder Returns: {bucket-objects: [{“object_name”:”“, “metadata”: {}}…], in case of an error {“error”: str} Return type: dict
-
get_buckets
() → dict[source]¶ returns all available buckets in an object storage
Returns: {bucket-name: str, [{“key”:”value”}]}, in case of an error {“error”: str} Return type: dict
-
get_cache_value
(key: str) → str[source]¶ Retrieves value from the cache for the given key.
Parameters: key – key in the cache Returns: The value in the cache Return type: str Raises: ValueError
– if key is None or empty
-
get_kafka_offsets
(topic: str) → dict[source]¶ Get last stored kafka offsets
Parameters: topic (str) – kafka topic name Returns: list of kafka offsets. This method will return empty list if topic does not exist and/or no offset is stored for the topic. Return type: list[dict] Raises: ValueError
– Topic name cannot be empty/NoneExamples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_kafka_offsets("live-data") >>> [{"id","topic", "topic_partition", "offset_start", "offset_until", "offset_update_time"}]
-
get_object
(bucket_name: str, object_name: str) → dict[source]¶ Returns stored object (HttpResponse)
Parameters: - bucket_name (str) – name of a bucket aka folder
- object_name (str) – name of an object that needs to be downloaded
Returns: object that needs to be downloaded. If file does not exists then it returns an error {“error”: “File does not exist.”}
Return type: file-object
Raises: ValueError
– Missing bucket_name and object_name params.Exception
– {“error”: “error-message”}
-
get_object_stats
(bucket_name: str, object_name: str) → dict[source]¶ Returns properties (e.g., object type, last modified etc.) of an object stored in a specified bucket
Parameters: - bucket_name (str) – name of a bucket aka folder
- object_name (str) – name of an object
Returns: information of an object (e.g., creation_date, object_size etc.). In case of an error {“error”: str}
Return type: dict
Raises: ValueError
– Missing bucket_name and object_name params.Exception
– {“error”: “error-message”}
-
get_stream
(stream_name: str, version: str = 'all', data_type=<DataSet.COMPLETE: (1, )>) → cerebralcortex.core.datatypes.datastream.DataStream[source]¶ Retrieve a data-stream with it’s metadata.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
- data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns: contains Data and/or metadata
Return type: Raises: ValueError
– if stream name is empty or NoneNote
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.
Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> ds.data # an object of a dataframe >>> ds.metadata # an object of MetaData class >>> ds.get_metadata(version=1) # get the specific version metadata of a stream
-
get_stream_info_by_hash
(metadata_hash: <module 'uuid' from '/home/docs/.pyenv/versions/3.6.8/lib/python3.6/uuid.py'>) → str[source]¶ metadata_hash are unique to each stream version. This reverse look can return the stream name of a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid. Returns: stream metadata and other info related to a stream Return type: dict Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> {"name": .....} # stream metadata and other information
-
get_stream_metadata
(stream_name: str, version: str = 'all') → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]¶ Get a list of metadata for all versions available for a stream.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
Returns: Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise.
Return type: list[Metadata]
Raises: ValueError
– stream_name cannot be None or empty.Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_all_users("mperf") >>> [Metadata] # list of MetaData class objects
-
get_stream_metadata_hash
(stream_name: str) → list[source]¶ Get all the metadata_hash associated with a stream name.
Parameters: stream_name (str) – name of a stream Returns: list of all the metadata hashes Return type: list[str] Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> ["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
-
get_stream_name
(metadata_hash: <module 'uuid' from '/home/docs/.pyenv/versions/3.6.8/lib/python3.6/uuid.py'>) → str[source]¶ metadata_hash are unique to each stream version. This reverse look can return the stream name of a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid. Returns: name of a stream Return type: str Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST
-
get_stream_versions
(stream_name: str) → list[source]¶ Returns a list of versions available for a stream
Parameters: stream_name (str) – name of a stream Returns: list of int Return type: list Raises: ValueError
– if stream_name is empty or NoneExamples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [1, 2, 4]
-
get_user_id
(user_name: str) → str[source]¶ Get the user id linked to user_name.
Parameters: user_name (str) – username of a user Returns: user id associated to user_name Return type: str Raises: ValueError
– User name is a required field.Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_user_id("nasir_ali") >>> '76cc444c-4fb8-776e-2872-9472b4e66b16'
-
get_user_metadata
(user_id: str = None, username: str = None) → dict[source]¶ Get user metadata by user_id or by username
Parameters: - user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: user metadata
Return type: dict
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_user_metadata(username="nasir_ali") >>> {"study_name":"mperf"........}
-
get_user_name
(user_id: str) → str[source]¶ Get the user name linked to a user id.
Parameters: user_name (str) – username of a user Returns: user_id associated to username Return type: bool Raises: ValueError
– User ID is a required field.Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16") >>> 'nasir_ali'
-
get_user_settings
(username: str = None, auth_token: str = None) → dict[source]¶ Get user settings by auth-token or by username. These are user’s mCerebrum settings
Parameters: - username (str) – username of a user
- auth_token (str) – auth-token
Returns: List of dictionaries of user metadata
Return type: list[dict]
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.get_user_settings(username="nasir_ali") >>> [{"mcerebrum":"some-conf"........}]
-
is_auth_token_valid
(username: str, auth_token: str, checktime: bool = False) → bool[source]¶ Validate whether a token is valid or expired based on the token expiry datetime stored in SQL
Parameters: - username (str) – username of a user
- auth_token (str) – token generated by API-Server
- checktime (bool) – setting this to False will only check if the token is available in system. Setting this to true will check if the token is expired based on the token expiry date.
Raises: ValueError
– Auth token and auth-token expiry time cannot be null/empty.Returns: returns True if token is valid or False otherwise.
Return type: bool
-
is_bucket
(bucket_name: str) → bool[source]¶ checks whether a bucket exist
Parameters: bucket_name (str) – name of the bucket aka folder Returns: True if bucket exist or False otherwise. In case an error {“error”: str} Return type: bool Raises: ValueError
– bucket_name cannot be None or empty.
-
is_object(bucket_name: str, object_name: str) → bool
Checks whether an object exists in a bucket.
Parameters: - bucket_name (str) – name of the bucket (aka folder)
- object_name (str) – name of the object
Returns: True if the object exists or False otherwise. In case of an error, returns {“error”: str}.
Return type: bool
Raises: Exception – if bucket_name and object_name are empty or None
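Because is_bucket and is_object are documented to return either a bool or an {"error": str} dict, a plain `if CC.is_object(...)` check would treat an error as success, since a non-empty dict is truthy in Python. The sketch below shows a hypothetical helper (exists_or_raise, not part of CerebralCortex) that separates the two cases; it uses simulated return values rather than a live object store.

```python
from typing import Dict, Union

# Documented return shape of is_bucket/is_object: a bool, or {"error": str} on failure
ExistsResult = Union[bool, Dict[str, str]]


def exists_or_raise(result: ExistsResult) -> bool:
    """Hypothetical helper: convert the mixed return value into a plain bool."""
    if isinstance(result, dict):
        # A non-empty dict is truthy, so it must be checked before bool() is applied.
        raise RuntimeError(result.get("error", "unknown object storage error"))
    return bool(result)


print(exists_or_raise(True))   # True
print(exists_or_raise(False))  # False
```

With a real Kernel instance the argument would be the value returned by CC.is_bucket(...) or CC.is_object(...).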
-
is_stream(stream_name: str) → bool
Returns True if the provided stream exists.
Parameters: stream_name (str) – name of a stream
Returns: True if stream_name exists, False otherwise
Return type: bool
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
True
-
is_user(user_id: str = None, user_name: str = None) → bool
Checks whether a user exists in the system. Either parameter can be set to verify whether the user exists.
Parameters: - user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: True if a user exists in the system or False otherwise.
Return type: bool
Raises: ValueError – user_id and user_name cannot both be None or empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16")
True
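A minimal sketch of the "either parameter" lookup follows. It assumes a hypothetical list of (user_id, user_name) pairs in place of the SQL users table, and it assumes that a match on either supplied identifier is enough; the documentation does not say what happens when both are given.

```python
from typing import Iterable, Optional, Tuple

# Hypothetical user records as (user_id, user_name) pairs; the Kernel queries SQL instead.
USERS = [("76cc444c-4fb8-776e-2872-9472b4e66b16", "nasir_ali")]


def user_exists(users: Iterable[Tuple[str, str]], user_id: Optional[str] = None,
                user_name: Optional[str] = None) -> bool:
    """Return True if any record matches the supplied user_id or user_name."""
    if not user_id and not user_name:
        raise ValueError("user_id and user_name cannot both be None or empty.")
    for uid, uname in users:
        # Assumption: a match on whichever identifier was supplied is sufficient.
        if (user_id and uid == user_id) or (user_name and uname == user_name):
            return True
    return False


print(user_exists(USERS, user_name="nasir_ali"))  # True
```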
-
kafka_produce_message(topic: str, msg: dict)
Publish a message on the Kafka message queue.
Parameters: - topic (str) – name of the Kafka topic
- msg (dict) – message that needs to be published on Kafka
Returns: True if successful. In case of failure, it returns an Exception message.
Return type: bool
Raises: ValueError – topic and message parameters cannot be empty or None.
Exception – Error publishing message. Topic: topic_name - error-message
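The validate, encode, and publish flow described above can be sketched as follows, assuming messages are JSON-encoded as UTF-8 bytes (the encoding CerebralCortex actually uses is not stated here). The send callable is a hypothetical stand-in for a Kafka producer, so the sketch runs without a broker.

```python
import json
from typing import Callable

# Hypothetical transport: any callable taking (topic, payload bytes), e.g. a wrapper
# around a Kafka client's producer. Not part of the CerebralCortex API.
SendFn = Callable[[str, bytes], None]


def publish_dict(send: SendFn, topic: str, msg: dict) -> bool:
    """Validate inputs, encode msg as UTF-8 JSON, and hand it to the transport."""
    if not topic or not msg:
        raise ValueError("topic and message parameters cannot be empty or None.")
    payload = json.dumps(msg).encode("utf-8")
    try:
        send(topic, payload)
    except Exception as e:
        raise Exception("Error publishing message. Topic: %s - %s" % (topic, e)) from e
    return True


sent = []
publish_dict(lambda t, p: sent.append((t, p)), "hypothetical-topic", {"user": "nasir_ali"})
print(sent)  # [('hypothetical-topic', b'{"user": "nasir_ali"}')]
```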
-
kafka_subscribe_to_topic(topic: str)
Subscribe to a Kafka topic as a consumer.
Parameters: topic (str) – name of the Kafka topic
Yields: dict – Kafka message
Raises: ValueError – Topic parameter is missing.
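Since the method yields one dict per message, consuming it looks like iterating over a generator. The sketch below assumes JSON-encoded message values and uses a plain list of bytes as a hypothetical stand-in for a live consumer. If kafka_subscribe_to_topic is itself a generator function, as the Yields field suggests, its ValueError would surface on the first iteration rather than at call time, which the sketch also demonstrates.

```python
import json
from typing import Iterable, Iterator


def subscribe_json(topic: str, raw_values: Iterable[bytes]) -> Iterator[dict]:
    """Yield each raw message value decoded from UTF-8 JSON into a dict."""
    if not topic:
        # In a generator function this check runs on the first next(), not at call time.
        raise ValueError("Topic parameter is missing.")
    for value in raw_values:
        yield json.loads(value.decode("utf-8"))


for message in subscribe_json("hypothetical-topic", [b'{"a": 1}', b'{"b": 2}']):
    print(message)  # {'a': 1}, then {'b': 2}
```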
-
list_streams() → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata]
Get all the available stream names with metadata.
Returns: list of available streams metadata
Return type: List[Metadata]
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.list_streams()
-
save_data_to_influxdb(datastream: cerebralcortex.core.datatypes.datastream.DataStream)
Save a data stream to InfluxDB, for visualization purposes only.
Parameters: datastream (DataStream) – a DataStream object
Returns: True if data is ingested successfully or False otherwise
Return type: bool
Todo
This needs to be updated with the new structure. Should metadata be stored or not?
Example
>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_data_to_influxdb(ds)
-
save_stream(datastream: cerebralcortex.core.datatypes.datastream.DataStream, ingestInfluxDB: bool = False) → bool
Saves the datastream's raw data in the selected NoSQL storage and its metadata in MySQL.
Parameters: - datastream (DataStream) – a DataStream object
- ingestInfluxDB (bool) – setting this to True will also ingest the raw data into InfluxDB, where it can be visualized in Grafana
Returns: True if the stream is successfully stored; otherwise an exception is thrown
Return type: bool
Raises: Exception – logs or throws an exception if the stream is not stored
Todo
Add functionality to store data in InfluxDB.
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)
-
search_stream(stream_name)
Find all the stream names similar to the stream_name argument. For example, passing “location” will return all stream names that contain the word location.
Parameters: stream_name – full or partial stream name to search for
Returns: list of stream names similar to the stream_name argument
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
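The example above ("battery" matching "BATTERY--...") suggests a case-insensitive substring match. The sketch below reproduces that behavior over a hypothetical in-memory list of stream names; the Kernel presumably queries its metadata store instead, so treat this as an illustration of the matching rule only.

```python
from typing import Iterable, List


def find_similar_streams(all_stream_names: Iterable[str], stream_name: str) -> List[str]:
    """Return every stream name containing stream_name, ignoring case."""
    needle = stream_name.lower()
    return [name for name in all_stream_names if needle in name.lower()]


streams = [
    "BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST",
    "BATTERY--org.md2k.phonesensor--PHONE",
    "ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST",
]
print(find_similar_streams(streams, "battery"))  # the two BATTERY-- streams
```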
-
set_cache_value(key: str, value: str) → bool
Creates a new cache entry in the cache. Values are overwritten for existing keys.
Parameters: - key – key in the cache
- value – value associated with the key
Returns: True on successful insert or False otherwise.
Return type: bool
Raises: ValueError
– if key is None or empty
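The overwrite semantics can be shown with a minimal sketch, assuming a hypothetical dict-backed cache class; it is not the Kernel's cache handler, only an illustration of "insert or overwrite, reject empty keys".

```python
from typing import Dict


class InMemoryCache:
    """Hypothetical stand-in illustrating set_cache_value's documented semantics."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    def set_cache_value(self, key: str, value: str) -> bool:
        if not key:
            raise ValueError("key cannot be None or empty.")
        self._store[key] = value  # existing keys are silently overwritten
        return True

    def lookup(self, key: str) -> str:
        # Hypothetical accessor for the demo; raises KeyError for missing keys.
        return self._store[key]


cache = InMemoryCache()
cache.set_cache_value("study", "mperf")
cache.set_cache_value("study", "mperf-v2")
print(cache.lookup("study"))  # mperf-v2
```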
-
store_or_update_Kafka_offset(topic: str, topic_partition: str, offset_start: str, offset_until: str) → bool
Store or update Kafka topic offsets. Offsets are used to track which messages have been processed.
Parameters: - topic (str) – name of the Kafka topic
- topic_partition (str) – partition number
- offset_start (str) – starting offset
- offset_until (str) – last processed offset
Raises: ValueError – All params are required.
Exception – Cannot add/update kafka offsets because ERROR-MESSAGE
Returns: True if offsets are added/updated; otherwise an exception is thrown.
Return type: bool
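"Store or update" is an upsert. A minimal sketch, assuming a hypothetical dict keyed by (topic, partition) in place of the SQL offsets table; the real table's key columns are not documented here.

```python
from typing import Dict, Tuple

# Hypothetical offsets table: (topic, partition) -> (offset_start, offset_until)
Offsets = Dict[Tuple[str, str], Tuple[str, str]]


def upsert_offset(offsets: Offsets, topic: str, topic_partition: str,
                  offset_start: str, offset_until: str) -> bool:
    """Insert a new offset row, or overwrite the existing one for this topic/partition."""
    if not all([topic, topic_partition, offset_start, offset_until]):
        raise ValueError("All params are required.")
    offsets[(topic, topic_partition)] = (offset_start, offset_until)
    return True


table: Offsets = {}
upsert_offset(table, "hypothetical-topic", "0", "100", "150")
upsert_offset(table, "hypothetical-topic", "0", "151", "200")  # updates, does not duplicate
print(table)  # {('hypothetical-topic', '0'): ('151', '200')}
```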
-
update_auth_token(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool
Update an auth token in the SQL database to keep the user logged in. The auth token's validity duration can be changed in the configuration files.
Notes
This method is used by the API-server to store a newly created auth token.
Parameters: - username (str) – username of a user
- auth_token (str) – newly issued auth token
- auth_token_issued_time (datetime) – datetime when the auth token was issued
- auth_token_expiry_time (datetime) – datetime when the token will expire
Raises: ValueError – Auth token and auth-token issue/expiry time cannot be None/empty.
Returns: True if the new auth token is set or False otherwise.
Return type: bool
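The issued and expiry times passed to update_auth_token are related by the configured validity duration. A minimal sketch of deriving them, where valid_minutes is a hypothetical stand-in for whatever duration setting the configuration files define (the actual key name and unit are not given here):

```python
from datetime import datetime, timedelta
from typing import Tuple


def token_validity_window(issued_at: datetime, valid_minutes: int) -> Tuple[datetime, datetime]:
    """Return (auth_token_issued_time, auth_token_expiry_time) for a configured duration."""
    if valid_minutes <= 0:
        raise ValueError("valid_minutes must be positive.")
    return issued_at, issued_at + timedelta(minutes=valid_minutes)


issued, expiry = token_validity_window(datetime(2020, 1, 1, 12, 0), valid_minutes=30)
print(expiry)  # 2020-01-01 12:30:00
```

The two values would then be supplied as auth_token_issued_time and auth_token_expiry_time.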
-
upload_object(bucket_name: str, object_name: str, object_filepath: str) → bool
Upload an object to a bucket (aka folder) of the object storage system.
Parameters: - bucket_name (str) – name of the bucket
- object_name (str) – name of the object to be uploaded
- object_filepath (str) – full path of the file, including the file name (e.g., /home/nasir/obj.zip)
Returns: True if the object is successfully uploaded. On failure, returns an error dict {“error”:”error-message”}
Return type: bool
Raises: ValueError – Bucket name cannot be empty/None.
-