cerebralcortex.core.data_manager.sql package

Submodules

cerebralcortex.core.data_manager.sql.cache_handler module

class CacheHandler

Bases: object

get_cache_value(key: str) → str

Retrieves value from the cache for the given key.

Parameters:key – key in the cache
Returns:The value in the cache
Return type:str
Raises:ValueError – if key is None or empty
set_cache_value(key: str, value: str) → bool

Creates a new cache entry in the cache. Values are overwritten for existing keys.

Parameters:
  • key – key in the cache
  • value – value associated with the key
Returns:

True on successful insert or False otherwise.

Return type:

bool

Raises:

ValueError – if key is None or empty
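The get/set contract above (overwrite on existing keys, ValueError for a None or empty key) can be illustrated with a small in-memory stand-in. This is a minimal sketch, not CacheHandler's actual storage: the class name InMemoryCache is hypothetical, and returning None for a missing key is an assumption, since the behavior for absent keys is not documented here.

```python
from typing import Dict, Optional


class InMemoryCache:
    """Hypothetical stand-in for CacheHandler; not the library's implementation."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    @staticmethod
    def _check_key(key: str) -> None:
        # Documented contract: ValueError if key is None or empty.
        if not key:
            raise ValueError("key cannot be None or empty.")

    def set_cache_value(self, key: str, value: str) -> bool:
        self._check_key(key)
        self._store[key] = value  # values for existing keys are overwritten
        return True

    def get_cache_value(self, key: str) -> Optional[str]:
        self._check_key(key)
        return self._store.get(key)  # assumption: missing keys yield None


cache = InMemoryCache()
cache.set_cache_value("token", "abc")
cache.set_cache_value("token", "xyz")  # overwrites the previous value
print(cache.get_cache_value("token"))  # xyz
```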

cerebralcortex.core.data_manager.sql.data module

class SqlData(CC)

Bases: cerebralcortex.core.data_manager.sql.stream_handler.StreamHandler, cerebralcortex.core.data_manager.sql.users_handler.UserHandler, cerebralcortex.core.data_manager.sql.kafka_offsets_handler.KafkaOffsetsHandler, cerebralcortex.core.data_manager.sql.cache_handler.CacheHandler, cerebralcortex.core.data_manager.sql.data_ingestion_handler.DataIngestionHandler, cerebralcortex.core.data_manager.sql.metadata_handler.MetadataHandler

close(conn, cursor)

Close the MySQL connection.

Parameters:
  • conn (object) – MySQL connection object
  • cursor (object) – MySQL cursor object
Raises:

Exception – if connection is closed

create_pool(pool_name: str = 'CC_Pool', pool_size: int = 1)

Create a connection pool. Once the pool is created, each request for a MySQL connection gets a connection from this pool instead of creating a new one.

Parameters:
  • pool_name (str) – name of the pool (default=”CC_Pool”)
  • pool_size (int) – size of MySQL connections pool (default=1)
Returns:

MySQL connections pool

Return type:

object
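A connection pool hands out connections that were opened in advance and takes them back when callers are done. The sketch below is hypothetical (SimplePool is not part of CerebralCortex) and uses sqlite3 in-memory connections as a stand-in for MySQL so it runs without a server. For real MySQL pools, mysql-connector-python provides mysql.connector.pooling.MySQLConnectionPool with pool_name and pool_size arguments; whether SqlData uses it is not stated here.

```python
import queue
import sqlite3


class SimplePool:
    """Hypothetical fixed-size pool; sqlite3 in-memory connections stand in for MySQL."""

    def __init__(self, pool_name: str = "CC_Pool", pool_size: int = 1) -> None:
        self.pool_name = pool_name
        self._conns = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            # Connections are opened once, up front; check_same_thread=False lets
            # whichever thread borrows a connection use it.
            self._conns.put(sqlite3.connect(":memory:", check_same_thread=False))

    def get_connection(self, timeout: float = 5.0) -> sqlite3.Connection:
        # Blocks until a pooled connection is free (raises queue.Empty after
        # the timeout) instead of opening a new connection.
        return self._conns.get(timeout=timeout)

    def release(self, conn: sqlite3.Connection) -> None:
        # Return the connection so the next caller can reuse it.
        self._conns.put(conn)


pool = SimplePool(pool_size=2)
conn = pool.get_connection()
print(conn.execute("SELECT 1").fetchone())  # (1,)
pool.release(conn)
```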

execute(sql, args=None, commit=False, executemany=False) → List[dict]

Execute an SQL statement, with or without arguments. Usage is similar to the execute() method of a pymysql cursor.

Parameters:
  • sql (str) – sql clause
  • args (tuple) – arguments required by the sql clause
  • commit (bool) – whether to commit
  • executemany (bool) – whether to execute the statement as a batch
Returns:

returns a list of dicts if commit is set to False

Return type:

list[dict]

Raises:

Exception – if MySQL query fails
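The documented behavior of execute() (rows returned as a list of dicts unless commit is requested, with optional batching through executemany) can be sketched with the standard-library sqlite3 module in place of pymysql. Assumptions: the function takes an explicit connection, returns None when commit=True, and uses sqlite3's ? placeholders (pymysql uses %s).

```python
import sqlite3
from typing import List, Optional


def execute(conn: sqlite3.Connection, sql: str, args=None, commit: bool = False,
            executemany: bool = False) -> Optional[List[dict]]:
    """Hypothetical sqlite3 analogue of SqlData.execute()."""
    cursor = conn.cursor()
    try:
        if executemany:
            cursor.executemany(sql, args or [])  # args: a sequence of parameter tuples
        else:
            cursor.execute(sql, args or ())
        if commit:
            conn.commit()
            return None  # assumption: nothing is returned for write statements
        # cursor.description holds one 7-tuple per column; item 0 is the column name.
        columns = [col[0] for col in cursor.description or []]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
execute(conn, "CREATE TABLE users (id TEXT, name TEXT)", commit=True)
execute(conn, "INSERT INTO users VALUES (?, ?)", [("u1", "alice"), ("u2", "bob")],
        commit=True, executemany=True)
rows = execute(conn, "SELECT id, name FROM users WHERE name = ?", ("bob",))
print(rows)  # [{'id': 'u2', 'name': 'bob'}]
```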

cerebralcortex.core.data_manager.sql.data_ingestion_handler module

class DataIngestionHandler

Bases: object

add_ingestion_log(user_id: str = '', stream_name: str = '', file_path: str = '', fault_type: str = '', fault_description: str = '', success: int = None, metadata=None) → bool

Log errors and success of each record during data import process.

Parameters:
  • user_id (str) – id of a user
  • stream_name (str) – name of a stream
  • file_path (str) – filename with its path
  • fault_type (str) – error type
  • fault_description (str) – error details
  • success (int) – 1 if data was successfully ingested, 0 otherwise
  • metadata (dict) – (optional) metadata of a stream
Returns:

bool

Raises:
  • ValueError – if user_id, file_path, fault_type, or success parameters are missing
  • Exception – if sql query fails
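A minimal sketch of the documented argument check for add_ingestion_log, assuming that empty strings and None count as missing (matching the signature's defaults). The helper name validate_ingestion_log_args and the argument values in the usage line are hypothetical. success is checked separately so that a legitimate value of 0 is not mistaken for a missing one.

```python
from typing import Optional


def validate_ingestion_log_args(user_id: str = "", file_path: str = "", fault_type: str = "",
                                success: Optional[int] = None) -> None:
    """Hypothetical helper: raise ValueError listing every missing required argument."""
    # Empty strings count as missing, matching the "" defaults in the documented signature.
    missing = [name for name, value in (("user_id", user_id),
                                        ("file_path", file_path),
                                        ("fault_type", fault_type)) if not value]
    # success is compared with None explicitly so that 0 (a failed ingestion) is accepted.
    if success is None:
        missing.append("success")
    if missing:
        raise ValueError("Missing required parameter(s): " + ", ".join(missing))


# Hypothetical values; passes silently because every required argument is present.
validate_ingestion_log_args("u1", "/data/file.gz", "example_fault", 0)
```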
add_scanned_files(user_id: str, stream_name: str, metadata: dict, files_list: list) → bool

Add scanned files to the ingestion log table so that they can be processed later. This method is specific to MD2K data ingestion.

Parameters:
  • user_id (str) – id of a user
  • stream_name (str) – name of a stream
  • metadata (dict) – raw metadata
  • files_list (list) – list of filenames with their paths
Returns:

bool

Raises:

Exception – if sql query fails

get_files_list(stream_name: str = None, user_id=None, success_type=None) → list

Get a list of all the processed/un-processed files

Returns:list of processed/un-processed files
Return type:list
get_ingestion_stats() → list

Get stats on ingested records

Returns:{“fault_type”: str, “total_faults”: int, “success”:int}
Return type:dict
get_processed_files_list(success_type=False) → list

Get a list of all the processed/un-processed files

Returns:list of processed/un-processed files
Return type:list
is_file_processed(filename: str) → bool

Check whether a file has been processed and ingested.

Returns:True if file is already processed
Return type:bool
update_ingestion_log(file_path: str = '', fault_type: str = '', fault_description: str = '', success: int = None) → bool

Update the ingestion log of each record during the data import process.

Parameters:
  • file_path (str) – filename with its path
  • fault_type (str) – error type
  • fault_description (str) – error details
  • success (int) – 1 if data was successfully ingested, 0 otherwise
Returns:

bool

Raises:
  • ValueError – if file_path, fault_type, or success parameters are missing
  • Exception – if sql query fails
update_ingestion_log_status(stream_name, fault_type, fault_description, status_type, metadata={}, platform_metadata={})
update_ingestion_log_status_ignore(stream_name, fault_type, fault_description, status_type, metadata=None)

cerebralcortex.core.data_manager.sql.kafka_offsets_handler module

class KafkaOffsetsHandler

Bases: object

get_kafka_offsets(topic: str) → List[dict]

Get last stored kafka offsets

Parameters:topic (str) – kafka topic name
Returns:list of kafka offsets. This method will return empty list if topic does not exist and/or no offset is stored for the topic.
Return type:list[dict]
Raises:ValueError – Topic name cannot be empty/None

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_kafka_offsets("live-data")
[{"id","topic", "topic_partition", "offset_start", "offset_until", "offset_update_time"}]
store_or_update_Kafka_offset(topic: str, topic_partition: str, offset_start: str, offset_until: str) → bool

Store or Update kafka topic offsets. Offsets are used to track what messages have been processed.

Parameters:
  • topic (str) – name of the kafka topic
  • topic_partition (str) – partition number
  • offset_start (str) – starting offset
  • offset_until (str) – last processed offset
Raises:
  • ValueError – All params are required.
  • Exception – Cannot add/update kafka offsets because ERROR-MESSAGE
Returns:

True if offsets are added/updated; otherwise an exception is raised.

Return type:

bool

cerebralcortex.core.data_manager.sql.stream_handler module

class StreamHandler

Bases: object

get_stream_info_by_hash(metadata_hash: uuid) → str

A metadata_hash is unique to each stream version. This reverse lookup returns the stream information associated with a metadata_hash.

Parameters:metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns:stream metadata and other info related to a stream
Return type:dict

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_info_by_hash("00ab666c-afb8-476e-9872-6472b4e66b68")
{"name": .....} # stream metadata and other information
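Because metadata_hash may be passed either as a uuid.UUID object or as its string form, a lookup would typically normalize it to one canonical string first. The helper below is a hypothetical sketch, not part of the documented API.

```python
import uuid
from typing import Union


def normalize_metadata_hash(metadata_hash: Union[uuid.UUID, str]) -> str:
    """Hypothetical helper: accept a uuid.UUID or its string form, return the canonical string."""
    # uuid.UUID() parses the hex digits (case-insensitively) and raises ValueError
    # for malformed input; str() renders the lowercase, hyphenated form.
    return str(uuid.UUID(str(metadata_hash)))


h = "00ab666c-afb8-476e-9872-6472b4e66b68"
print(normalize_metadata_hash(uuid.UUID(h)) == normalize_metadata_hash(h))  # True
```

Normalizing up front also rejects malformed hashes before any SQL query is issued.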
get_stream_metadata(stream_name: str, version: str = 'all') → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata]

Get a list of metadata for all versions available for a stream.

Parameters:
  • stream_name (str) – name of a stream
  • version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
Returns:

Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise.

Return type:

list (Metadata)

Raises:

ValueError – stream_name cannot be None or empty.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_metadata("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[Metadata] # list of Metadata class objects
get_stream_metadata_hash(stream_name: str) → List[str]

Get all the metadata_hash associated with a stream name.

Parameters:stream_name (str) – name of a stream
Returns:list of all the metadata hashes
Return type:list[str]

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
get_stream_name(metadata_hash: uuid) → str

A metadata_hash is unique to each stream version. This reverse lookup returns the stream name of a metadata_hash.

Parameters:metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns:name of a stream
Return type:str

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
'ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST'
get_stream_versions(stream_name: str) → list

Returns a list of versions available for a stream

Parameters:stream_name (str) – name of a stream
Returns:list of int
Return type:list
Raises:ValueError – if stream_name is empty or None

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[1, 2, 4]
is_stream(stream_name: str) → bool

Returns True if the provided stream exists.

Parameters:stream_name (str) – name of a stream
Returns:True if stream_name exists, False otherwise
Return type:bool

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
True
list_streams() → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata]

Get all the available stream names with metadata

Returns:list of available streams metadata
Return type:List[Metadata]

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.list_streams()
save_stream_metadata(metadata_obj) → dict

Update a record if stream already exists or insert a new record otherwise.

Parameters:metadata_obj (Metadata) – stream metadata
Returns:{“status”: True/False, ”version”: version}
Return type:dict
Raises:Exception – if fail to insert/update record in MySQL. Exceptions are logged in a log file
search_stream(stream_name)

Find all the stream names similar to the stream_name argument. For example, passing “location” returns all stream names that contain the word location.

Returns:list of stream names similar to stream_name arg
Return type:List[str]

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
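A minimal sketch of the matching described for search_stream, assuming a case-insensitive substring match (the “battery” example returns BATTERY-- names). The function search_stream_names is hypothetical and filters an in-memory list; a SQL-backed version would typically use LIKE '%battery%', whose case sensitivity in MySQL depends on the column collation.

```python
from typing import Iterable, List


def search_stream_names(stream_names: Iterable[str], query: str) -> List[str]:
    """Hypothetical in-memory analogue of search_stream()."""
    needle = query.lower()  # assumption: matching ignores case
    return [name for name in stream_names if needle in name.lower()]


names = [
    "BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST",
    "BATTERY--org.md2k.phonesensor--PHONE",
    "ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST",
]
print(search_stream_names(names, "battery"))  # the two BATTERY-- streams
```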

cerebralcortex.core.data_manager.sql.users_handler module

class UserHandler

Bases: object

create_user(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict) → bool

Create a user in SQL storage if it doesn’t exist

Parameters:
  • username (str) – Only alphanumeric usernames are allowed with the max length of 25 chars.
  • user_password (str) – no size limit on password
  • user_role (str) – role of a user
  • user_metadata (dict) – metadata of a user
  • user_settings (dict) – user settings, mCerebrum configurations of a user
Returns:

True if the user is successfully registered; an error is raised otherwise.

Return type:

bool

Raises:
  • ValueError – if selected username is not available
  • Exception – if sql query fails
delete_user(username: str)

Delete a user record in SQL table

Parameters:

username – username of a user that needs to be deleted

Returns:

True if the user is successfully removed.

Return type:

bool

Raises:
  • ValueError – if username param is empty or None
  • Exception – if sql query fails
encrypt_user_password(user_password: str) → str

Encrypt password

Parameters:user_password (str) – unencrypted password
Raises:ValueError – password cannot be None or empty.
Returns:encrypted password
Return type:str
gen_random_pass(string_type: str, size: int = 8) → str

Generate a random password

Parameters:
  • string_type – Accepted values are “varchar” and “char”. (Default=”varchar”)
  • size – password length (default=8)
Returns:

random password

Return type:

str

get_all_users(study_name: str) → List[dict]

Get a list of all users part of a study.

Parameters:study_name (str) – name of a study
Raises:ValueError – Study name is a required field.
Returns:Returns an empty list if no user is associated with the study_name and/or the study_name does not exist.
Return type:list[dict]

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_all_users("mperf")
[{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}] # [{user_id, user_name}]
get_user_id(user_name: str) → str

Get the user id linked to user_name.

Parameters:user_name (str) – username of a user
Returns:user id associated to user_name
Return type:str
Raises:ValueError – User name is a required field.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_id("nasir_ali")
'76cc444c-4fb8-776e-2872-9472b4e66b16'
get_user_metadata(user_id: uuid = None, username: str = None) → dict

Get user metadata by user_id or by username

Parameters:
  • user_id (str) – id (uuid) of a user
  • username (str) – username of a user
Returns:

user metadata

Return type:

dict

Todo

Return list of User class object

Raises:ValueError – User ID/name cannot be empty.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_metadata(username="nasir_ali")
{"study_name":"mperf"........}
get_user_name(user_id: str) → str

Get the user name linked to a user id.

Parameters:user_id (str) – id of a user
Returns:username associated with the user_id
Return type:str
Raises:ValueError – User ID is a required field.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16")
'nasir_ali'
get_user_settings(username: str = None, auth_token: str = None) → dict

Get user settings by auth-token or by username. These are the user’s mCerebrum settings.

Parameters:
  • username (str) – username of a user
  • auth_token (str) – auth-token
Returns:

List of dictionaries of user metadata

Return type:

list[dict]

Todo

Return list of User class object

Raises:ValueError – User ID/name cannot be empty.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_settings(username="nasir_ali")
[{"mcerebrum":"some-conf"........}]
is_auth_token_valid(username: str, auth_token: str, checktime: bool = False) → bool

Validate whether a token is valid or expired based on the token expiry datetime stored in SQL

Parameters:
  • username (str) – username of a user
  • auth_token (str) – token generated by API-Server
  • checktime (bool) – if False, only checks whether the token exists in the system; if True, also checks whether the token has expired based on the stored expiry date.
Raises:

ValueError – Auth token and auth-token expiry time cannot be null/empty.

Returns:

returns True if token is valid or False otherwise.

Return type:

bool
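The two modes of checktime can be sketched with an in-memory token store. Everything here is an assumption for illustration: the store layout (username mapped to token and expiry datetime), the explicit now argument (added only to make the check testable), and the use of hmac.compare_digest for a constant-time token comparison.

```python
import hmac
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

# Hypothetical store layout: username -> (auth_token, expiry_time).
TokenStore = Dict[str, Tuple[str, datetime]]


def is_auth_token_valid(store: TokenStore, username: str, auth_token: str,
                        checktime: bool = False, now: Optional[datetime] = None) -> bool:
    if not username or not auth_token:
        raise ValueError("Username and auth token cannot be None/empty.")
    record = store.get(username)
    # compare_digest compares in constant time, avoiding timing side channels.
    if record is None or not hmac.compare_digest(record[0], auth_token):
        return False  # token is not known to the system
    if not checktime:
        return True  # only the token's presence was requested
    current = now or datetime.now()
    return current < record[1]  # valid only before the stored expiry time


issued = datetime(2019, 1, 1, 12, 0)
store: TokenStore = {"nasir_ali": ("tok123", issued + timedelta(hours=1))}
print(is_auth_token_valid(store, "nasir_ali", "tok123"))  # True
print(is_auth_token_valid(store, "nasir_ali", "tok123", checktime=True,
                          now=issued + timedelta(hours=2)))  # False: expired
```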

is_user(user_id: uuid = None, user_name: uuid = None) → bool

Checks whether a user exists in the system. Either parameter can be set to verify whether the user exists.

Parameters:
  • user_id (str) – id (uuid) of a user
  • user_name (str) – username of a user
Returns:

True if a user exists in the system or False otherwise.

Return type:

bool

Raises:

ValueError – Both user_id and user_name cannot be None or empty.

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16")
True
login_user(username: str, password: str, encrypted_password: bool = False) → dict

Authenticate a user based on username and password and return an auth token

Parameters:
  • username (str) – username of a user
  • password (str) – password of a user
  • encrypted_password (bool) – whether the password is encrypted. mCerebrum sends encrypted passwords.
Raises:

ValueError – User name and password cannot be empty/None.

Returns:

{“status”: bool, “auth_token”: str, “msg”: str}

Return type:

dict

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.login_user("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True)
{"status": True, "auth_token": "...", "msg": "..."}
update_auth_token(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool

Update an auth token in the SQL database to keep the user logged in. The auth token's validity duration can be changed in the configuration files.

Parameters:
  • username (str) – username of a user
  • auth_token (str) – issued new auth token
  • auth_token_issued_time (datetime) – datetime when the auth token was issued
  • auth_token_expiry_time (datetime) – datetime when the token expires
Raises:

ValueError – Auth token and auth-token issue/expiry time cannot be None/empty.

Returns:

Returns True if the new auth token is set or False otherwise.

Return type:

bool
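A sketch of issuing a replacement token whose expiry is derived from a configurable lifetime. The function issue_auth_token, the store layout, and the valid_for argument (standing in for the duration read from the configuration files) are hypothetical; secrets.token_urlsafe is the standard-library way to generate an unguessable token.

```python
import secrets
from datetime import datetime, timedelta
from typing import Dict, Optional, Tuple

# Hypothetical store layout: username -> (auth_token, issued_time, expiry_time).
TokenStore = Dict[str, Tuple[str, datetime, datetime]]


def issue_auth_token(store: TokenStore, username: str, valid_for: timedelta,
                     now: Optional[datetime] = None) -> str:
    if not username:
        raise ValueError("username cannot be None/empty.")
    issued = now or datetime.now()
    token = secrets.token_urlsafe(32)  # 32 random bytes as URL-safe base64 text
    # Overwrite any previous token so only the newest one is stored.
    store[username] = (token, issued, issued + valid_for)
    return token


store: TokenStore = {}
t1 = issue_auth_token(store, "nasir_ali", timedelta(hours=12), now=datetime(2019, 1, 1, 8, 0))
t2 = issue_auth_token(store, "nasir_ali", timedelta(hours=12), now=datetime(2019, 1, 1, 9, 0))
print(store["nasir_ali"][2])  # 2019-01-01 21:00:00
```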

username_checks(username: str)

No space, special characters, dash etc. are allowed in username. Only alphanumeric usernames are allowed with the max length of 25 chars.

Parameters:username (str) – username to validate
Returns:True if the provided username complies with the standard; otherwise an exception is raised
Return type:bool
Raises:Exception – if username doesn’t follow standards
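The username rule (alphanumeric only, at most 25 characters) maps directly onto a regular expression. This is a minimal sketch under a strict reading of that rule; it raises ValueError, which is a subclass of Exception. Under this strict reading a name containing an underscore, such as nasir_ali in the examples above, would be rejected, so the actual check may be more permissive.

```python
import re

# Strict reading of the documented rule: ASCII letters and digits only, 1 to 25 characters.
_USERNAME_RE = re.compile(r"[A-Za-z0-9]{1,25}")


def username_checks(username: str) -> bool:
    """Hypothetical sketch of the documented username rule."""
    if not username or _USERNAME_RE.fullmatch(username) is None:
        raise ValueError("Username must be 1-25 alphanumeric characters "
                         "(no spaces, dashes, or other special characters).")
    return True


print(username_checks("nasirali"))  # True
```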

Module contents