CerebralCortex Core

Subpackages

Submodules

cerebralcortex.kernel module

class Kernel(configs_dir_path: str = None, auto_offset_reset: str = 'largest', enable_spark: bool = True, enable_spark_ui=False)[source]

Bases: object

connect(username: str, password: str, encrypted_password: bool = False) → dict[source]

Authenticate a user based on username and password and return an auth token

Parameters:
  • username (str) – username of a user
  • password (str) – password of a user
  • encrypted_password (bool) – whether the password is already encrypted. mCerebrum sends encrypted passwords
Raises:

ValueError – User name and password cannot be empty/None.

Returns:

{“status”: bool, “auth_token”: str, “msg”: str}

Return type:

dict

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.connect("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True)
>>> True
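
The documented return value of connect() is a dict with the keys status, auth_token and msg. The following is a minimal sketch of unwrapping such a dict. The helper name extract_auth_token and the sample dicts are assumptions made for illustration; they are not part of the Kernel API.

```python
def extract_auth_token(result: dict) -> str:
    """Return the auth token from a connect()-style result dict.

    Assumes the documented shape {"status": bool, "auth_token": str, "msg": str}.
    Hypothetical helper; not part of CerebralCortex.
    """
    if not result.get("status"):
        # Surface the server-side message instead of returning an empty token.
        raise PermissionError(result.get("msg", "authentication failed"))
    return result["auth_token"]


# Made-up sample results in the documented shape.
ok = {"status": True, "auth_token": "abc123", "msg": ""}
bad = {"status": False, "auth_token": "", "msg": "Incorrect username and/or password."}

print(extract_auth_token(ok))
```

With a real Kernel instance, the dict returned by CC.connect(...) would be passed in place of the sample dicts.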
create_bucket(bucket_name: str) → bool[source]

Creates a bucket (aka folder) in the object storage system.

Parameters: bucket_name (str) – name of the bucket
Returns: True if the bucket was successfully created. On failure, returns an error dict {“error”: “error-message”}
Return type: bool
Raises: ValueError – Bucket name cannot be empty/None.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.create_bucket("live_data_folder")
>>> True
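
Because create_bucket() is documented to return either True or an error dict, a plain truthiness check cannot tell the two apart: a non-empty dict is also truthy in Python. The sketch below shows one way to separate the cases. The helper names and sample values are assumptions for illustration only; the same pattern would apply to other methods documented to return an {"error": ...} dict on failure.

```python
def succeeded(result) -> bool:
    """True only for a literal True; an {"error": ...} dict counts as failure."""
    return result is True


def error_of(result):
    """Return the error message from a failure dict, or None on success."""
    if isinstance(result, dict):
        return result.get("error")
    return None


# Made-up return values in the documented shapes.
created = True
failed = {"error": "bucket already exists"}

print(succeeded(created), error_of(created))
print(succeeded(failed), error_of(failed))
```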
create_user(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict) → bool[source]

Create a user in SQL storage if it doesn’t exist

Parameters:
  • username (str) – Only alphanumeric usernames are allowed with the max length of 25 chars.
  • user_password (str) – no size limit on password
  • user_role (str) – role of a user
  • user_metadata (dict) – metadata of a user
  • user_settings (dict) – user settings, mCerebrum configurations of a user
Returns:

True if the user is successfully registered; an exception is raised on failure

Return type:

bool

Raises:
  • ValueError – if selected username is not available
  • Exception – if sql query fails
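
The username rule for create_user() (alphanumeric only, at most 25 characters) can be checked before the call. This is a minimal sketch under a strict reading of that rule; is_valid_username is a hypothetical helper, and the library's own validation may differ. Note that usernames such as "nasir_ali", used in examples elsewhere on this page, contain an underscore and would fail this strict check.

```python
import re

# Strict reading of the documented rule: ASCII letters and digits, 1 to 25 chars.
USERNAME_PATTERN = re.compile(r"[A-Za-z0-9]{1,25}")


def is_valid_username(username) -> bool:
    """Hypothetical pre-check; not part of the Kernel API."""
    return isinstance(username, str) and USERNAME_PATTERN.fullmatch(username) is not None


print(is_valid_username("nasirali"))
print(is_valid_username("nasir_ali"))
```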
delete_user(username: str) → bool[source]

Delete a user record in SQL table

Parameters:

username (str) – username of the user to be deleted

Returns:

True if the user is successfully removed

Return type:

bool

Raises:
  • ValueError – if username param is empty or None
  • Exception – if sql query fails
encrypt_user_password(user_password: str) → str[source]

Encrypt password

Parameters: user_password (str) – unencrypted password
Raises: ValueError – password cannot be None or empty.
Returns: encrypted password
Return type: str
gen_random_pass(string_type: str = 'varchar', size: int = 8) → str[source]

Generate a random password

Parameters:
  • string_type (str) – Accepted values are “varchar” and “char”. (Default=“varchar”)
  • size (int) – password length (default=8)
Returns:

random password

Return type:

str

get_all_users(study_name: str) → List[dict][source]

Get a list of all users who are part of a study.

Parameters: study_name (str) – name of a study
Raises: ValueError – Study name is a required field.
Returns: list of users in the study. The list is empty if no user is associated with study_name and/or study_name does not exist.
Return type: list[dict]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_all_users("mperf")
>>> [{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}] # [{user_id, user_name}]
get_bucket_objects(bucket_name: str) → dict[source]

Returns a list of all objects stored in the specified Minio bucket

Parameters: bucket_name (str) – name of the bucket aka folder
Returns: {bucket-objects: [{“object_name”: “”, “metadata”: {}}, …]}; in case of an error, {“error”: str}
Return type: dict
get_buckets() → dict[source]

Returns all available buckets in the object storage

Returns: {bucket-name: str, [{“key”: “value”}]}; in case of an error, {“error”: str}
Return type: dict
get_cache_value(key: str) → str[source]

Retrieves value from the cache for the given key.

Parameters: key (str) – key in the cache
Returns: The value in the cache
Return type: str
Raises: ValueError – if key is None or empty
get_kafka_offsets(topic: str) → dict[source]

Get last stored kafka offsets

Parameters: topic (str) – kafka topic name
Returns: list of kafka offsets. The list is empty if the topic does not exist and/or no offset is stored for the topic.
Return type: list[dict]
Raises: ValueError – Topic name cannot be empty/None

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_kafka_offsets("live-data")
>>> [{"id","topic", "topic_partition", "offset_start", "offset_until", "offset_update_time"}]
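
The stored offsets can be reduced to one resume position per partition. The sketch below assumes each row carries the topic_partition and offset_until fields shown in the example above, that their values are numeric strings, and that offset_until is the position to resume from. None of these assumptions is confirmed by this page, and resume_offsets is a hypothetical helper with made-up sample rows.

```python
def resume_offsets(stored: list) -> dict:
    """Map each partition number to the highest offset_until seen for it."""
    resume = {}
    for row in stored:
        partition = int(row["topic_partition"])
        until = int(row["offset_until"])
        # Keep the furthest processed position per partition.
        resume[partition] = max(resume.get(partition, 0), until)
    return resume


# Made-up rows using the field names from the example above.
stored = [
    {"topic": "live-data", "topic_partition": "0", "offset_start": "0", "offset_until": "120"},
    {"topic": "live-data", "topic_partition": "1", "offset_start": "0", "offset_until": "95"},
    {"topic": "live-data", "topic_partition": "0", "offset_start": "120", "offset_until": "250"},
]

print(resume_offsets(stored))
```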
get_object(bucket_name: str, object_name: str) → dict[source]

Returns stored object (HttpResponse)

Parameters:
  • bucket_name (str) – name of a bucket aka folder
  • object_name (str) – name of an object that needs to be downloaded
Returns:

The requested object. If the file does not exist, it returns an error {“error”: “File does not exist.”}

Return type:

file-object

Raises:
  • ValueError – Missing bucket_name and object_name params.
  • Exception – {“error”: “error-message”}
get_object_stats(bucket_name: str, object_name: str) → dict[source]

Returns properties (e.g., object type, last modified etc.) of an object stored in a specified bucket

Parameters:
  • bucket_name (str) – name of a bucket aka folder
  • object_name (str) – name of an object
Returns:

information of an object (e.g., creation_date, object_size etc.). In case of an error {“error”: str}

Return type:

dict

Raises:
  • ValueError – Missing bucket_name and object_name params.
  • Exception – {“error”: “error-message”}
get_stream(stream_name: str, version: str = 'all', data_type=<DataSet.COMPLETE: (1, )>) → cerebralcortex.core.datatypes.datastream.DataStream[source]

Retrieve a data-stream with its metadata.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default=“all”)
  • data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns:

contains Data and/or metadata

Return type:

DataStream

Raises:

ValueError – if stream name is empty or None

Note

Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data # an object of a dataframe
>>> ds.metadata # an object of MetaData class
>>> ds.get_metadata(version=1) # get the specific version metadata of a stream
get_stream_info_by_hash(metadata_hash: uuid) → str[source]

Each metadata_hash is unique to a stream version. This reverse lookup returns the stream metadata and other information for a metadata_hash.

Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns: stream metadata and other info related to a stream
Return type: dict

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_info_by_hash("00ab666c-afb8-476e-9872-6472b4e66b68")
>>> {"name": .....} # stream metadata and other information
get_stream_metadata(stream_name: str, version: str = 'all') → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]

Get a list of metadata for all versions available for a stream.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default=“all”)
Returns:

Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise.

Return type:

list[Metadata]

Raises:

ValueError – stream_name cannot be None or empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_metadata("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> [Metadata] # list of MetaData class objects
get_stream_metadata_hash(stream_name: str) → list[source]

Get all the metadata_hash associated with a stream name.

Parameters: stream_name (str) – name of a stream
Returns: list of all the metadata hashes
Return type: list[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
get_stream_name(metadata_hash: uuid) → str[source]

Each metadata_hash is unique to a stream version. This reverse lookup returns the stream name for a metadata_hash.

Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns: name of a stream
Return type: str

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
>>> 'ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST'
get_stream_versions(stream_name: str) → list[source]

Returns a list of versions available for a stream

Parameters: stream_name (str) – name of a stream
Returns: list of version numbers (int)
Return type: list
Raises: ValueError – if stream_name is empty or None

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> [1, 2, 4]
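
The version argument accepted by get_stream() and get_stream_metadata() takes three forms: all, latest, or a specific version. The sketch below illustrates how those three forms could map onto a version list such as the one returned by get_stream_versions(). It is an illustration only, not the library's own resolution logic, and resolve_versions is a hypothetical helper.

```python
def resolve_versions(requested: str, available: list) -> list:
    """Return the version numbers selected by a version argument."""
    if requested == "all":
        return sorted(available)
    if requested == "latest":
        return [max(available)] if available else []
    # A specific version may be written as "2" or "2.0".
    wanted = int(float(requested))
    return [wanted] if wanted in available else []


versions = [1, 2, 4]  # same shape as the get_stream_versions() example above
print(resolve_versions("all", versions))
print(resolve_versions("latest", versions))
print(resolve_versions("2.0", versions))
```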
get_user_id(user_name: str) → str[source]

Get the user id linked to user_name.

Parameters: user_name (str) – username of a user
Returns: user id associated with user_name
Return type: str
Raises: ValueError – User name is a required field.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_id("nasir_ali")
>>> '76cc444c-4fb8-776e-2872-9472b4e66b16'
get_user_metadata(user_id: str = None, username: str = None) → dict[source]

Get user metadata by user_id or by username

Parameters:
  • user_id (str) – id (uuid) of a user
  • username (str) – username of a user
Returns:

user metadata

Return type:

dict

Todo

Return list of User class object

Raises: ValueError – User ID/name cannot be empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_metadata(username="nasir_ali")
>>> {"study_name":"mperf"........}
get_user_name(user_id: str) → str[source]

Get the user name linked to a user id.

Parameters: user_id (str) – id (uuid) of a user
Returns: username associated with user_id
Return type: str
Raises: ValueError – User ID is a required field.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16")
>>> 'nasir_ali'
get_user_settings(username: str = None, auth_token: str = None) → dict[source]

Get user settings by auth-token or by username. These are the user’s mCerebrum settings.

Parameters:
  • username (str) – username of a user
  • auth_token (str) – auth-token
Returns:

List of dictionaries of user settings

Return type:

list[dict]

Todo

Return list of User class object

Raises: ValueError – User ID/name cannot be empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_settings(username="nasir_ali")
>>> [{"mcerebrum":"some-conf"........}]
is_auth_token_valid(username: str, auth_token: str, checktime: bool = False) → bool[source]

Validate whether a token is valid or expired based on the token expiry datetime stored in SQL

Parameters:
  • username (str) – username of a user
  • auth_token (str) – token generated by API-Server
  • checktime (bool) – if False, only checks whether the token exists in the system. If True, also checks whether the token has expired, based on the token expiry date.
Raises:

ValueError – Auth token and auth-token expiry time cannot be null/empty.

Returns:

returns True if token is valid or False otherwise.

Return type:

bool
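
The effect of the checktime flag in is_auth_token_valid() can be sketched as follows. This is an illustration of the documented behavior only, not the library's implementation: token_is_valid is a hypothetical function, and the stored token and expiry are passed in directly instead of being read from SQL.

```python
from datetime import datetime, timedelta
from typing import Optional


def token_is_valid(stored_token: str, stored_expiry: datetime, presented_token: str,
                   checktime: bool = False, now: Optional[datetime] = None) -> bool:
    """Check a presented token against a stored token and its expiry time."""
    if not presented_token or stored_expiry is None:
        raise ValueError("Auth token and auth-token expiry time cannot be null/empty.")
    if presented_token != stored_token:
        return False
    if not checktime:
        # Only confirm that the token exists; ignore expiry.
        return True
    current = now if now is not None else datetime.now()
    return current < stored_expiry


# Made-up timestamps for illustration.
issued = datetime(2020, 1, 1, 12, 0)
expiry = issued + timedelta(hours=1)
late = expiry + timedelta(minutes=5)

print(token_is_valid("t0k3n", expiry, "t0k3n", checktime=False, now=late))
print(token_is_valid("t0k3n", expiry, "t0k3n", checktime=True, now=late))
```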

is_bucket(bucket_name: str) → bool[source]

Checks whether a bucket exists

Parameters: bucket_name (str) – name of the bucket aka folder
Returns: True if the bucket exists or False otherwise. In case of an error, {“error”: str}
Return type: bool
Raises: ValueError – bucket_name cannot be None or empty.
is_object(bucket_name: str, object_name: str) → bool[source]

Checks whether an object exists in a bucket

Parameters:
  • bucket_name (str) – name of the bucket aka folder
  • object_name (str) – name of the object
Returns:

True if the object exists or False otherwise. In case of an error, {“error”: str}

Return type:

bool

Raises:

Exception – if bucket_name and object_name are empty or None

is_stream(stream_name: str) → bool[source]

Returns True if the provided stream exists.

Parameters: stream_name (str) – name of a stream
Returns: True if stream_name exists, False otherwise
Return type: bool

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> True
is_user(user_id: str = None, user_name: str = None) → bool[source]

Checks whether a user exists in the system. Either parameter can be set to verify whether a user exists.

Parameters:
  • user_id (str) – id (uuid) of a user
  • user_name (str) – username of a user
Returns:

True if a user exists in the system or False otherwise.

Return type:

bool

Raises:

ValueError – Both user_id and user_name cannot be None or empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16")
>>> True
kafka_produce_message(topic: str, msg: dict)[source]

Publish a message on kafka message queue

Parameters:
  • topic (str) – name of the kafka topic
  • msg (dict) – message that needs to be published on kafka
Returns:

True if successful. In case of failure, it returns an Exception message.

Return type:

bool

Raises:
  • ValueError – topic and message parameters cannot be empty or None.
  • Exception – Error publishing message. Topic: topic_name - error-message
kafka_subscribe_to_topic(topic: str)[source]

Subscribe to kafka topic as a consumer

Parameters: topic (str) – name of the kafka topic
Yields: dict – kafka message
Raises: ValueError – Topic parameter is missing.
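
Since kafka_subscribe_to_topic() is documented to yield messages, it is consumed like any Python generator. The sketch below uses a stand-in generator, fake_subscribe, which yields made-up dicts so the example runs without a Kafka broker. Both function names are assumptions for illustration and do not exist in CerebralCortex.

```python
from itertools import islice


def fake_subscribe(topic: str):
    """Stand-in for a subscription generator; yields made-up dict messages."""
    if not topic:
        raise ValueError("Topic parameter is missing.")
    for seq in range(5):
        yield {"topic": topic, "seq": seq}


def take_messages(messages, limit: int) -> list:
    """Read at most `limit` messages from a (possibly endless) message generator."""
    return list(islice(messages, limit))


batch = take_messages(fake_subscribe("live-data"), 3)
print([msg["seq"] for msg in batch])
```

Bounding the read with islice matters because a real consumer generator may never finish on its own.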
list_streams() → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]

Get all the available stream names with metadata

Returns: list of available streams metadata
Return type: List[Metadata]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.list_streams()
save_data_to_influxdb(datastream: cerebralcortex.core.datatypes.datastream.DataStream)[source]

Save data stream to influxdb only for visualization purposes.

Parameters: datastream (DataStream) – a DataStream object
Returns: True if data is ingested successfully or False otherwise
Return type: bool

Todo

This needs to be updated with the new structure. Should metadata be stored or not?

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_data_to_influxdb(ds)
save_stream(datastream: cerebralcortex.core.datatypes.datastream.DataStream, ingestInfluxDB: bool = False) → bool[source]

Saves datastream raw data in selected NoSQL storage and metadata in MySQL.

Parameters:
  • datastream (DataStream) – a DataStream object
  • ingestInfluxDB (bool) – Setting this to True will ingest the raw data in InfluxDB as well that could be used to visualize data in Grafana
Returns:

True if the stream is successfully stored; otherwise an exception is raised

Return type:

bool

Raises:

Exception – logs or raises an exception if the stream is not stored

Todo

Add functionality to store data in influxdb.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)
search_stream(stream_name)[source]

Find all the stream names similar to the stream_name argument. For example, passing “location” will return all stream names that contain the word location

Returns: list of stream names similar to the stream_name argument
Return type: List[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.search_stream("battery")
>>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
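
The matching described for search_stream() resembles a substring search over stream names. The sketch below assumes case-insensitive matching, which is inferred only from the example above ("battery" matching names that start with BATTERY). The real lookup presumably runs against stored metadata and may behave differently. search_names is a hypothetical helper.

```python
def search_names(names: list, term: str) -> list:
    """Return the names that contain `term`, ignoring case."""
    needle = term.lower()
    return [name for name in names if needle in name.lower()]


# Stream names taken from the examples on this page.
names = [
    "BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST",
    "BATTERY--org.md2k.phonesensor--PHONE",
    "ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST",
]

print(search_names(names, "battery"))
```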
set_cache_value(key: str, value: str) → bool[source]

Creates a new cache entry in the cache. Values are overwritten for existing keys.

Parameters:
  • key – key in the cache
  • value – value associated with the key
Returns:

True on successful insert or False otherwise.

Return type:

bool

Raises:

ValueError – if key is None or empty

store_or_update_Kafka_offset(topic: str, topic_partition: str, offset_start: str, offset_until: str) → bool[source]

Store or Update kafka topic offsets. Offsets are used to track what messages have been processed.

Parameters:
  • topic (str) – name of the kafka topic
  • topic_partition (str) – partition number
  • offset_start (str) – starting offset
  • offset_until (str) – last processed offset
Raises:
  • ValueError – All params are required.
  • Exception – Cannot add/update kafka offsets because ERROR-MESSAGE
Returns:

True if the offsets are added/updated; otherwise an exception is raised.

Return type:

bool
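
The store-or-update behavior of store_or_update_Kafka_offset() can be pictured as an upsert. The sketch below uses an in-memory dict keyed by topic and partition. That keying is an assumption, the real method writes to SQL, and store_or_update_offset is a hypothetical stand-in.

```python
def store_or_update_offset(table: dict, topic: str, topic_partition: str,
                           offset_start: str, offset_until: str) -> bool:
    """Insert a new offset row or overwrite the existing one for (topic, partition)."""
    if not all([topic, topic_partition, offset_start, offset_until]):
        raise ValueError("All params are required.")
    table[(topic, topic_partition)] = {
        "offset_start": offset_start,
        "offset_until": offset_until,
    }
    return True


offsets = {}
store_or_update_offset(offsets, "live-data", "0", "0", "120")    # first call stores
store_or_update_offset(offsets, "live-data", "0", "120", "250")  # second call updates
print(offsets)
```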

update_auth_token(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool[source]

Update an auth token in the SQL database to keep the user logged in. The validity period of an auth token can be changed in the configuration files.

Notes

This method is used by API-server to store newly created auth-token

Parameters:
  • username (str) – username of a user
  • auth_token (str) – issued new auth token
  • auth_token_issued_time (datetime) – datetime when the old auth token was issued
  • auth_token_expiry_time (datetime) – datetime when the token will get expired
Raises:

ValueError – Auth token and auth-token issue/expiry time cannot be None/empty.

Returns:

Returns True if the new auth token is set or False otherwise.

Return type:

bool

upload_object(bucket_name: str, object_name: str, object_filepath: str) → bool[source]

Upload an object to a bucket (aka folder) in the object storage system.

Parameters:
  • bucket_name (str) – name of the bucket
  • object_name (str) – name of the object to be uploaded
  • object_filepath (str) – it shall contain full path of a file with file name (e.g., /home/nasir/obj.zip)
Returns:

True if the object is successfully uploaded. On failure, returns an error dict {“error”: “error-message”}

Return type:

bool

Raises:

ValueError – Bucket name cannot be empty/None.

Module contents

class Kernel(configs_dir_path: str = None, auto_offset_reset: str = 'largest', enable_spark: bool = True, enable_spark_ui=False)[source]

Bases: object

connect(username: str, password: str, encrypted_password: bool = False) → dict[source]

Authenticate a user based on username and password and return an auth token

Parameters:
  • username (str) – username of a user
  • password (str) – password of a user
  • encrypted_password (str) – is password encrypted or not. mCerebrum sends encrypted passwords
Raises:

ValueError – User name and password cannot be empty/None.

Returns:

return eturn {“status”:bool, “auth_token”: str, “msg”: str}

Return type:

dict

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.connect("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True)
>>> True
create_bucket(bucket_name: str) → bool[source]

creates a bucket aka folder in object storage system.

Parameters:bucket_name (str) – name of the bucket
Returns:True if bucket was successfully created. On failure, returns an error with dict {“error”:”error-message”}
Return type:bool
Raises:ValueError – Bucket name cannot be empty/None.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.create_bucket("live_data_folder")
>>> True
create_user(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict) → bool[source]

Create a user in SQL storage if it doesn’t exist

Parameters:
  • username (str) – Only alphanumeric usernames are allowed with the max length of 25 chars.
  • user_password (str) – no size limit on password
  • user_role (str) – role of a user
  • user_metadata (dict) – metadata of a user
  • user_settings (dict) – user settings, mCerebrum configurations of a user
Returns:

True if user is successfully registered or throws any error in case of failure

Return type:

bool

Raises:
  • ValueError – if selected username is not available
  • Exception – if sql query fails
delete_user(username: str) → bool[source]

Delete a user record in SQL table

Parameters:

username – username of a user that needs to be deleted

Returns:

if user is successfully removed

Return type:

bool

Raises:
  • ValueError – if username param is empty or None
  • Exception – if sql query fails
encrypt_user_password(user_password: str) → str[source]

Encrypt password

Parameters:user_password (str) – unencrypted password
Raises:ValueError – password cannot be None or empty.
Returns:encrypted password
Return type:str
gen_random_pass(string_type: str = 'varchar', size: int = 8) → str[source]

Generate a random password

Parameters:
  • string_type – Accepted parameters are “varchar” and “char”. (Default=”varchar”)
  • size – password length (default=8)
Returns:

random password

Return type:

str

get_all_users(study_name: str) → List[dict][source]

Get a list of all users part of a study.

Parameters:study_name (str) – name of a study
Raises:ValueError – Study name is a requied field.
Returns:Returns empty list if there is no user associated to the study_name and/or study_name does not exist.
Return type:list[dict]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_all_users("mperf")
>>> [{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}] # [{user_id, user_name}]
get_bucket_objects(bucket_name: str) → dict[source]

returns a list of all objects stored in the specified Minio bucket

Parameters:bucket_name (str) – name of the bucket aka folder
Returns:{bucket-objects: [{“object_name”:”“, “metadata”: {}}…], in case of an error {“error”: str}
Return type:dict
get_buckets() → dict[source]

returns all available buckets in an object storage

Returns:{bucket-name: str, [{“key”:”value”}]}, in case of an error {“error”: str}
Return type:dict
get_cache_value(key: str) → str[source]

Retrieves value from the cache for the given key.

Parameters:key – key in the cache
Returns:The value in the cache
Return type:str
Raises:ValueError – if key is None or empty
get_kafka_offsets(topic: str) → dict[source]

Get last stored kafka offsets

Parameters:topic (str) – kafka topic name
Returns:list of kafka offsets. This method will return empty list if topic does not exist and/or no offset is stored for the topic.
Return type:list[dict]
Raises:ValueError – Topic name cannot be empty/None

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_kafka_offsets("live-data")
>>> [{"id","topic", "topic_partition", "offset_start", "offset_until", "offset_update_time"}]
get_object(bucket_name: str, object_name: str) → dict[source]

Returns stored object (HttpResponse)

Parameters:
  • bucket_name (str) – name of a bucket aka folder
  • object_name (str) – name of an object that needs to be downloaded
Returns:

object that needs to be downloaded. If file does not exists then it returns an error {“error”: “File does not exist.”}

Return type:

file-object

Raises:
  • ValueError – Missing bucket_name and object_name params.
  • Exception – {“error”: “error-message”}
get_object_stats(bucket_name: str, object_name: str) → dict[source]

Returns properties (e.g., object type, last modified etc.) of an object stored in a specified bucket

Parameters:
  • bucket_name (str) – name of a bucket aka folder
  • object_name (str) – name of an object
Returns:

information of an object (e.g., creation_date, object_size etc.). In case of an error {“error”: str}

Return type:

dict

Raises:
  • ValueError – Missing bucket_name and object_name params.
  • Exception – {“error”: “error-message”}
get_stream(stream_name: str, version: str = 'all', data_type=<DataSet.COMPLETE: (1, )>) → cerebralcortex.core.datatypes.datastream.DataStream[source]

Retrieve a data-stream with it’s metadata.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
  • data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns:

contains Data and/or metadata

Return type:

DataStream

Raises:

ValueError – if stream name is empty or None

Note

Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data # an object of a dataframe
>>> ds.metadata # an object of MetaData class
>>> ds.get_metadata(version=1) # get the specific version metadata of a stream
get_stream_info_by_hash(metadata_hash: <module 'uuid' from '/home/docs/.pyenv/versions/3.6.8/lib/python3.6/uuid.py'>) → str[source]

metadata_hash are unique to each stream version. This reverse look can return the stream name of a metadata_hash.

Parameters:metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns:stream metadata and other info related to a stream
Return type:dict

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
>>> {"name": .....} # stream metadata and other information
get_stream_metadata(stream_name: str, version: str = 'all') → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]

Get a list of metadata for all versions available for a stream.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
Returns:

Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise.

Return type:

list[Metadata]

Raises:

ValueError – stream_name cannot be None or empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_all_users("mperf")
>>> [Metadata] # list of MetaData class objects
get_stream_metadata_hash(stream_name: str) → list[source]

Get all the metadata_hash associated with a stream name.

Parameters:stream_name (str) – name of a stream
Returns:list of all the metadata hashes
Return type:list[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
get_stream_name(metadata_hash: <module 'uuid' from '/home/docs/.pyenv/versions/3.6.8/lib/python3.6/uuid.py'>) → str[source]

metadata_hash are unique to each stream version. This reverse look can return the stream name of a metadata_hash.

Parameters:metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns:name of a stream
Return type:str

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
>>> ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST
get_stream_versions(stream_name: str) → list[source]

Returns a list of versions available for a stream

Parameters:stream_name (str) – name of a stream
Returns:list of int
Return type:list
Raises:ValueError – if stream_name is empty or None

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> [1, 2, 4]
get_user_id(user_name: str) → str[source]

Get the user id linked to user_name.

Parameters:user_name (str) – username of a user
Returns:user id associated to user_name
Return type:str
Raises:ValueError – User name is a required field.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_id("nasir_ali")
>>> '76cc444c-4fb8-776e-2872-9472b4e66b16'
get_user_metadata(user_id: str = None, username: str = None) → dict[source]

Get user metadata by user_id or by username

Parameters:
  • user_id (str) – id (uuid) of a user
  • user_name (str) – username of a user
Returns:

user metadata

Return type:

dict

Todo

Return list of User class object

Raises:ValueError – User ID/name cannot be empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_metadata(username="nasir_ali")
>>> {"study_name":"mperf"........}
get_user_name(user_id: str) → str[source]

Get the user name linked to a user id.

Parameters:user_name (str) – username of a user
Returns:user_id associated to username
Return type:bool
Raises:ValueError – User ID is a required field.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16")
>>> 'nasir_ali'
get_user_settings(username: str = None, auth_token: str = None) → dict[source]

Get user settings by auth-token or by username. These are user’s mCerebrum settings

Parameters:
  • username (str) – username of a user
  • auth_token (str) – auth-token
Returns:

List of dictionaries of user metadata

Return type:

list[dict]

Todo

Return list of User class object

Raises:ValueError – User ID/name cannot be empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.get_user_settings(username="nasir_ali")
>>> [{"mcerebrum":"some-conf"........}]
is_auth_token_valid(username: str, auth_token: str, checktime: bool = False) → bool[source]

Validate whether a token is valid or expired based on the token expiry datetime stored in SQL

Parameters:
  • username (str) – username of a user
  • auth_token (str) – token generated by API-Server
  • checktime (bool) – setting this to False will only check if the token is available in system. Setting this to true will check if the token is expired based on the token expiry date.
Raises:

ValueError – Auth token and auth-token expiry time cannot be null/empty.

Returns:

returns True if token is valid or False otherwise.

Return type:

bool

is_bucket(bucket_name: str) → bool[source]

checks whether a bucket exist

Parameters:bucket_name (str) – name of the bucket aka folder
Returns:True if bucket exist or False otherwise. In case an error {“error”: str}
Return type:bool
Raises:ValueError – bucket_name cannot be None or empty.
is_object(bucket_name: str, object_name: str) → bool[source]

checks whether an object exist in a bucket

Parameters:
  • bucket_name (str) – name of the bucket aka folder
  • object_name (str) – name of the object
Returns:

True if object exist or False otherwise. In case an error {“error”: str}

Return type:

bool

Raises:

Excecption – if bucket_name and object_name are empty or None

is_stream(stream_name: str) → bool[source]

Returns true if provided stream exists.

Parameters:stream_name (str) – name of a stream
Returns:True if stream_name exist False otherwise
Return type:bool

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> True
is_user(user_id: str = None, user_name: str = None) → bool[source]

Checks whether a user exists in the system. One of both parameters could be set to verify whether user exist.

Parameters:
  • user_id (str) – id (uuid) of a user
  • user_name (str) – username of a user
Returns:

True if a user exists in the system or False otherwise.

Return type:

bool

Raises:

ValueError – Both user_id and user_name cannot be None or empty.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16")
True
kafka_produce_message(topic: str, msg: dict)[source]

Publish a message to a Kafka message queue.

Parameters:
  • topic (str) – name of the kafka topic
  • msg (dict) – message that needs to be published on Kafka
Returns:

True if successful. In case of failure, it returns an Exception message.

Return type:

bool

Raises:
  • ValueError – topic and message parameters cannot be empty or None.
  • Exception – Error publishing message. Topic: topic_name - error-message
kafka_subscribe_to_topic(topic: str)[source]

Subscribe to a Kafka topic as a consumer.

Parameters:topic (str) – name of the kafka topic
Yields:dict – kafka message
Raises:ValueError – Topic parameter is missing.
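
The produce/subscribe pair can be pictured with an in-memory stand-in that needs no Kafka broker. This is only a sketch of the calling pattern (publish a dict, then iterate over the yielded dicts); the InMemoryQueue class and the topic name "filequeue" are hypothetical and do not reflect how the Kernel talks to Kafka.

```python
from collections import defaultdict, deque
from typing import Iterator


class InMemoryQueue:
    """Hypothetical stand-in for the Kafka-backed methods."""

    def __init__(self):
        self._topics = defaultdict(deque)

    def kafka_produce_message(self, topic: str, msg: dict) -> bool:
        if not topic or not msg:
            raise ValueError("topic and message parameters cannot be empty or None.")
        self._topics[topic].append(msg)
        return True

    def kafka_subscribe_to_topic(self, topic: str) -> Iterator[dict]:
        # This is a generator, so the check below runs on first iteration.
        if not topic:
            raise ValueError("Topic parameter is missing.")
        # Yield queued messages one at a time; a real consumer would block
        # and keep yielding as new messages arrive.
        while self._topics[topic]:
            yield self._topics[topic].popleft()


queue = InMemoryQueue()
queue.kafka_produce_message("filequeue", {"filename": "obj.zip"})
received = [msg for msg in queue.kafka_subscribe_to_topic("filequeue")]
```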
list_streams() → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]

Get all the available stream names with metadata

Returns:list of available streams metadata
Return type:List[Metadata]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.list_streams()
save_data_to_influxdb(datastream: cerebralcortex.core.datatypes.datastream.DataStream)[source]

Save a data stream to InfluxDB, for visualization purposes only.

Parameters:datastream (DataStream) – a DataStream object
Returns:True if data is ingested successfully or False otherwise
Return type:bool

Todo

This needs to be updated with the new structure. Should metadata be stored or not?

Example

>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_data_to_influxdb(ds)
save_stream(datastream: cerebralcortex.core.datatypes.datastream.DataStream, ingestInfluxDB: bool = False) → bool[source]

Saves datastream raw data in selected NoSQL storage and metadata in MySQL.

Parameters:
  • datastream (DataStream) – a DataStream object
  • ingestInfluxDB (bool) – if True, the raw data is also ingested into InfluxDB, where it can be used to visualize data in Grafana
Returns:

True if stream is successfully stored or throws an exception

Return type:

bool

Raises:

Exception – logs or throws an exception if the stream is not stored

Todo

Add functionality to store data in InfluxDB.

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)
search_stream(stream_name)[source]

Find all stream names similar to the stream_name argument. For example, passing “location” will return all stream names that contain the word location.

Returns:list of stream names similar to stream_name arg
Return type:List[str]

Examples

>>> CC = Kernel("/directory/path/of/configs/")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
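
The matching described above can be approximated as a substring test over the known stream names. A minimal sketch, assuming case-insensitive matching (inferred from "battery" matching "BATTERY--..." in the example); search_names is a hypothetical helper, and the actual lookup may be done differently in storage.

```python
from typing import List


def search_names(stream_names: List[str], query: str) -> List[str]:
    """Hypothetical helper: case-insensitive substring match on stream names."""
    needle = query.lower()
    return [name for name in stream_names if needle in name.lower()]


names = [
    "BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST",
    "BATTERY--org.md2k.phonesensor--PHONE",
    "ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST",
]
matches = search_names(names, "battery")  # keeps only the two BATTERY streams
```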
set_cache_value(key: str, value: str) → bool[source]

Creates a new cache entry in the cache. Values are overwritten for existing keys.

Parameters:
  • key – key in the cache
  • value – value associated with the key
Returns:

True on successful insert or False otherwise.

Return type:

bool

Raises:

ValueError – if key is None or empty
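
The overwrite and validation semantics described above can be sketched with a plain dictionary. DictCache and its peek method are hypothetical and exist only for this illustration; the actual cache backend is not shown here.

```python
class DictCache:
    """Hypothetical in-memory cache mirroring the documented semantics."""

    def __init__(self):
        self._data = {}

    def set_cache_value(self, key: str, value: str) -> bool:
        if not key:
            raise ValueError("key cannot be None or empty")
        # Assigning to an existing key overwrites the previous value.
        self._data[key] = value
        return True

    def peek(self, key: str):
        """Illustration-only accessor; returns None for unknown keys."""
        return self._data.get(key)


cache = DictCache()
cache.set_cache_value("last_processed", "2020-01-01")
cache.set_cache_value("last_processed", "2020-01-02")  # overwrites the first value
```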

store_or_update_Kafka_offset(topic: str, topic_partition: str, offset_start: str, offset_until: str) → bool[source]

Store or update Kafka topic offsets. Offsets are used to track which messages have been processed.

Parameters:
  • topic (str) – name of the kafka topic
  • topic_partition (str) – partition number
  • offset_start (str) – starting offset
  • offset_until (str) – last processed offset
Raises:
  • ValueError – All params are required.
  • Exception – Cannot add/update kafka offsets because ERROR-MESSAGE
Returns:

True if the offsets are added/updated, or throws an exception.

Return type:

bool
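
The store-or-update behaviour amounts to an upsert keyed on topic and partition. The sketch below uses an in-memory SQLite database to show the idea; the kafka_offsets table, its columns, and the store_or_update_offset helper are assumptions made for illustration and are not the schema or code CerebralCortex uses.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical schema: one row per (topic, partition) pair.
conn.execute(
    "CREATE TABLE kafka_offsets ("
    " topic TEXT, topic_partition TEXT,"
    " offset_start TEXT, offset_until TEXT,"
    " PRIMARY KEY (topic, topic_partition))"
)


def store_or_update_offset(topic: str, topic_partition: str,
                           offset_start: str, offset_until: str) -> bool:
    if not all([topic, topic_partition, offset_start, offset_until]):
        raise ValueError("All params are required.")
    with conn:  # commits on success, rolls back on error
        # INSERT OR REPLACE overwrites the row that has the same primary key.
        conn.execute(
            "INSERT OR REPLACE INTO kafka_offsets VALUES (?, ?, ?, ?)",
            (topic, topic_partition, offset_start, offset_until),
        )
    return True


store_or_update_offset("filequeue", "0", "0", "100")
store_or_update_offset("filequeue", "0", "100", "250")  # updates the same row
rows = conn.execute(
    "SELECT offset_start, offset_until FROM kafka_offsets"
).fetchall()
```

After both calls the table holds a single row for the topic and partition, carrying the most recent offsets.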

update_auth_token(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool[source]

Update an auth token in the SQL database to keep the user logged in. The validity duration of an auth token can be changed in the configuration files.

Notes

This method is used by API-server to store newly created auth-token

Parameters:
  • username (str) – username of a user
  • auth_token (str) – issued new auth token
  • auth_token_issued_time (datetime) – datetime when the old auth token was issued
  • auth_token_expiry_time (datetime) – datetime when the token will expire
Raises:

ValueError – Auth token and auth-token issue/expiry time cannot be None/empty.

Returns:

Returns True if the new auth token is set or False otherwise.

Return type:

bool
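
The two datetime arguments are typically derived from one another: the expiry time is the issue time plus the configured validity duration. A minimal sketch; token_window is a hypothetical helper, and the 3600-second duration is an illustrative value, not a documented default.

```python
from datetime import datetime, timedelta
from typing import Tuple


def token_window(valid_seconds: int, issued: datetime) -> Tuple[datetime, datetime]:
    """Hypothetical helper: return (issued_time, expiry_time) for a token."""
    return issued, issued + timedelta(seconds=valid_seconds)


issued_time, expiry_time = token_window(3600, datetime(2020, 1, 1, 12, 0, 0))
# These two values would then be passed as auth_token_issued_time and
# auth_token_expiry_time.
```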

upload_object(bucket_name: str, object_name: str, object_filepath: str) → bool[source]

Upload an object to a bucket (aka folder) of the object storage system.

Parameters:
  • bucket_name (str) – name of the bucket
  • object_name (str) – name of the object to be uploaded
  • object_filepath (str) – full path of the file, including the file name (e.g., /home/nasir/obj.zip)
Returns:

True if the object was successfully uploaded. On failure, returns an error as a dict {“error”: “error-message”}

Return type:

bool

Raises:

ValueError – Bucket name cannot be empty/None.
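
Since object_filepath must be a full path to an existing file, a caller may want to validate the arguments before attempting an upload. A minimal sketch; check_upload_args is a hypothetical pre-flight helper, and the extra checks on object_name and on file existence are assumptions rather than documented behaviour.

```python
import os


def check_upload_args(bucket_name: str, object_name: str, object_filepath: str) -> str:
    """Hypothetical pre-flight check; returns the absolute file path."""
    if not bucket_name:
        raise ValueError("Bucket name cannot be empty/None.")
    if not object_name:
        raise ValueError("Object name cannot be empty/None.")
    # Resolve relative paths so the upload receives a full path.
    full_path = os.path.abspath(object_filepath)
    if not os.path.isfile(full_path):
        raise FileNotFoundError(full_path)
    return full_path
```

The returned path could then be handed to upload_object as object_filepath.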