cerebralcortex.core.data_manager.sql package
Submodules
cerebralcortex.core.data_manager.sql.cache_handler module
-
class CacheHandler
Bases: object
-
get_cache_value(key: str) → str
Retrieves the value stored in the cache for the given key.
Parameters: key (str) – key in the cache
Returns: the value in the cache
Return type: str
Raises: ValueError – if key is None or empty
-
set_cache_value(key: str, value: str) → bool
Creates a new entry in the cache. If the key already exists, its value is overwritten.
Parameters: - key (str) – key in the cache
- value (str) – value associated with the key
Returns: True on successful insert, False otherwise
Return type: bool
Raises: ValueError – if key is None or empty
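The get/set contract documented for CacheHandler can be illustrated with a minimal in-memory stand-in. This is only a sketch: InMemoryCache is a hypothetical class, a plain dict replaces whatever storage CacheHandler actually uses, and returning None for a missing key is an assumption the documentation does not cover.

```python
from typing import Dict, Optional


class InMemoryCache:
    """Hypothetical stand-in for CacheHandler; stores entries in a dict."""

    def __init__(self) -> None:
        self._store: Dict[str, str] = {}

    @staticmethod
    def _check_key(key: str) -> None:
        # Mirrors the documented contract: a None or empty key raises ValueError.
        if key is None or key == "":
            raise ValueError("key cannot be None or empty.")

    def set_cache_value(self, key: str, value: str) -> bool:
        self._check_key(key)
        self._store[key] = value  # an existing key is overwritten
        return True

    def get_cache_value(self, key: str) -> Optional[str]:
        self._check_key(key)
        # Assumption: a missing key yields None rather than an error.
        return self._store.get(key)


cache = InMemoryCache()
cache.set_cache_value("token", "abc")
cache.set_cache_value("token", "xyz")  # overwrites the previous value
print(cache.get_cache_value("token"))  # xyz
```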
-
cerebralcortex.core.data_manager.sql.data module
-
class SqlData(CC)
Bases: cerebralcortex.core.data_manager.sql.stream_handler.StreamHandler,
cerebralcortex.core.data_manager.sql.users_handler.UserHandler,
cerebralcortex.core.data_manager.sql.kafka_offsets_handler.KafkaOffsetsHandler,
cerebralcortex.core.data_manager.sql.cache_handler.CacheHandler,
cerebralcortex.core.data_manager.sql.data_ingestion_handler.DataIngestionHandler,
cerebralcortex.core.data_manager.sql.metadata_handler.MetadataHandler
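SqlData gathers the methods of the handler classes listed in its bases through multiple inheritance, so a single object answers stream, user, Kafka-offset, cache, and ingestion queries. The sketch below reproduces that mixin pattern with two hypothetical, trimmed-down handlers; the class names, method bodies, and the shared `self.store` dict are illustrative assumptions (the real constructor takes a CC argument), not the library's code.

```python
class StreamHandlerSketch:
    """Hypothetical mixin: stream-related lookups."""

    def is_stream(self, stream_name: str) -> bool:
        return stream_name in self.store["streams"]


class UserHandlerSketch:
    """Hypothetical mixin: user-related lookups."""

    def is_user(self, user_name: str) -> bool:
        return user_name in self.store["users"]


class SqlDataSketch(StreamHandlerSketch, UserHandlerSketch):
    """Combines the mixins; each mixin reads shared state from self.store."""

    def __init__(self, store: dict) -> None:
        self.store = store


data = SqlDataSketch({"streams": {"BATTERY--org.md2k.phonesensor--PHONE"},
                      "users": {"nasir_ali"}})
print(data.is_stream("BATTERY--org.md2k.phonesensor--PHONE"))  # True
print(data.is_user("nasir_ali"))  # True
print([cls.__name__ for cls in SqlDataSketch.__mro__])
# ['SqlDataSketch', 'StreamHandlerSketch', 'UserHandlerSketch', 'object']
```

The method resolution order shows which base is consulted first when two mixins define the same name, which is worth keeping in mind when adding methods to any of the handlers.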
-
close(conn, cursor)
Close a MySQL connection and its cursor.
Parameters: - conn (object) – MySQL connection object
- cursor (object) – MySQL cursor object
Raises: Exception – if the connection is already closed
-
create_pool(pool_name: str = 'CC_Pool', pool_size: int = 1)
Create a MySQL connection pool. Once the pool is created, requests for a MySQL connection are served from the pool instead of opening a new connection each time.
Parameters: - pool_name (str) – name of the pool (default="CC_Pool")
- pool_size (int) – number of MySQL connections in the pool (default=1)
Returns: MySQL connection pool
Return type: object
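The idea behind create_pool, reusing a fixed number of open connections instead of opening one per request, can be sketched with the standard library. This sketch uses sqlite3 and queue.Queue only so it runs without a MySQL server; SimplePool, get_connection, and release are hypothetical names and are not a description of how create_pool is implemented. (For MySQL, mysql-connector-python provides mysql.connector.pooling.MySQLConnectionPool, which also accepts pool_name and pool_size.)

```python
import queue
import sqlite3


class SimplePool:
    """Hypothetical fixed-size pool holding pre-opened connections."""

    def __init__(self, pool_name: str = "CC_Pool", pool_size: int = 1,
                 database: str = ":memory:") -> None:
        self.pool_name = pool_name
        self._idle = queue.Queue(maxsize=pool_size)
        for _ in range(pool_size):
            # check_same_thread=False allows handing a connection to another thread.
            self._idle.put(sqlite3.connect(database, check_same_thread=False))

    def get_connection(self, timeout: float = 5.0) -> sqlite3.Connection:
        # Blocks until a connection is free; raises queue.Empty after `timeout` seconds.
        return self._idle.get(timeout=timeout)

    def release(self, conn: sqlite3.Connection) -> None:
        # Return the connection for reuse instead of closing it.
        self._idle.put(conn)


pool = SimplePool(pool_size=2)
conn = pool.get_connection()
print(conn.execute("SELECT 1").fetchone())  # (1,)
pool.release(conn)
```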
-
execute(sql, args=None, commit=False, executemany=False) → List[dict]
Execute an SQL statement, with or without arguments. Usage is similar to the execute() function of the pymysql module.
Parameters: - sql (str) – SQL statement
- args (tuple) – arguments required by the SQL statement
- commit (bool) – whether to commit the transaction
- executemany (bool) – whether to execute the statement as a batch
Returns: a list of dicts if commit is False
Return type: list[dict]
Raises: Exception – if the MySQL query fails
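A minimal sketch of the execute contract (rows returned as a list of dicts when commit is False, an optional commit, optional batch execution), written against sqlite3 so it runs without MySQL. The function name execute_sketch and the in-memory database are assumptions, as is returning None after a commit. Note that sqlite3 uses `?` placeholders, whereas pymysql uses `%s`.

```python
import sqlite3
from typing import List, Optional


def execute_sketch(conn: sqlite3.Connection, sql: str, args=None,
                   commit: bool = False, executemany: bool = False) -> Optional[List[dict]]:
    """Hypothetical analogue of SqlData.execute built on sqlite3."""
    cursor = conn.cursor()
    try:
        if executemany:
            cursor.executemany(sql, args or [])  # one execution per args tuple
        elif args is not None:
            cursor.execute(sql, args)
        else:
            cursor.execute(sql)
        if commit:
            conn.commit()
            return None  # Assumption: nothing is returned after a commit.
        # Build one dict per row from the column names in cursor.description.
        columns = [col[0] for col in cursor.description or []]
        return [dict(zip(columns, row)) for row in cursor.fetchall()]
    finally:
        cursor.close()


conn = sqlite3.connect(":memory:")
execute_sketch(conn, "CREATE TABLE users (user_id TEXT, username TEXT)", commit=True)
execute_sketch(conn, "INSERT INTO users VALUES (?, ?)",
               [("u1", "alice"), ("u2", "bob")], commit=True, executemany=True)
print(execute_sketch(conn, "SELECT username FROM users WHERE user_id = ?", ("u2",)))
# [{'username': 'bob'}]
```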
-
cerebralcortex.core.data_manager.sql.data_ingestion_handler module
-
class DataIngestionHandler
Bases: object
-
add_ingestion_log(user_id: str = '', stream_name: str = '', file_path: str = '', fault_type: str = '', fault_description: str = '', success: int = None, metadata=None) → bool
Log the errors and successes of each record during the data import process.
Parameters: - user_id (str) – id of a user
- stream_name (str) – name of a stream
- file_path (str) – filename with its path
- fault_type (str) – error type
- fault_description (str) – error details
- success (int) – 1 if data was successfully ingested, 0 otherwise
- metadata (dict) – (optional) metadata of a stream
Returns: bool
Raises: - ValueError – if the user_id, file_path, fault_type, or success parameter is missing
- Exception – if the SQL query fails
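The required-field check and the insert documented for add_ingestion_log could look roughly like the following. The table name ingestion_logs, its columns, storing metadata as a JSON string, and the use of sqlite3 in place of MySQL are all assumptions; the fault values in the usage lines are illustrative only.

```python
import json
import sqlite3


def add_ingestion_log_sketch(conn: sqlite3.Connection, user_id: str = "", stream_name: str = "",
                             file_path: str = "", fault_type: str = "", fault_description: str = "",
                             success: int = None, metadata: dict = None) -> bool:
    """Hypothetical analogue of DataIngestionHandler.add_ingestion_log."""
    # Documented contract: user_id, file_path, fault_type, and success are required.
    if not user_id or not file_path or not fault_type or success is None:
        raise ValueError("user_id, file_path, fault_type, and success are required.")
    # Assumption: metadata is serialized to a JSON string before storage.
    meta_json = json.dumps(metadata) if metadata is not None else None
    conn.execute(
        "INSERT INTO ingestion_logs (user_id, stream_name, file_path, fault_type, "
        "fault_description, success, metadata) VALUES (?, ?, ?, ?, ?, ?, ?)",
        (user_id, stream_name, file_path, fault_type, fault_description, success, meta_json),
    )
    conn.commit()
    return True


conn = sqlite3.connect(":memory:")
# Hypothetical schema for the ingestion log table.
conn.execute("CREATE TABLE ingestion_logs (user_id TEXT, stream_name TEXT, file_path TEXT, "
             "fault_type TEXT, fault_description TEXT, success INTEGER, metadata TEXT)")
# Illustrative values only.
add_ingestion_log_sketch(conn, user_id="u1", file_path="/data/u1/file.gz",
                         fault_type="EMPTY_FILE", fault_description="file has no rows", success=0)
print(conn.execute("SELECT fault_type, success FROM ingestion_logs").fetchall())
# [('EMPTY_FILE', 0)]
```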
-
add_scanned_files(user_id: str, stream_name: str, metadata: dict, files_list: list) → bool
Add scanned files to the ingestion log table so that they can be processed later. This method is specific to MD2K data ingestion.
Parameters: - user_id (str) – id of a user
- stream_name (str) – name of a stream
- metadata (dict) – raw metadata
- files_list (list) – list of filenames with their paths
Returns: bool
Raises: Exception – if the SQL query fails
-
get_files_list(stream_name: str = None, user_id=None, success_type=None) → list
Get a list of processed or unprocessed files.
Returns: list of processed or unprocessed files
Return type: list
-
get_ingestion_stats() → list
Get statistics on ingested records.
Returns: records of the form {"fault_type": str, "total_faults": int, "success": int}
Return type: list[dict]
-
get_processed_files_list(success_type=False) → list
Get a list of processed or unprocessed files.
Returns: list of processed or unprocessed files
Return type: list
-
is_file_processed(filename: str) → bool
Check whether a file has already been processed and ingested.
Parameters: filename (str) – filename to check
Returns: True if the file has already been processed
Return type: bool
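is_file_processed amounts to an existence check against the ingestion log. A sketch, assuming a hypothetical ingestion_logs table with file_path and success columns, sqlite3 in place of MySQL, and that a file counts as processed once it has a row with success = 1 (the documentation does not state which column marks a processed file).

```python
import sqlite3


def is_file_processed_sketch(conn: sqlite3.Connection, filename: str) -> bool:
    """Hypothetical analogue of DataIngestionHandler.is_file_processed."""
    # Assumption: success = 1 marks a file that was processed and ingested.
    row = conn.execute(
        "SELECT 1 FROM ingestion_logs WHERE file_path = ? AND success = 1 LIMIT 1",
        (filename,),
    ).fetchone()
    return row is not None


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ingestion_logs (file_path TEXT, success INTEGER)")
conn.executemany("INSERT INTO ingestion_logs VALUES (?, ?)",
                 [("/data/a.gz", 1), ("/data/b.gz", 0)])
print(is_file_processed_sketch(conn, "/data/a.gz"))  # True
print(is_file_processed_sketch(conn, "/data/b.gz"))  # False
```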
-
update_ingestion_log(file_path: str = '', fault_type: str = '', fault_description: str = '', success: int = None) → bool
Update the ingestion log of each record during the data import process.
Parameters: - file_path (str) – filename with its path
- fault_type (str) – error type
- fault_description (str) – error details
- success (int) – 1 if data was successfully ingested, 0 otherwise
Returns: bool
Raises: - ValueError – if the file_path, fault_type, or success parameter is missing
- Exception – if the SQL query fails
-
cerebralcortex.core.data_manager.sql.kafka_offsets_handler module
-
class KafkaOffsetsHandler
Bases: object
-
get_kafka_offsets(topic: str) → List[dict]
Get the last stored Kafka offsets for a topic.
Parameters: topic (str) – Kafka topic name
Returns: list of Kafka offsets. An empty list is returned if the topic does not exist or no offset is stored for it.
Return type: list[dict]
Raises: ValueError – if the topic name is empty or None
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_kafka_offsets("live-data")
[{"id": ..., "topic": ..., "topic_partition": ..., "offset_start": ..., "offset_until": ..., "offset_update_time": ...}]
-
store_or_update_Kafka_offset(topic: str, topic_partition: str, offset_start: str, offset_until: str) → bool
Store or update Kafka topic offsets. Offsets are used to track which messages have been processed.
Parameters: - topic (str) – name of the Kafka topic
- topic_partition (str) – partition number
- offset_start (str) – starting offset
- offset_until (str) – last processed offset
Raises: - ValueError – if any parameter is missing; all parameters are required
- Exception – "Cannot add/update kafka offsets because ERROR-MESSAGE"
Returns: True if the offsets are added or updated; otherwise an exception is raised
Return type: bool
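The two Kafka-offset methods pair naturally: one upserts the offsets for a topic partition and the other reads them back, returning an empty list for an unknown topic. A sketch using sqlite3, assuming a hypothetical kafka_offsets table whose primary key on (topic, topic_partition) lets INSERT OR REPLACE act as the upsert; the real MySQL schema may differ, and the id column shown in the get_kafka_offsets example is omitted here.

```python
import sqlite3
from datetime import datetime
from typing import List


def store_or_update_offset_sketch(conn: sqlite3.Connection, topic: str, topic_partition: str,
                                  offset_start: str, offset_until: str) -> bool:
    """Hypothetical analogue of store_or_update_Kafka_offset."""
    if not all([topic, topic_partition, offset_start, offset_until]):
        raise ValueError("All params are required.")
    # The primary key on (topic, topic_partition) turns this into an insert-or-update.
    conn.execute(
        "INSERT OR REPLACE INTO kafka_offsets VALUES (?, ?, ?, ?, ?)",
        (topic, topic_partition, offset_start, offset_until, datetime.now().isoformat()),
    )
    conn.commit()
    return True


def get_offsets_sketch(conn: sqlite3.Connection, topic: str) -> List[dict]:
    """Hypothetical analogue of get_kafka_offsets; returns [] for an unknown topic."""
    if not topic:
        raise ValueError("Topic name cannot be empty/None.")
    cursor = conn.execute("SELECT * FROM kafka_offsets WHERE topic = ?", (topic,))
    columns = [col[0] for col in cursor.description]
    return [dict(zip(columns, row)) for row in cursor.fetchall()]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kafka_offsets (topic TEXT, topic_partition TEXT, offset_start TEXT, "
             "offset_until TEXT, offset_update_time TEXT, PRIMARY KEY (topic, topic_partition))")
store_or_update_offset_sketch(conn, "live-data", "0", "100", "150")
store_or_update_offset_sketch(conn, "live-data", "0", "150", "200")  # replaces the row
print(len(get_offsets_sketch(conn, "live-data")))  # 1
print(get_offsets_sketch(conn, "unknown-topic"))   # []
```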
-
cerebralcortex.core.data_manager.sql.stream_handler module
-
class StreamHandler
Bases: object
-
get_stream_info_by_hash(metadata_hash: uuid) → str
Each metadata_hash is unique to a stream version. This reverse lookup returns the stream metadata and other information associated with a metadata_hash.
Parameters: metadata_hash (uuid) – either a uuid object or the string form of a uuid
Returns: stream metadata and other information related to the stream
Return type: dict
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_info_by_hash("00ab666c-afb8-476e-9872-6472b4e66b68")
{"name": ...}  # stream metadata and other information
-
get_stream_metadata(stream_name: str, version: str = 'all') → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata]
Get a list of metadata for the versions available for a stream.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are "all", "latest", or a specific version of a stream (e.g., 2.0) (default="all")
Returns: an empty list if no metadata is available for stream_name, or a list of metadata otherwise
Return type: list (Metadata)
Raises: ValueError – if stream_name is None or empty
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_metadata("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[Metadata]  # list of Metadata class objects
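The version argument of get_stream_metadata accepts "all", "latest", or a specific version. A minimal sketch of that selection over an in-memory list; the function name select_versions, the record layout (dicts with a numeric "version" key), and the numeric comparison of versions are hypothetical, and the real method returns Metadata objects.

```python
from typing import List, Union


def select_versions(records: List[dict], version: Union[str, int, float] = "all") -> List[dict]:
    """Hypothetical filter mirroring the documented version argument."""
    if not records:
        return []  # no metadata available: empty list
    if version == "all":
        return list(records)
    if version == "latest":
        newest = max(float(r["version"]) for r in records)
        return [r for r in records if float(r["version"]) == newest]
    # Specific version such as 2 or "2.0"; compared numerically (an assumption).
    return [r for r in records if float(r["version"]) == float(version)]


records = [{"version": 1}, {"version": 2}, {"version": 4}]
print(select_versions(records, "latest"))  # [{'version': 4}]
print(select_versions(records, "2.0"))     # [{'version': 2}]
```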
-
get_stream_metadata_hash(stream_name: str) → List[str]
Get all the metadata_hash values associated with a stream name.
Parameters: stream_name (str) – name of a stream
Returns: list of all the metadata hashes
Return type: list[str]
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
-
get_stream_name(metadata_hash: uuid) → str
Each metadata_hash is unique to a stream version. This reverse lookup returns the stream name for a metadata_hash.
Parameters: metadata_hash (uuid) – either a uuid object or the string form of a uuid
Returns: name of a stream
Return type: str
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
'ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST'
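Both get_stream_name and get_stream_info_by_hash accept metadata_hash either as a uuid.UUID object or as its string form. One way to normalize the two before a lookup, sketched with the standard uuid module; HASH_TO_STREAM is a hypothetical in-memory mapping standing in for the SQL table, and get_stream_name_sketch is an illustrative name.

```python
import uuid
from typing import Dict, Optional, Union

# Hypothetical stand-in for the stream table: metadata_hash -> stream name.
HASH_TO_STREAM: Dict[str, str] = {
    "00ab666c-afb8-476e-9872-6472b4e66b68":
        "ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST",
}


def normalize_hash(metadata_hash: Union[str, uuid.UUID]) -> str:
    # uuid.UUID() validates the input and str() yields the canonical lowercase,
    # hyphenated form; a malformed string raises ValueError.
    return str(uuid.UUID(str(metadata_hash)))


def get_stream_name_sketch(metadata_hash: Union[str, uuid.UUID]) -> Optional[str]:
    return HASH_TO_STREAM.get(normalize_hash(metadata_hash))


h = "00ab666c-afb8-476e-9872-6472b4e66b68"
print(get_stream_name_sketch(h) == get_stream_name_sketch(uuid.UUID(h)))  # True
```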
-
get_stream_versions(stream_name: str) → list
Returns a list of versions available for a stream.
Parameters: stream_name (str) – name of a stream
Returns: list of int
Return type: list
Raises: ValueError – if stream_name is empty or None
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[1, 2, 4]
-
is_stream(stream_name: str) → bool
Returns True if the provided stream exists.
Parameters: stream_name (str) – name of a stream
Returns: True if stream_name exists, False otherwise
Return type: bool
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
True
-
list_streams() → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata]
Get all the available stream names with their metadata.
Returns: list of metadata of the available streams
Return type: List[Metadata]
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.list_streams()
-
save_stream_metadata(metadata_obj) → dict
Update the record if the stream already exists, or insert a new record otherwise.
Parameters: metadata_obj (Metadata) – stream metadata
Returns: {"status": True/False, "version": version}
Return type: dict
Raises: Exception – if inserting or updating the record in MySQL fails. Exceptions are logged in a log file.
-
search_stream(stream_name)
Find all the stream names similar to the stream_name argument. For example, passing "location" returns all stream names that contain the word location.
Returns: list of stream names similar to stream_name
Return type: List[str]
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE", .....]
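In the documented example, "battery" matches upper-case BATTERY stream names, so the search appears to be case-insensitive. A sketch of that behavior over an in-memory list; search_stream_sketch is a hypothetical name, and in MySQL a `LIKE '%battery%'` query under a case-insensitive collation would behave similarly.

```python
from typing import List


def search_stream_sketch(stream_names: List[str], term: str) -> List[str]:
    """Hypothetical case-insensitive substring search over stream names."""
    needle = term.lower()
    return [name for name in stream_names if needle in name.lower()]


streams = [
    "BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST",
    "BATTERY--org.md2k.phonesensor--PHONE",
    "ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST",
]
print(search_stream_sketch(streams, "battery"))
# ['BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST', 'BATTERY--org.md2k.phonesensor--PHONE']
```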
-
cerebralcortex.core.data_manager.sql.users_handler module
-
class UserHandler
Bases: object
-
create_user(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict) → bool
Create a user in SQL storage if the user doesn't already exist.
Parameters: - username (str) – only alphanumeric usernames are allowed, with a maximum length of 25 characters
- user_password (str) – password of the user (no size limit)
- user_role (str) – role of a user
- user_metadata (dict) – metadata of a user
- user_settings (dict) – user settings (the user's mCerebrum configurations)
Returns: True if the user is successfully registered; an error is raised otherwise
Return type: bool
Raises: - ValueError – if the selected username is not available
- Exception – if the SQL query fails
-
delete_user(username: str)
Delete a user record from the SQL table.
Parameters: username (str) – username of the user to delete
Returns: True if the user is successfully removed
Return type: bool
Raises: - ValueError – if the username parameter is empty or None
- Exception – if the SQL query fails
-
encrypt_user_password(user_password: str) → str
Encrypt a password.
Parameters: user_password (str) – unencrypted password
Returns: encrypted password
Return type: str
Raises: ValueError – if the password is None or empty
-
gen_random_pass(string_type: str, size: int = 8) → str
Generate a random password.
Parameters: - string_type (str) – accepted values are "varchar" and "char" (default="varchar")
- size (int) – password length (default=8)
Returns: random password
Return type: str
-
get_all_users(study_name: str) → List[dict]
Get a list of all users who are part of a study.
Parameters: study_name (str) – name of a study
Returns: list of users in the study; an empty list if no user is associated with study_name or study_name does not exist
Return type: list[dict]
Raises: ValueError – study name is a required field
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_all_users("mperf")
[{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}]  # [{user_id: user_name}]
-
get_user_id(user_name: str) → str
Get the user id linked to user_name.
Parameters: user_name (str) – username of a user
Returns: user id associated with user_name
Return type: str
Raises: ValueError – user name is a required field
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_id("nasir_ali")
'76cc444c-4fb8-776e-2872-9472b4e66b16'
-
get_user_metadata(user_id: uuid = None, username: str = None) → dict
Get user metadata by user_id or by username.
Parameters: - user_id (str) – id (uuid) of a user
- username (str) – username of a user
Returns: user metadata
Return type: dict
Todo
Return a list of User class objects.
Raises: ValueError – user ID/name cannot be empty
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_metadata(username="nasir_ali")
{"study_name": "mperf", ...}
-
get_user_name(user_id: str) → str
Get the username linked to a user id.
Parameters: user_id (str) – id of a user
Returns: username associated with user_id
Return type: str
Raises: ValueError – user ID is a required field
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16")
'nasir_ali'
-
get_user_settings(username: str = None, auth_token: str = None) → dict
Get user settings by auth token or by username. These are the user's mCerebrum settings.
Parameters: - username (str) – username of a user
- auth_token (str) – auth token
Returns: list of dictionaries of user settings
Return type: list[dict]
Todo
Return a list of User class objects.
Raises: ValueError – user ID/name cannot be empty
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.get_user_settings(username="nasir_ali")
[{"mcerebrum": "some-conf", ...}]
-
is_auth_token_valid(username: str, auth_token: str, checktime: bool = False) → bool
Validate whether a token is valid or expired, based on the token expiry datetime stored in SQL.
Parameters: - username (str) – username of a user
- auth_token (str) – token generated by the API server
- checktime (bool) – if False, only checks whether the token exists in the system; if True, checks whether the token has expired based on its expiry date
Raises: ValueError – auth token and auth-token expiry time cannot be None or empty
Returns: True if the token is valid, False otherwise
Return type: bool
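The checktime flag changes what is_auth_token_valid verifies. A sketch of that logic against a hypothetical in-memory record of each user's stored token and expiry time (the real method reads both from SQL); the record layout, the function name, the injectable `now` parameter, and the use of timezone-aware UTC datetimes are assumptions.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional


def is_auth_token_valid_sketch(stored: Dict[str, dict], username: str, auth_token: str,
                               checktime: bool = False, now: Optional[datetime] = None) -> bool:
    """Hypothetical analogue of UserHandler.is_auth_token_valid."""
    if not username or not auth_token:
        raise ValueError("Username and auth token cannot be empty.")
    record = stored.get(username)
    if record is None or record["token"] != auth_token:
        return False  # token is not known to the system
    if not checktime:
        return True  # checktime=False: the token existing is enough
    now = now or datetime.now(timezone.utc)
    return now < record["expiry"]  # checktime=True: the token must not be expired


# Illustrative dates; a fixed `now` keeps the example deterministic.
issued = datetime(2024, 1, 1, tzinfo=timezone.utc)
tokens = {"nasir_ali": {"token": "abc123", "expiry": issued + timedelta(days=1)}}
later = issued + timedelta(days=2)
print(is_auth_token_valid_sketch(tokens, "nasir_ali", "abc123", now=later))                  # True
print(is_auth_token_valid_sketch(tokens, "nasir_ali", "abc123", checktime=True, now=later))  # False
```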
-
is_user(user_id: uuid = None, user_name: str = None) → bool
Checks whether a user exists in the system. Either parameter can be set to perform the check.
Parameters: - user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: True if the user exists in the system, False otherwise
Return type: bool
Raises: ValueError – if both user_id and user_name are None or empty
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16")
True
-
login_user(username: str, password: str, encrypted_password: bool = False) → dict
Authenticate a user by username and password and return an auth token.
Parameters: - username (str) – username of a user
- password (str) – password of a user
- encrypted_password (bool) – whether the password is encrypted; mCerebrum sends encrypted passwords
Raises: ValueError – username and password cannot be empty/None
Returns: {"status": bool, "auth_token": str, "msg": str}
Return type: dict
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> CC.login_user("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True)
{"status": True, "auth_token": ..., "msg": ...}
-
update_auth_token(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool
Update the auth token in the SQL database to keep the user logged in. The validity duration of auth tokens can be changed in the configuration files.
Parameters: - username (str) – username of a user
- auth_token (str) – newly issued auth token
- auth_token_issued_time (datetime) – datetime when the auth token was issued
- auth_token_expiry_time (datetime) – datetime when the token expires
Raises: ValueError – auth token and auth-token issue/expiry time cannot be None/empty
Returns: True if the new auth token is set, False otherwise
Return type: bool
-
username_checks(username: str)
Spaces, special characters, dashes, etc. are not allowed in usernames. Only alphanumeric usernames with a maximum length of 25 characters are allowed.
Parameters: username (str) – username to check
Returns: True if the provided username complies with the standard; otherwise an exception is raised
Return type: bool
Raises: Exception – if the username doesn't follow the standard
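The username rule (alphanumeric only, at most 25 characters) maps directly onto a regular expression. A minimal sketch; the function name and the ASCII-only character class are assumptions, since the documentation does not say whether non-ASCII letters count as alphanumeric. A bare Exception is raised to match the documented Raises entry.

```python
import re

# Assumption: ASCII letters and digits only, 1 to 25 characters.
USERNAME_PATTERN = re.compile(r"[A-Za-z0-9]{1,25}")


def username_checks_sketch(username: str) -> bool:
    """Hypothetical analogue of UserHandler.username_checks."""
    if not isinstance(username, str) or not USERNAME_PATTERN.fullmatch(username):
        raise Exception("Username must be 1-25 alphanumeric characters with no spaces "
                        "or special characters.")
    return True


print(username_checks_sketch("mperf2024"))  # True
```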