CerebralCortex Kernel¶
Cerebral Cortex is the big data cloud companion of mCerebrum designed to support population-scale data analysis, visualization, model development, and intervention design for mobile sensor data.
More information about MD2K software is available on our software website, and about the MD2K organization on our MD2K website.
CerebralCortex Kernel is part of our CerebralCortex cloud platform. It can parallelize tasks and scale a job across any number of cores or machines. CerebralCortex Kernel also offers a number of built-in algorithms and data-management features, described in the sections below.
Installation¶
Dependencies¶
CerebralCortex Kernel requires Java 8 to run. Support for Java 8 releases prior to 8u92 is deprecated as of CerebralCortex-Kernel 3.3.0.
- Check the Java version: java -version
- Set JAVA_HOME to a Java 8 installation
- OR start the Python shell with JAVA_HOME=/path/to/java/Home python3
Install using pip¶
CerebralCortex-Kernel requires Python 3.6 or higher. To install CerebralCortex-Kernel as an API:
pip3 install cerebralcortex-kernel
- Note: use the pip command appropriate to your machine (e.g., pip, pip3, pip3.6).
Install from source code¶
- Clone repo -
git clone https://github.com/MD2Korg/CerebralCortex-Kernel.git
cd CerebralCortex-Kernel
python3 setup.py install
Usage¶
from cerebralcortex.kernel import Kernel
CC = Kernel(cc_configs="default")
# to view default configs
print(CC.config)
# default data storage path is
# /user/home/folder/cc_data
By default, the Kernel loads the default configs. Please have a look at all available configurations for CerebralCortex-Kernel. You may also load config files from a directory:
CC = Kernel(configs_dir_path="dir/path/to/configs/", new_study=True)
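Once the Kernel is created, you can explore data through it. A minimal sketch, assuming a study that already contains data (the stream name below is illustrative):
# list all streams available in the study
CC.list_streams()
# read a stream and preview a few rows (stream name is illustrative)
ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
ds.show(5)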
How to use builtin algorithms¶
Using built-in algorithms is as easy as loading data, passing it to an algorithm, and getting the results. Below is an example of how to compute CGM Glucose Variability Metrics.
- Download Glucose Data. The device used to collect glucose was the Dexcom G6 Continuous Glucose Monitor
- Install Cerebral Cortex Kernel
pip install cerebralcortex-kernel
- Open terminal and start python3 shell
Python Code¶
# import packages
from cerebralcortex.kernel import Kernel
from cerebralcortex.algorithms.glucose.glucose_variability_metrics import glucose_var
# Create Kernel object
CC = Kernel(cc_configs="default", new_study=True)
# Read sample CSV data
ds = CC.read_csv("/path/of/the/downloaded/file/sample.csv", stream_name="cgm_glucose_variability_metrics", header=True)
# view sample data
ds.show(2)
# Apply glucose_variability_metrics algorithm on the data
results = glucose_var(ds)
# view results
results.show(2)
# save computed data
CC.save_stream(results)
Please have a look at the Jupyter notebook for basic operations that can be performed on a DataStream object.
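For instance, a few of those DataStream operations applied to the results stream above (a sketch; the column names are illustrative and depend on the computed metrics):
# summary statistics of the computed metrics
results.describe().show()
# keep only selected columns (column names are illustrative)
results.select("timestamp", "user").show(2)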
Algorithms to Analyze Sensor Data¶
CerebralCortex-Kernel offers the following built-in algorithms to analyze sensor data.
- ECG sensor data quality
- ECG RR Interval Computation
- Heart Rate Variability Feature Computation
- CGM Glucose Variability Metrics
- GPS Data Clustering
- Sensor Data Interpolation
- Statistical Features Computation
- List of all available algorithms
Markers with ML Models¶
- Stress Detection using ECG data
- mContain Social Crowding
- Brushing Detection using Accelerometer and Gyro Data (TODO)
Import and Document Data¶
CerebralCortex-Kernel Supported Platforms¶
- mProv
- mFlow
Examples¶
Documentation¶
Deploy on Cloud¶
CerebralCortex-Kernel is a part of CerebralCortex cloud platform. To test the complete cloud platform, please visit CerebralCortex.
FAQ¶
1 - Do I need the whole CerebralCortex cloud platform to use CerebralCortex-Kernel?
No! CerebralCortex-Kernel can be used independently.
2 - How can I change NoSQL backend storage layer?
CerebralCortex-Kernel follows a component-based structure, which makes it easier to add or remove features.
- Add a new class in Data manager-Raw.
- The new class must have read/write methods. Here is a sample skeleton class with the mandatory methods required in the new class; a hedged sketch is also shown below.
- Create an object of the new class in Data-Raw with appropriate parameters.
- Add appropriate configurations to cerebralcortex.yml in the NoSQL Storage section (https://github.com/MD2Korg/CerebralCortex-Kernel/blob/master/conf/cerebralcortex.yml#L8).
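A hedged skeleton of such a class (the class name and method bodies are illustrative; only the read_file()/write_file() contract is taken from the documentation):
class MyNoSQLStorage:
    """Illustrative storage-backend skeleton; follows the read/write contract
    described above, not an actual CerebralCortex class."""

    def __init__(self, obj):
        # obj: configuration/connection object passed in by Data-Raw (assumption)
        self.obj = obj

    def read_file(self, stream_name: str, version: str = "all") -> object:
        # return the stream data, e.g., as a pyspark DataFrame
        raise NotImplementedError

    def write_file(self, stream_name: str, data) -> bool:
        # persist the DataFrame and return True on success
        raise NotImplementedError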
3 - How can I replace MySQL with another SQL storage system?
- Add a new class in Data manager-SQL.
- The new class must implement all of the methods available in the stream_handler.py class.
- Create an object of new class in Data-SQL with appropriate parameters.
- Add appropriate configurations in cerebralcortex.yml in Relational Storage section.
4 - Where are all the backend storage related classes/methods?
In Data manager-Raw. You can add/change any backend storage.
Contributing¶
Please read our Contributing Guidelines for details on the process for submitting pull requests to us.
We use the Python PEP 8 Style Guide.
Our Code of Conduct is the Contributor Covenant.
Bug reports can be submitted through JIRA.
Our discussion forum can be found here.
Versioning¶
We use Semantic Versioning for the software, based on the following guidelines.
MAJOR.MINOR.PATCH (example: 3.0.12)
- MAJOR version when incompatible API changes are made,
- MINOR version when functionality is added in a backwards-compatible manner, and
- PATCH version when backwards-compatible bug fixes are introduced.
For the versions available, see this repository’s tags.
Contributors¶
See the list of contributors who participated in this project.
Acknowledgments¶
- National Institutes of Health - Big Data to Knowledge Initiative
- Grants: R01MD010362, 1UG1DA04030901, 1U54EB020404, 1R01CA190329, 1R01DE02524, R00MD010468, 3UH2DA041713, 10555SC
- National Science Foundation
- Grants: 1640813, 1722646
- Intelligence Advanced Research Projects Activity
- Contract: 2017-17042800006
cerebralcortex package¶
Subpackages¶
cerebralcortex.algorithms package¶
Subpackages¶
-
bluetooth_encounter
(data, st: datetime.datetime, et: datetime.datetime, distance_threshold=12, n_rows_threshold=8, time_threshold=600, ltime=True)[source]¶ Parameters: - ds – Input Datastream
- st – Start Time the time window in UTC
- et – End Time of time window in UTC
- distance_threshold – Threshold on mean distance per encounter
- n_rows_threshold – No of rows per group/encounter
- time_threshold – Minimum Duration of time per encounter
- epsilon – A simple threshold
- count_threshold – Threshold on count
Returns: A Sparse representation of the Bluetooth Encounter
-
ecg_autosense_data_quality
(ecg, Fs=64, sensor_name='autosense', outlier_threshold_high=4000, outlier_threshold_low=20, slope_threshold=100, range_threshold=50, eck_threshold_band_loose=400, window_size=3, acceptable_outlier_percent=34)[source]¶ Compute data quality of ECG data collected with the AutoSense sensor.
Parameters: - ecg (DataStream) –
- Fs (int) –
- sensor_name (str) –
- outlier_threshold_high (int) –
- outlier_threshold_low (int) –
- slope_threshold (int) –
- range_threshold (int) –
- eck_threshold_band_loose (int) –
- window_size (int) –
- acceptable_outlier_percent (int) –
Returns: DataStream - structure [timestamp, localtime, version…..]
-
get_rr_interval
(ecg_data, Fs=64)[source]¶ Parameters: - ecg_data (DataStream) –
- Fs (int) –
Returns: DataStream - timestamp, localtime, user, version ….
-
get_hrv_features
(rr_data, acceptable_percentage=50, window_length=60)[source]¶ Parameters: - rr_data (DataStream) –
- acceptable_percentage (int) –
- window_length (int) –
Returns:
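A hedged sketch of chaining the ECG steps above (the import paths, and the assumption that each step's output feeds the next, are illustrative and may differ in your installed version):
>>> # import paths are assumptions and may differ across versions
>>> from cerebralcortex.algorithms.ecg.autosense_data_quality import ecg_autosense_data_quality
>>> from cerebralcortex.algorithms.ecg.autosense_rr_interval import get_rr_interval
>>> from cerebralcortex.algorithms.ecg.hrv_features import get_hrv_features
>>> ecg_quality = ecg_autosense_data_quality(ecg_ds, Fs=64)  # ecg_ds: an ECG DataStream you have loaded
>>> rr = get_rr_interval(ecg_quality, Fs=64)
>>> hrv = get_hrv_features(rr, acceptable_percentage=50, window_length=60)
>>> hrv.show(5)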
-
ema_incentive
(ds)[source]¶ Parse stream name ‘incentive–org.md2k.ema_scheduler–phone’. Convert json column to multiple columns.
Parameters: ds – Windowed/grouped DataStream object Returns: Windowed/grouped DataStream object. Return type: ds
-
ema_logs
(ds)[source]¶ Convert json column to multiple columns.
Parameters: ds (DataStream) – Windowed/grouped DataStream object Returns:
-
glucose_var
(ds)[source]¶ Compute CGM Glucose Variability Metrics:
This algorithm computes 23 clinically validated glucose variability metrics from continuous glucose monitor data.
- Input:
- ds (DataStream): Windowed/grouped DataStream of CGM data
Returns: DataStream with glucose variability metrics.
Glucose Variability Metrics include:
- Interday Mean Glucose
- Interday Median Glucose
- Interday Maximum Glucose
- Interday Minimum Glucose
- Interday Standard Deviation of Glucose
- Interday Coefficient of Variation of Glucose
- Intraday Standard Deviation of Glucose (mean, median, standard deviation)
- Intraday Coefficient of Variation of Glucose (mean, median, standard deviation)
- TIR (Time in Range of default 1 SD)
- TOR (Time outside Range of default 1 SD)
- POR (Percent outside Range of default 1 SD)
- MAGE (Mean Amplitude of Glucose Excursions, default 1 SD)
- MAGN (Mean Amplitude of Normal Glucose, default 1 SD)
- J-index
- LBGI (Low Blood Glucose Index)
- HBGI (High Blood Glucose Index)
- MODD (Mean of Daily Differences)
- CONGA24 (Continuous Overall Net Glycemic Action over 24 hours)
- ADRR (Average Daily Risk Range)
- GMI (Glucose Management Indicator)
- eA1c (estimated A1c according to the American Diabetes Association)
- Q1G (intraday first quartile glucose)
- Q3G (intraday third quartile glucose)
For more information on these glucose metrics, see dbdp.org.
-
cluster_gps
(ds: cerebralcortex.core.datatypes.datastream.DataStream, epsilon_constant: int = 1000, km_per_radian: int = 6371.0088, geo_fence_distance: int = 30, minimum_points_in_cluster: int = 1, latitude_column_name: str = 'latitude', longitude_column_name: str = 'longitude')[source]¶ Cluster GPS data - Algorithm used to cluster GPS data is based on DBScan
Parameters: - ds (DataStream) – Windowed/grouped DataStream object
- epsilon_constant (int) –
- km_per_radian (int) –
- geo_fence_distance (int) –
- minimum_points_in_cluster (int) –
- latitude_column_name (str) –
- longitude_column_name (str) –
Returns: DataStream object
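A usage sketch (the import path and stream name are assumptions; the input should be a windowed/grouped GPS DataStream as noted above):
>>> # import path is an assumption and may differ
>>> from cerebralcortex.algorithms.gps.clustering import cluster_gps
>>> gps_ds = CC.get_stream("GPS--org.md2k.phonesensor--PHONE")  # stream name is illustrative
>>> clustered = cluster_gps(gps_ds, geo_fence_distance=30, minimum_points_in_cluster=1)
>>> clustered.show(5)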
-
impute_gps_data
(ds, accuracy_threashold: int = 100)[source]¶ Impute GPS data
Parameters: - ds (DataStream) – Windowed/grouped DataStream object
- accuracy_threashold (int) –
Returns: DataStream object
-
heart_rate_power
(power: numpy.ndarray, frequency: numpy.ndarray, low_rate: float, high_rate: float)[source]¶ Compute heart rate power for a specific frequency range.
Parameters: - power (np.ndarray) –
- frequency (np.ndarray) –
- low_rate (float) –
- high_rate (float) –
Returns: sum of power for the frequency range
-
lomb
(time_stamps: List, samples: List, low_frequency: float, high_frequency: float)[source]¶ Lomb–Scargle periodogram implementation.
Parameters: - time_stamps (List) –
- samples (List) –
- low_frequency (float) –
- high_frequency (float) –
Returns: Lomb–Scargle periodogram and frequency values
-
rr_feature_computation
(timestamp: list, value: list, low_frequency: float = 0.01, high_frequency: float = 0.7, low_rate_vlf: float = 0.0009, high_rate_vlf: float = 0.04, low_rate_hf: float = 0.15, high_rate_hf: float = 0.4, low_rate_lf: float = 0.04, high_rate_lf: float = 0.15)[source]¶ ECG feature implementation. The frequency ranges for high, low, and very low heart rate variability values are derived from the following paper: ‘Heart rate variability: standards of measurement, physiological interpretation and clinical use’.
Parameters: - timestamp (list) –
- value (list) –
- low_frequency (float) –
- high_frequency (float) –
- low_rate_vlf (float) –
- high_rate_vlf (float) –
- low_rate_hf (float) –
- high_rate_hf (float) –
- low_rate_lf (float) –
- high_rate_lf (float) –
Returns: ECG feature DataStreams
-
complementary_filter
(ds, freq: int = 16, accelerometer_x: str = 'accelerometer_x', accelerometer_y: str = 'accelerometer_y', accelerometer_z: str = 'accelerometer_z', gyroscope_x: str = 'gyroscope_x', gyroscope_y: str = 'gyroscope_y', gyroscope_z: str = 'gyroscope_z')[source]¶ Compute complementary filter on gyro and accel data.
Parameters: - ds (DataStream) – Non-Windowed/grouped dataframe
- freq (int) – frequency of accel/gyro. Assumption is that frequency is equal for both gyro and accel.
- accelerometer_x (str) – name of the column
- accelerometer_y (str) – name of the column
- accelerometer_z (str) – name of the column
- gyroscope_x (str) – name of the column
- gyroscope_y (str) – name of the column
- gyroscope_z (str) – name of the column
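A usage sketch (the import path is an assumption; ds is a non-windowed DataStream containing the accelerometer/gyroscope columns named above):
>>> # import path is an assumption and may differ
>>> from cerebralcortex.algorithms.signal_processing import complementary_filter
>>> fused = complementary_filter(ds, freq=16)
>>> fused.show(5)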
-
compute_FFT_features
(ds, exclude_col_names: list = [], feature_names=['fft_centroid', 'fft_spread', 'spectral_entropy', 'fft_flux', 'spectral_falloff'])[source]¶ Transforms data from time domain to frequency domain.
Parameters: - list (feature_names) – name of the columns on which features should not be computed
- list – names of the features. Supported features are fft_centroid, fft_spread, spectral_entropy, spectral_entropy_old, fft_flux, spectral_falloff
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns: DataStream object with all the existing data columns and FFT features
-
compute_zero_cross_rate
(ds, exclude_col_names: list = [], feature_names=['zero_cross_rate'])[source]¶ Compute zero crossing rate features.
Parameters: - ds (DataStream) – Windowed/grouped dataframe
- exclude_col_names (list) – names of the columns on which features should not be computed
- feature_names (list) – names of the features. Supported features are [‘mean’, ‘median’, ‘stddev’, ‘variance’, ‘max’, ‘min’, ‘skew’, ‘kurt’, ‘sqr’, ‘zero_cross_rate’]
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns: DataStream object
-
interpolate
(ds, freq=16, method='linear', axis=0, limit=None, inplace=False, limit_direction='forward', limit_area=None, downcast=None)[source]¶ Interpolate values according to different methods. This method internally uses pandas interpolation.
Parameters: - ds (DataStream) – Windowed/grouped DataStream object
- freq (int) – Frequency of the signal
- method (str) – default ‘linear’ - ‘linear’: Ignore the index and treat the values as equally spaced. This is the only method supported on MultiIndexes. - ‘time’: Works on daily and higher resolution data to interpolate given length of interval. - ‘index’, ‘values’: use the actual numerical values of the index. - ‘pad’: Fill in NaNs using existing values. - ‘nearest’, ‘zero’, ‘slinear’, ‘quadratic’, ‘cubic’, ‘spline’, ‘barycentric’, ‘polynomial’: Passed to scipy.interpolate.interp1d. These methods use the numerical values of the index. Both ‘polynomial’ and ‘spline’ require that you also specify an order (int), e.g. df.interpolate(method=’polynomial’, order=5). - ‘krogh’, ‘piecewise_polynomial’, ‘spline’, ‘pchip’, ‘akima’: Wrappers around the SciPy interpolation methods of similar names. See Notes. - ‘from_derivatives’: Refers to scipy.interpolate.BPoly.from_derivatives which replaces ‘piecewise_polynomial’ interpolation method in scipy 0.18.
- {0 or ‘index’, 1 or ‘columns’, None} (axis) – default None. Axis to interpolate along.
- limit (int) – optional. Maximum number of consecutive NaNs to fill. Must be greater than 0.
- inplace (bool) – default False. Update the data in place if possible.
- {‘forward’, ‘backward’, ‘both’} (limit_direction) – default ‘forward’. If limit is specified, consecutive NaNs will be filled in this direction.
- {None, ‘inside’, ‘outside’} (limit_area) – default None. If limit is specified, consecutive NaNs will be filled with this restriction. - None: No fill restriction. - ‘inside’: Only fill NaNs surrounded by valid values (interpolate). - ‘outside’: Only fill NaNs outside valid values (extrapolate).
- optional, ‘infer’ or None (downcast) – defaults to None
- **kwargs – Keyword arguments to pass on to the interpolating function.
Returns DataStream: interpolated data
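A usage sketch (the import path is an assumption; ds is a windowed/grouped DataStream sampled at roughly 16 Hz):
>>> # import path is an assumption and may differ
>>> from cerebralcortex.algorithms.interpolation import interpolate
>>> filled = interpolate(ds, freq=16, method="linear", limit_direction="forward")
>>> filled.show(5)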
-
magnitude
(ds, col_names=[])[source]¶ Compute magnitude of columns
Parameters: - ds (DataStream) – Windowed/grouped DataStream object
- col_names (list[str]) – column names
Returns: DataStream
-
statistical_features
(ds, exclude_col_names: list = [], feature_names=['mean', 'median', 'stddev', 'variance', 'max', 'min', 'skew', 'kurt', 'sqr'])[source]¶ Compute statistical features.
Parameters: - ds (DataStream) – Windowed/grouped DataStream object
- exclude_col_names (list) – names of the columns on which features should not be computed
- feature_names (list) – names of the features. Supported features are [‘mean’, ‘median’, ‘stddev’, ‘variance’, ‘max’, ‘min’, ‘skew’, ‘kurt’, ‘sqr’, ‘zero_cross_rate’]
Returns: DataStream object with all the existing data columns and the computed statistical features
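A usage sketch (the import path and column names are assumptions; ds is a windowed/grouped DataStream):
>>> # import path is an assumption and may differ
>>> from cerebralcortex.algorithms.stats.features import statistical_features
>>> stats = statistical_features(ds, exclude_col_names=["some_col"], feature_names=["mean", "stddev", "max", "min"])
>>> stats.show(5)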
-
compute_stress_episodes
(ecg_stress_probability, macd_param_fast=7, macd_param_slow=19, macd_param_signal=2, threshold_stressed=0.36, threshold_not_stressed=0.36)[source]¶ Compute stress episodes using MACD
Parameters: - ecg_stress_probability (DataStream) –
- macd_param_fast (int) –
- macd_param_slow (int) –
- macd_param_signal (int) –
- threshold_stressed (float) –
- threshold_not_stressed (float) –
Returns: DataStream with a stress_episodes column
Return type: DataStream
-
forward_fill_data
(stress_data, output_stream_name='org.md2k.autosense.ecg.stress.probability.forward.filled', minimum_points_per_day=60)[source]¶ Parameters: - stress_data (DataStream) –
- output_stream_name (str) –
- minimum_points_per_day (int) –
Returns:
-
get_metadata
(stress_imputed_data, output_stream_name, input_stream_name)[source]¶ generate metadata for a datastream.
Parameters: - stress_imputed_data (DataStream) –
- output_stream_name (str) –
Returns:
-
impute_stress_likelihood
(stress_data, output_stream_name='org.md2k.autosense.ecg.stress.probability.imputed')[source]¶ Parameters: - stress_data (DataStream) –
- output_stream_name (str) –
Returns:
-
normalize_features
(data, index_of_first_order_feature=2, lower_percentile=20, higher_percentile=99, minimum_minutes_in_day=60, no_features=11, epsilon=1e-08, input_feature_array_name='features')[source]¶ Parameters: - data –
- index_of_first_order_feature –
- lower_percentile –
- higher_percentile –
- minimum_minutes_in_day –
- no_features –
- epsilon –
- input_feature_array_name –
Returns:
-
CC_MProvAgg
(in_stream_name, op, out_stream_name, in_stream_key=['index'], out_stream_key=['index'], map=None, graph_name=None)[source]¶
-
update_metadata
(stream_metadata, stream_name, stream_desc, module_name, module_version, authors: list, input_stream_names: list = [], annotations: list = []) → cerebralcortex.core.metadata_manager.stream.metadata.Metadata[source]¶ Create Metadata object with some sample metadata of phone battery data :param stream_metadata: :param stream_name: :param stream_desc: :param module_name: :param module_version: :param authors: List of authors names and emails ids in dict. For example, authors = [{“ali”:”ali@gmail.com”}, {“nasir”:”nasir@gmail.com”}] :type authors: list[dict] :param input_stream_names: :param annotations:
Returns: metadata of phone battery stream Return type: Metadata
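A hedged sketch of building metadata for a derived stream (the import path is an assumption; the authors format follows the docstring above):
>>> # import path is an assumption and may differ
>>> from cerebralcortex.algorithms.utils.util import update_metadata
>>> new_metadata = update_metadata(ds.metadata, stream_name="org.example.derived.stream",
...                                stream_desc="illustrative derived stream", module_name="my_module",
...                                module_version="1.0.0", authors=[{"ali": "ali@gmail.com"}])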
Module contents¶
cerebralcortex.core package¶
Subpackages¶
-
class
Configuration
(config_dir: str, cc_configs: dict = '', config_file_name: str = 'cerebralcortex.yml')[source]¶ Bases:
cerebralcortex.core.config_manager.config_handler.ConfigHandler
-
class
FileBasedStorage
[source]¶ Bases:
object
-
get_stream_versions
(stream_name: str) → list[source]¶ Returns a list of versions available for a stream
Parameters: stream_name (str) – name of a stream Returns: list of int Return type: list Raises: ValueError
– if stream_name is empty or None
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [1, 2, 4]
-
is_stream
(stream_name: str) → bool[source]¶ Returns true if provided stream exists.
Parameters: stream_name (str) – name of a stream Returns: True if stream_name exist False otherwise Return type: bool Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> True
-
is_study
() → bool[source]¶ Returns true if study_name exists.
Returns: True if study_name exist False otherwise Return type: bool Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.is_study() >>> True
-
list_streams
() → List[str][source]¶ Get all the available stream names
Returns: list of available streams names Return type: List[str] Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.list_streams()
-
list_users
(stream_name: str, version: int = 1) → List[str][source]¶ Get all the available user-ids for a stream
Parameters: - stream_name (str) – name of a stream
- version (int) – version of a stream
Returns: list of available user-ids for a given stream version Return type: List[str] Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.list_users()
-
read_file
(stream_name: str, version: str = 'latest', user_id: str = None) → object[source]¶ Get stream data from storage system. Data would be return as pyspark DataFrame object :param stream_name: name of a stream :type stream_name: str :param version: version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”) :type version: str :param user_id: id of a user :type user_id: str
Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.
Returns: pyspark DataFrame object Return type: object Raises: Exception
– if stream name does not exist.
-
search_stream
(stream_name) → List[str][source]¶ Find all the stream names similar to stream_name arg. For example, passing “location” argument will return all stream names that contain the word location
Returns: list of stream names similar to stream_name arg Return type: List[str] Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.search_stream("battery") >>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
-
write_file
(stream_name: str, data, file_mode: str) → bool[source]¶ Write pyspark DataFrame to a file storage system
Parameters: - stream_name (str) – name of the stream
- data (object) – pyspark DataFrame object
- file_mode (str) – write mode; append is currently supported
Returns: True if data is stored successfully or throws an Exception.
Return type: bool
Raises: Exception
– if DataFrame write operation fails
-
write_pandas_to_parquet_file
(df: pandas.DataFrame, user_id: str, stream_name: str, stream_version: str) → str[source]¶ Convert pandas dataframe into pyarrow parquet format and store
Parameters: - df (pandas) – pandas dataframe
- user_id (str) – user id
- stream_name (str) – name of a stream
Returns: file_name of the newly created parquet file
Return type: str
-
-
class
BlueprintStorage
(obj)[source]¶ Bases:
object
This is a sample reference class. If you want to add another storage layer, the class must have the following methods: read_file(), write_file()
-
get_stream_metadata_hash
(stream_name: str) → list[source]¶ Get all the metadata_hash associated with a stream name.
Parameters: stream_name (str) – name of a stream Returns: list of all the metadata hashes Return type: list[str] Examples
>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> ["00ab666c-afb8-476e-9872-6472b4e66b68", "15cc444c-dfb8-676e-3872-8472b4e66b12"]
-
get_stream_name
(metadata_hash: uuid) → str[source]¶ metadata_hash values are unique to each stream version. This reverse lookup returns the stream name for a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid. Returns: name of a stream Return type: str Examples
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST
-
get_stream_versions
(stream_name: str) → list[source]¶ Returns a list of versions available for a stream
Parameters: stream_name (str) – name of a stream Returns: list of int Return type: list Raises: ValueError
– if stream_name is empty or None
Examples
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [1, 2, 4]
-
is_stream
(stream_name: str) → bool[source]¶ Returns true if provided stream exists.
Parameters: stream_name (str) – name of a stream Returns: True if stream_name exist False otherwise Return type: bool Examples
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> True
-
list_streams
() → List[str][source]¶ Get all the available stream names with metadata
Returns: list of available streams metadata Return type: List[str] Examples
>>> CC = Kernel("/directory/path/of/configs/") >>> CC.list_streams()
-
read_file
(stream_name: str, version: str = 'all') → object[source]¶ Get stream data from storage system. Data would be return as pyspark DataFrame object :param stream_name: name of a stream :type stream_name: str :param version: version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”) :type version: str
Returns: pyspark DataFrame object Return type: object Raises: Exception
– if stream name does not exist.
-
search_stream
(stream_name)[source]¶ Find all the stream names similar to stream_name arg. For example, passing “location” argument will return all stream names that contain the word location
Returns: list of stream names similar to stream_name arg Return type: List[str] Examples
>>> CC.search_stream("battery") >>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
-
write_file
(stream_name: str, data: cerebralcortex.core.datatypes.datastream.DataStream) → bool[source]¶ Write pyspark DataFrame to a data storage system :param stream_name: name of the stream :type stream_name: str :param data: pyspark DataFrame object :type data: object
Returns: True if data is stored successfully or throws an Exception. Return type: bool Raises: Exception
– if DataFrame write operation fails
-
-
class
DataSet
[source]¶ Bases:
enum.Enum
An enumeration.
-
COMPLETE
= (1,)¶
-
ONLY_DATA
= (2,)¶
-
ONLY_METADATA
= 3¶
-
-
class
StreamHandler
[source]¶ Bases:
object
-
get_stream
(stream_name: str, version: str = 'latest', user_id: str = None, data_type=<DataSet.COMPLETE: (1, )>) → cerebralcortex.core.datatypes.datastream.DataStream[source]¶ Retrieve a data-stream with its metadata.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable parameters are latest, or a specific version of a stream (e.g., 2)
- user_id (str) – id of a user
- data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns: contains Data and/or metadata
Return type: DataStream
Raises: ValueError
– if stream name is empty or None
Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> ds.data # an object of a dataframe >>> ds.metadata # an object of MetaData class >>> ds.get_metadata(version=1) # get the specific version metadata of a stream
-
save_stream
(datastream, overwrite=False) → bool[source]¶ Saves datastream raw data in selected NoSQL storage and metadata in MySQL.
Parameters: - datastream (DataStream) – a DataStream object
- overwrite (bool) – if set to true, whole existing datastream data will be overwritten by new data
Returns: True if stream is successfully stored or throws an exception
Return type: bool
Todo
Add functionality to store data in influxdb.
Raises: Exception
– logs or throws an exception if the stream is not stored
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> ds = DataStream(dataframe, MetaData) >>> CC.save_stream(ds)
-
-
class
filesystem_helper
[source]¶ Bases:
object
-
create_dir
(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]¶ Creates a directory if it does not exist.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
-
ls_dir
(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]¶ List the contents of a directory
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: list of file and/or dir names
Return type: list[str]
-
path_exist
(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]¶ Checks if a path exists
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: true if path exists, false otherwise
Return type: bool
-
-
class
hdfs_helper
[source]¶ Bases:
object
-
create_dir
(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]¶ Creates a directory if it does not exist.
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
-
hdfs_conn
= ''¶
-
ls_dir
(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]¶ List the contents of a directory
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: list of file and/or dir names
Return type: list[str]
-
path_exist
(dirpath: str, stream_name: str = None, version: int = None, user_id: str = None)[source]¶ Checks if a path exists
Parameters: - dirpath (str) – base storage dir path
- stream_name (str) – name of a stream
- version (int) – version number of stream data
- user_id (str) – uuid of a user
Returns: true if path exists, false otherwise
Return type: bool
-
-
class
Stream
(name, version, study_name, metadata_hash, stream_metadata)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
creation_date
¶
-
metadata_hash
¶
-
name
¶
-
row_id
¶
-
stream_metadata
¶
-
study_name
¶
-
version
¶
-
-
class
User
(user_id, username, password, study_name, token, token_issued, token_expiry, user_role='participant', user_metadata={}, user_settings={}, active=1)[source]¶ Bases:
sqlalchemy.ext.declarative.api.Base
-
active
¶
-
creation_date
¶
-
has_data
¶
-
password
¶
-
row_id
¶
-
study_name
¶
-
token
¶
-
token_expiry
¶
-
token_issued
¶
-
user_id
¶
-
user_metadata
¶
-
user_role
¶
-
user_settings
¶
-
username
¶
-
-
class
StreamHandler
[source]¶ Bases:
object
-
get_stream_metadata_by_hash
(metadata_hash: uuid) → List[source]¶ metadata_hash values are unique to each stream version. This reverse lookup returns the stream name for a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid. Returns: [stream_name, metadata] Return type: List Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ["name" .....] # stream metadata and other information
-
get_stream_metadata_by_name
(stream_name: str, version: int) → cerebralcortex.core.metadata_manager.stream.metadata.Metadata[source]¶ Get a list of metadata for all versions available for a stream.
Parameters: - stream_name (str) – name of a stream
- version (int) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
Returns: Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise.
Return type: Metadata
Raises: ValueError
– stream_name cannot be None or empty.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.list_users("mperf") >>> [Metadata] # list of MetaData class objects
-
get_stream_metadata_hash
(stream_name: str) → List[source]¶ Get all the metadata_hash associated with a stream name.
Parameters: stream_name (str) – name of a stream Returns: list of all the metadata hashes with name and versions Return type: list Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [["stream_name", "version", "metadata_hash"]]
-
get_stream_name
(metadata_hash: uuid) → str[source]¶ metadata_hash values are unique to each stream version. This reverse lookup returns the stream name for a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid. Returns: name of a stream Return type: str Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST
-
get_stream_versions
(stream_name: str) → list[source]¶ Returns a list of versions available for a stream
Parameters: stream_name (str) – name of a stream Returns: list of int Return type: list Raises: ValueError
– if stream_name is empty or None
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [1, 2, 4]
-
is_stream
(stream_name: str) → bool[source]¶ Returns true if provided stream exists.
Parameters: stream_name (str) – name of a stream Returns: True if stream_name exist False otherwise Return type: bool Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> True
-
list_streams
() → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]¶ Get all the available stream names with metadata
Returns: list of available streams metadata [{name:”“, metadata:”“}…] Return type: List[Metadata] Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.list_streams()
-
save_stream_metadata
(metadata_obj) → dict[source]¶ Update a record if stream already exists or insert a new record otherwise.
Parameters: metadata_obj (Metadata) – stream metadata Returns: {“status”: True/False,”verion”:version} Return type: dict Raises: Exception
– if fail to insert/update record in MySQL. Exceptions are logged in a log file
-
search_stream
(stream_name)[source]¶ Find all the stream names similar to stream_name arg. For example, passing “location” argument will return all stream names that contain the word location
Returns: list of stream names similar to stream_name arg Return type: List[str] Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.search_stream("battery") >>> ["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
-
-
class
UserHandler
[source]¶ Bases:
object
-
create_user
(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict, encrypt_password: bool = False) → bool[source]¶ Create a user in SQL storage if it doesn’t exist :param username: Only alphanumeric usernames are allowed with the max length of 25 chars. :type username: str :param user_password: no size limit on password :type user_password: str :param user_role: role of a user :type user_role: str :param user_metadata: metadata of a user :type user_metadata: dict :param user_settings: user settings, mCerebrum configurations of a user :type user_settings: dict :param encrypt_password: encrypt password if set to True :type encrypt_password: bool
Returns: True if user is successfully registered or throws any error in case of failure
Return type: bool
Raises: ValueError
– if selected username is not available
Exception
– if sql query fails
-
encrypt_user_password
(user_password: str) → str[source]¶ Encrypt password
Parameters: user_password (str) – unencrypted password Raises: ValueError
– password cannot be None or empty.
Returns: encrypted password Return type: str
-
gen_random_pass
(string_type: str, size: int = 8) → str[source]¶ Generate a random password
Parameters: - string_type – Accepted parameters are “varchar” and “char”. (Default=”varchar”)
- size – password length (default=8)
Returns: random password
Return type: str
-
get_user_id
(user_name: str) → str[source]¶ Get the user id linked to user_name.
Parameters: user_name (str) – username of a user Returns: user id associated to user_name Return type: str Raises: ValueError
– User name is a required field.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_user_id("nasir_ali") >>> '76cc444c-4fb8-776e-2872-9472b4e66b16'
-
get_user_metadata
(user_id: uuid = None, username: str = None) → dict[source]¶ Get user metadata by user_id or by username
Parameters: - user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: user metadata
Return type: dict
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_user_metadata(username="nasir_ali") >>> {"study_name":"mperf"........}
-
get_user_settings
(username: str = None, auth_token: str = None) → dict[source]¶ Get user settings by auth-token or by username. These are user’s mCerebrum settings
Parameters: - username (str) – username of a user
- auth_token (str) – auth-token
Returns: List of dictionaries of user metadata
Return type: list[dict]
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_user_settings(username="nasir_ali") >>> [{"mcerebrum":"some-conf"........}]
-
get_username
(user_id: str) → str[source]¶ Get the user name linked to a user id.
Parameters: user_id (str) – id of a user Returns: username associated with the user_id Return type: str Raises: ValueError
– User ID is a required field.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.get_username("76cc444c-4fb8-776e-2872-9472b4e66b16") >>> 'nasir_ali'
-
is_auth_token_valid
(username: str, auth_token: str, checktime: bool = False) → bool[source]¶ Validate whether a token is valid or expired based on the token expiry datetime stored in SQL
Parameters: - username (str) – username of a user
- auth_token (str) – token generated by API-Server
- checktime (bool) – setting this to False will only check if the token is available in system. Setting this to true will check if the token is expired based on the token expiry date.
Raises: ValueError
– Auth token and auth-token expiry time cannot be null/empty.
Returns: returns True if token is valid or False otherwise.
Return type: bool
-
is_user
(user_id: uuid = None, user_name: uuid = None) → bool[source]¶ Checks whether a user exists in the system. Either parameter can be set to verify whether the user exists.
Parameters: - user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: True if a user exists in the system or False otherwise.
Return type: bool
Raises: ValueError
– Both user_id and user_name cannot be None or empty.
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16") >>> True
-
list_users
() → List[list][source]¶ Get a list of all users part of a study.
Parameters: study_name (str) – name of a study. If no study_name is provided then all users’ list will be returned Raises: ValueError
– Study name is a required field.
Returns: Returns an empty list if there is no user associated with the study_name and/or the study_name does not exist. Return type: list[list] Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.list_users("mperf") >>> [{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}] # [{user_id, user_name}]
-
login_user
(username: str, password: str, encrypt_password: bool = False) → dict[source]¶ Authenticate a user based on username and password and return an auth token
Parameters: - username (str) – username of a user
- password (str) – password of a user
- encrypt_password (str) – is password encrypted or not. mCerebrum sends encrypted passwords
Raises: ValueError
– User name and password cannot be empty/None.
Returns: {“status”: bool, “auth_token”: str, “msg”: str}
Return type: dict
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> CC.connect("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True) >>> True
-
update_auth_token
(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool[source]¶ Update an auth token in SQL database to keep user stay logged in. Auth token valid duration can be changed in configuration files.
Parameters: - username (str) – username of a user
- auth_token (str) – issued new auth token
- auth_token_issued_time (datetime) – datetime when the old auth token was issued
- auth_token_expiry_time (datetime) – datetime when the token will get expired
Raises: ValueError
– Auth token and auth-token issue/expiry time cannot be None/empty.
Returns: Returns True if the new auth token is set or False otherwise.
Return type: bool
-
username_checks
(username: str)[source]¶ No space, special characters, dash etc. are allowed in username. Only alphanumeric usernames are allowed with the max length of 25 chars.
Parameters: username (str) – Returns: True if the provided username complies with the standard, or throws an exception Return type: bool Raises: Exception
– if username doesn’t follow standards
-
-
class
TimeSeriesData
(CC)[source]¶ Bases:
cerebralcortex.core.data_manager.time_series.influxdb_handler.InfluxdbHandler
-
class
InfluxdbHandler
[source]¶ Bases:
object
-
save_data_to_influxdb
(datastream: cerebralcortex.core.datatypes.datastream.DataStream)[source]¶ Save data stream to influxdb only for visualization purposes.
Parameters: datastream (DataStream) – a DataStream object Returns: True if data is ingested successfully or False otherwise Return type: bool Todo
This needs to be updated with the new structure. Should metadata be stored or not?
Example
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> ds = DataStream(dataframe, MetaData) >>> CC.save_data_to_influxdb(ds)
-
write_pd_to_influxdb
(user_id: str, username: str, stream_name: str, df: pandas.core.frame.DataFrame)[source]¶ Store data in influxdb. Influxdb is used for visualization purposes
Parameters: - user_id (str) – id of a user
- username (str) – username
- stream_name (str) – name of a stream
- df (pandas) – pandas dataframe
Raises: Exception
– if error occurs during storing data to influxdb
-
-
class
DataStream
(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)[source]¶ Bases:
pyspark.sql.dataframe.DataFrame
-
agg
(*exprs)[source]¶ Aggregate on the entire DataStream without groups
Parameters: *exprs – Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.agg({"age": "max"}).collect() >>> # Below example shows how to use pyspark functions in add method >>> from pyspark.sql import functions as F >>> ds.agg(F.min(ds.age)).collect()
-
alias
(alias)[source]¶ Returns a new DataStream with an alias set.
Parameters: alias – string, an alias name to be set for the datastream. Returns: DataStream object Return type: object Examples
>>> df_as1 = df.alias("df_as1") >>> df_as2 = df.alias("df_as2")
-
approxQuantile
(col, probabilities, relativeError)[source]¶ Calculates the approximate quantiles of numerical columns of a DataStream.
The result of this algorithm has the following deterministic bound: If the DataStream has N elements and if we request the quantile at probability p up to error err, then the algorithm will return a sample x from the DataStream so that the exact rank of x is close to (p * N). More precisely,
floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
This method implements a variation of the Greenwald-Khanna algorithm (with some speed optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
Note that null values will be ignored in numerical columns before calculation. For columns only containing null values, an empty list is returned.
Parameters: - col (str[list]) – Can be a single column name, or a list of names for multiple columns.
- probabilities – a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
- relativeError – The relative target precision to achieve (>= 0). If set to zero, the exact quantiles are computed, which could be very expensive. Note that values greater than 1 are accepted but give the same result as 1.
Returns: the approximate quantiles at the given probabilities. If the input col is a string, the output is a list of floats. If the input col is a list or tuple of strings, the output is also a list, but each element in it is a list of floats, i.e., the output is a list of list of floats.
-
colRegex
(colName)[source]¶ Selects column based on the column name specified as a regex and returns it as Column.
Parameters: colName (str) – column name specified as a regex. Returns: Return type: DataStream Examples
>>> ds.colRegex("colName")
-
collect
()[source]¶ Collect all the data to master node and return list of rows
Returns: rows of all the dataframe Return type: List Examples
>>> ds.collect()
-
compute
(udfName, windowDuration: int = None, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None)[source]¶ Run an algorithm. This method supports running a udf method on windowed data
Parameters: - udfName – Name of the algorithm
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns: this will return a new datastream object with blank metadata
Return type: DataStream
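A hedged usage sketch (my_udf is a hypothetical user-defined function or algorithm accepted by compute; its exact requirements are not shown here):
>>> # my_udf is hypothetical; column/group names are illustrative
>>> windowed = ds.compute(my_udf, windowDuration=60, groupByColumnName=["user"])
>>> windowed.show(5)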
-
corr
(col1, col2, method=None)[source]¶ Calculates the correlation of two columns of a DataStream as a double value. Currently only supports the Pearson Correlation Coefficient.
Parameters: - col1 (str) – The name of the first column
- col2 (str) – The name of the second column
- method (str) – The correlation method. Currently only supports “pearson”
Returns: this will return a new datastream object with blank metadata
Return type: DataStream
Examples
>>> ds.corr("cal1", "col2", "pearson").collect()
-
cov
(col1, col2)[source]¶ Calculate the sample covariance for the given columns, specified by their names, as a double value.
Parameters: - col1 (str) – The name of the first column
- col2 (str) – The name of the second column
Returns: this will return a new datastream object with blank metadata
Return type: DataStream
Examples
>>> ds.cov("cal1", "col2").collect()
-
create_windows
(window_length='hour')[source]¶ Create windows over the data.
Parameters: - window_length (str) – length of the window (default 'hour')
Returns: this will return a new datastream object with blank metadata
Return type: DataStream
-
crossJoin
(other)[source]¶ Returns the cartesian product with another DataStream
Parameters: other – Right side of the cartesian product. Returns: DataStream object with joined streams Examples
>>> ds.crossJoin(ds2.select("col_name")).collect()
-
crosstab
(col1, col2)[source]¶ Computes a pair-wise frequency table of the given columns. Also known as a contingency table. The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero pair frequencies will be returned. The first column of each row will be the distinct values of col1 and the column names will be the distinct values of col2. The name of the first column will be $col1_$col2. Pairs that have no occurrences will have zero as their counts.
Parameters: - col1 (str) – The name of the first column. Distinct items will make the first item of each row.
- col2 (str) – The name of the second column. Distinct items will make the column names of the DataStream.
Returns: DataStream object
Examples
>>> ds.crosstab("col_1", "col_2")
-
data
¶ get stream data
Returns (DataFrame):
-
describe
(*cols)[source]¶ Computes basic statistics for numeric and string columns. These include count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.
Parameters: *cols – Examples
>>> ds.describe(['col_name']).show() >>> ds.describe().show()
-
distinct
()[source]¶ Returns a new DataStream containing the distinct rows in this DataStream.
Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.distinct().count()
-
drop
(*cols)[source]¶ Returns a new Datastream that drops the specified column. This is a no-op if schema doesn’t contain the given column name(s).
Parameters: *cols – a string name of the column to drop, or a Column to drop, or a list of string name of the columns to drop. Returns: Return type: Datastream Examples
>>> ds.drop('col_name')
-
dropDuplicates
(subset=None)[source]¶ Return a new DataStream with duplicate rows removed, optionally only considering certain columns.
Parameters: subset – optional list of column names to consider. Returns: Return type: Datastream Examples
>>> ds.dropDuplicates().show() >>> # Example on how to use it with params >>> ds.dropDuplicates(['col_name1', 'col_name2']).show()
-
dropna
(how='any', thresh=None, subset=None)[source]¶ Returns a new DataStream omitting rows with null values.
Parameters: - how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
- thresh – int, default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
- subset – optional list of column names to consider.
Returns: Return type: Datastream
Examples
>>> ds.dropna()
-
exceptAll
(other)[source]¶ Return a new DataStream containing rows in this DataStream but not in another DataStream while preserving duplicates.
Parameters: other – other DataStream object Returns: Return type: Datastream Examples
>>> ds1.exceptAll(ds2).show()
-
explain
(extended=False)[source]¶ Prints the (logical and physical) plans to the console for debugging purpose.
Parameters: extended – boolean, default False. If False, prints only the physical plan. Examples
>>> ds.explain()
-
fillna
(value, subset=None)[source]¶ Replace null values
Parameters: - value – int, long, float, string, bool or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, boolean, or string.
- subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns: Return type: Datastream
Examples
>>> ds.fill(50).show() >>> ds.fill({'col1': 50, 'col2': 'unknown'}).show()
-
filter
(condition)[source]¶ Filters rows using the given condition
Parameters: condition – a Column of types.BooleanType or a string of SQL expression. Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.filter("age > 3") >>> df.filter(df.age > 3)
-
filter_user
(user_ids: List)[source]¶ filter data to get only the selected users’ data
Parameters: user_ids (List[str]) – list of users’ UUIDs Returns: this will return a new datastream object with blank metadata Return type: DataStream
-
filter_version
(version: List)[source]¶ filter data to get only the selected stream versions’ data
Parameters: version (List[str]) – list of stream versions Returns: this will return a new datastream object with blank metadata Return type: DataStream Todo
Metadata version should be return with the data
-
first
()[source]¶ Returns the first row as a Row.
Returns: First row of a DataStream Examples
>>> ds.first()
-
foreach
(f)[source]¶ Applies the f function to all Row of DataStream. This is a shorthand for df.rdd.foreach()
Parameters: f – function Returns: DataStream object Examples
>>> def f(person): ... print(person.name) >>> ds.foreach(f)
-
freqItems
(cols, support=None)[source]¶ Finding frequent items for columns, possibly with false positives. Using the frequent element count algorithm described in “http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou”.
Returns: Return type: DataStream Examples
>>> ds.freqItems("col-name")
-
get_metadata
() → cerebralcortex.core.metadata_manager.stream.metadata.Metadata[source]¶ get stream metadata
Returns: single version of a stream Return type: Metadata Raises: Exception
– if specified version is not available for the stream
-
groupby
(*cols)[source]¶ Groups the DataFrame using the specified columns, so we can run aggregation on them. This method will return pyspark.sql.GroupedData object.
Parameters: cols (list) – columns to group by; each element should be a column name. Returns:
-
head
(n=None)[source]¶ Returns the first n rows.
Parameters: n (int) – default 1. Number of rows to return. Returns: If n is greater than 1, return a list of Row. If n is 1, return a single Row. Notes
This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
Examples
>>> ds.head(5)
-
intersect
(other)[source]¶ Return a new DataFrame containing rows only in both this frame and another frame. This is equivalent to INTERSECT in SQL.
Parameters: other – DataStream object Returns: a new DataStream containing rows only in both this DataStream and the other DataStream Examples
>>> ds.intersect(other=ds2)
-
intersectAll
(other)[source]¶ Return a new DataFrame containing rows in both this dataframe and other dataframe while preserving duplicates.
Parameters: other – DataStream object Returns: a new DataStream containing rows in both DataStreams while preserving duplicates Examples
>>> ds.intersectAll(ds2).show()
-
join
(other, on=None, how=None)[source]¶ Joins with another DataStream, using the given join expression.
Parameters: - other (DataStream) – Right side of the join
- on – a string for the join column name, a list of column names, or a join expression
- how (str) – inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
Examples
>>> ds.join(ds2, 'user', 'outer').show()
Returns: DataStream object with joined streams
-
join_stress_streams
(dataStream, propagation='forward')[source]¶ Join this DataStream with a stress DataStream.
Parameters: - dataStream (DataStream) – stream to join with
- propagation (str) – propagation direction (default 'forward')
Returns: this will return a new datastream object with blank metadata
Return type:
-
limit
(num)[source]¶ Limits the result count to the number specified.
Parameters: num (int) – number of rows to return Returns: Return type: DataStream
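Example (assuming ds is an existing DataStream):
>>> ds.limit(10).show()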
-
map_stream
(window_ds)[source]¶ Map/join a stream to a windowed stream
Parameters: window_ds (Datastream) – windowed datastream object Returns: joined/mapped stream Return type: Datastream
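Example (a minimal sketch; assumes ds2 is another DataStream that has already been windowed with window()):
>>> windowed_ds = ds2.window(windowDuration=60)
>>> mapped_ds = ds.map_stream(windowed_ds)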
-
repartition
(numPartitions, *cols)[source]¶ Returns a new DataStream partitioned by the given partitioning expressions. The resulting DataStream is hash partitioned.
numPartitions can be an int to specify the target number of partitions or a Column. If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used.
Parameters: - numPartitions (int or Column) – target number of partitions, or the first partitioning column
- *cols (str or Column) – partitioning columns
Returns: repartitioned DataStream object
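Example (the partition count and column name are placeholders, assuming ds is an existing DataStream):
>>> ds.repartition(8)
>>> ds.repartition(8, "user")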
-
replace
(to_replace, value, subset=None)[source]¶ Returns a new DataStream replacing a value with another value. Values to_replace and value must have the same type and can only be numerics, booleans, or strings. value can also be None. When replacing, the new value will be cast to the type of the existing column. For numeric replacements all values to be replaced should have unique floating point representation. In case of conflicts (for example with {42: -1, 42.0: 1}) an arbitrary replacement will be used.
Parameters: - to_replace – bool, int, long, float, string, list or dict. Value to be replaced. If the value is a dict, then value is ignored or can be omitted, and to_replace must be a mapping between a value and a replacement.
- value – bool, int, long, float, string, list or None. The replacement value must be a bool, int, long, float, string or None. If value is a list, value should be of the same length and type as to_replace. If value is a scalar and to_replace is a sequence, then value is used as a replacement for each item in to_replace.
- subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns: Return type: Datastream
Examples
>>> ds.replace(10, 20).show() >>> ds.replace('some-str', None).show() >>> ds.replace(['old_val1', 'new_val1'], ['old_val2', 'new_val2'], 'col_name').show()
-
select
(*cols)[source]¶ Projects a set of expressions and returns a new DataStream
Parameters: cols (str or Column) – list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataStream
Returns: this will return a new datastream object with selected columns Return type: DataStream Examples
>>> ds.select('*') >>> ds.select('name', 'age') >>> ds.select(ds.name, (ds.age + 10).alias('age'))
-
selectExpr
(*expr)[source]¶ This is a variant of select() that accepts SQL expressions. Projects a set of expressions and returns a new DataStream
Parameters: expr (str) – Returns: this will return a new datastream object with selected columns Return type: DataStream Examples
>>> ds.selectExpr("age * 2")
-
show
(n=20, truncate=True, vertical=False)[source]¶ Parameters: - n – Number of rows to show.
- truncate – If set to True, truncate strings longer than 20 chars by default. If set to a number greater than one, truncates long strings to length truncate and aligns cells right.
- vertical – If set to True, print output rows vertically (one line per column value).
Returns:
-
sort
(*cols, **kwargs)[source]¶ Returns a new DataStream sorted by the specified column(s).
Parameters: - cols – list of Column or column names to sort by.
- ascending – boolean or list of boolean (default True). Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the cols.
Returns: DataStream object
Return type: object
Examples
>>> ds.sort("col_name", ascending=False)
-
summary
(*statistics)[source]¶ Computes specified statistics for numeric and string columns. Available statistics are: - count - mean - stddev - min - max - arbitrary approximate percentiles specified as a percentage (eg, 75%) If no statistics are given, this function computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
Parameters: *statistics – Examples
>>> ds.summary().show() >>> ds.summary("count", "min", "25%", "75%", "max").show() >>> # To do a summary for specific columns first select them: >>> ds.select("col1", "col2").summary("count").show()
-
take
(num)[source]¶ Returns the first num rows as a list of Row.
Returns: row(s) of a DataStream Return type: Row(list) Examples
>>> ds.take(2)
-
toPandas
()[source]¶ This method converts pyspark dataframe into pandas dataframe.
Notes
This method will collect all the data on the master node to convert the pyspark dataframe into a pandas dataframe. After conversion, the DataStream helper methods will no longer be accessible.
Returns: this will return a new datastream object whose data is a pandas.DataFrame and whose metadata is blank Return type: DataStream (Metadata, pandas.DataFrame) Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> ds = CC.get_stream("STREAM-NAME") >>> new_ds = ds.toPandas() >>> new_ds.data.head()
-
union
(other)[source]¶ Return a new Datastream containing union of rows in this and another frame.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
Also as standard in SQL, this function resolves columns by position (not by name).
Parameters: other (DataStream) – Returns: Return type: Datastream Examples
>>> ds.union(ds2).collect()
-
unionByName
(other)[source]¶ Returns a new Datastream containing union of rows in this and another frame.
This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
The difference between this function and union() is that this function resolves columns by name (not by position):
Parameters: other (DataStream) – Returns: Return type: Datastream Examples
>>> ds.unionByName(ds2).show()
-
where
(condition)[source]¶ where() is an alias for filter().
Parameters: condition – Returns: Return type: Datastream Examples
>>> ds.where("age > 3").collect()
-
window
(windowDuration: int = None, groupByColumnName: List[str] = [], slideDuration: int = None, startTime=None, preserve_ts=False)[source]¶ Window data into fixed length chunks. If no column name is provided then the windowing will be performed on all the columns.
Parameters: - windowDuration (int) – duration of a window in seconds
- groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
- slideDuration (int) – slide duration of a window
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
- preserve_ts (bool) – setting this to True will return timestamps corresponding to each windowed value
Returns: this will return a new datastream object with blank metadata
Return type: DataStream Note
This windowing method will use collect_list to return values for each window. collect_list is not optimized.
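Example (a minimal sketch; durations and column names are placeholders, assuming ds carries a timestamp column):
>>> windowed_ds = ds.window(windowDuration=60, groupByColumnName=["user"], slideDuration=30)
>>> windowed_ds.show(2)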
-
withColumn
(colName, col)[source]¶ Returns a new DataStream by adding a column or replacing the existing column that has the same name. The column expression must be an expression over this DataStream; attempting to add a column from some other datastream will raise an error.
Parameters: - colName (str) – name of the new column
- col (Column) – a Column expression for the new column
Examples
>>> ds.withColumn('col_name', ds.col_name + 2)
-
withColumnRenamed
(existing, new)[source]¶ Returns a new DataStream by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.
Parameters: - existing (str) – string, name of the existing column to rename.
- new (str) – string, new name of the column.
Examples
>>> ds.withColumnRenamed('col_name', 'new_col_name')
Returns: DataStream object with new column name(s)
-
-
windowing_udf
(x)¶
-
-
class
CCLogging
(CC)[source]¶ Bases:
cerebralcortex.core.log_manager.log_handler.LogHandler
-
class
DataDescriptor
[source]¶ Bases:
object
-
from_json
(obj)[source]¶ Create a DataDescriptor class object from a json (dict) object
Parameters: obj (dict) – json object of a data descriptor Returns: Return type: self
-
set_attribute
(key, value)[source]¶ The attributes field is optional in a metadata object. An arbitrary number of attributes can be attached to a DataDescriptor
Parameters: - key (str) – key of an attribute
- value (str) – value of an attribute
Returns: Return type: self
Raises: ValueError
– if key/value are missing
-
-
class
Metadata
[source]¶ Bases:
object
-
add_annotation
(annotation: str)[source]¶ Add annotation stream name
Parameters: annotation (str) – name of annotation or list of strings Returns: self
-
add_dataDescriptor
(dd: cerebralcortex.core.metadata_manager.stream.data_descriptor.DataDescriptor)[source]¶ Add data description of a stream
Parameters: dd (DataDescriptor) – data descriptor Returns: self
-
add_input_stream
(input_stream: str)[source]¶ Add input streams that were used to derive a new stream
Parameters: input_stream (str) – name of input stream OR list of input_stream names Returns: self
-
add_module
(mod: cerebralcortex.core.metadata_manager.stream.module_info.ModuleMetadata)[source]¶ Add module metadata
Parameters: mod (ModuleMetadata) – module metadata Returns: self
-
from_json_file
(metadata: dict) → List[source]¶ Convert dict (json) objects into Metadata class objects
Parameters: metadata (dict) – metadata dict Returns: metadata class object Return type: Metadata
-
from_json_sql
(metadata_json: dict) → List[source]¶ Convert dict (json) objects into Metadata class objects
Parameters: metadata_json (dict) – metadata dict Returns: metadata class object Return type: Metadata
-
get_dataDescriptor
(name)[source]¶ get data descriptor by name
Parameters: name (str) – Returns: DataDescriptor object
-
get_hash
() → str[source]¶ Get the unique hash of metadata. Hash is generated based on “stream-name + data_descriptor + module-metadata”
Returns: hash id of metadata Return type: str
-
get_hash_by_json
(metadata: dict = None) → str[source]¶ Get the unique hash of metadata. Hash is generated based on “stream-name + data_descriptor + module-metadata”
Parameters: metadata – only pass this if this method is used on a dict object outside of Metadata class Returns: hash id of metadata Return type: str
-
is_valid
() → bool[source]¶ check whether all required fields are set
Returns: True if all required fields are set Return type: bool Raises: ValueError
– if metadata fields are not set
-
set_description
(stream_description: str)[source]¶ Add stream description
Parameters: stream_description (str) – textual description of a stream Returns: self
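Example (a minimal sketch of building stream metadata; the descriptions are placeholders, only methods documented above are used, and import paths are taken from the signatures above):
>>> from cerebralcortex.core.metadata_manager.stream.metadata import Metadata
>>> from cerebralcortex.core.metadata_manager.stream.data_descriptor import DataDescriptor
>>> from cerebralcortex.core.metadata_manager.stream.module_info import ModuleMetadata
>>> dd = DataDescriptor()
>>> dd.set_attribute("description", "a sample value column")
>>> metadata = Metadata().set_description("a sample stream").add_dataDescriptor(dd).add_module(ModuleMetadata())
>>> metadata.is_valid()  # raises ValueError if required fields are missing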
-
-
class
ModuleMetadata
[source]¶ Bases:
object
-
from_json
(obj)[source]¶ Create a ModuleMetadata class object from a json (dict) object
Parameters: obj (dict) – json object of a ModuleMetadata Returns: Return type: self
-
set_attribute
(key: str, value: str)[source]¶ The attributes field is optional in a metadata object. An arbitrary number of attributes can be attached to a DataDescriptor
Parameters: - key (str) – key of an attribute
- value (str) – value of an attribute
Returns: Return type: self
Raises: ValueError
– if key/value are missing
-
set_author
(key: str, value: str)[source]¶ Set an author key/value pair. For example, key=name, value=md2k
Parameters: - key (str) – author metadata key
- value (str) – author metadata value
Returns: Return type: self
-
set_authors
(authors: list)[source]¶ Set a list of authors
Parameters: authors (list[dict]) – List of authors names and emails ids in dict. For example, authors = [{“ali”:”ali@gmail.com”}, {“nasir”:”nasir@gmail.com”}] Returns: Return type: self
-
-
class
User
(user_id: uuid.UUID, username: str, password: str, token: str = None, token_issued_at: datetime.datetime = None, token_expiry: datetime.datetime = None, user_role: datetime.datetime = None, user_metadata: dict = None, active: bool = 1)[source]¶ Bases:
object
-
isactive
¶ user status
Return type: int
-
password
¶ encrypted password
Return type: str
-
token
¶ auth token
Return type: str
-
token_expiry
¶ date and time when the token will expire
Return type: datetime
-
token_issued_at
¶ date and time when the token was issued
Return type: datetime
-
user_id
¶ user id
Return type: str
-
user_metadata
¶ metadata of a user
Return type: dict
-
user_role
¶ user role
Return type: str
-
username
¶ user name
Return type: str
-
-
get_or_create_sc
(type='sparkContext', name='CerebralCortex-Kernal', enable_spark_ui=False)[source]¶ get or create spark context
Parameters: - type (str) – type (sparkContext, SparkSessionBuilder, sparkSession, sqlContext). (default=”sparkContext”)
- name (str) – spark app name (default=”CerebralCortex-Kernal”)
Returns: Spark context/session object of the requested type
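Example (a minimal sketch; assumes a working local Spark installation):
>>> sc = get_or_create_sc()
>>> spark = get_or_create_sc(type="sparkSession", name="my-analysis-app")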
Module contents¶
cerebralcortex.examples package¶
Submodules¶
cerebralcortex.examples.brushing_detection module¶
cerebralcortex.examples.mprov_get module¶
cerebralcortex.examples.mprov_gps_example module¶
cerebralcortex.examples.stress_from_ecg module¶
Module contents¶
cerebralcortex.markers package¶
Subpackages¶
-
compute_corr_mse_accel_gyro
(self, exclude_col_names: list = [], accel_column_names: list = ['accelerometer_x', 'accelerometer_y', 'accelerometer_z'], gyro_column_names: list = ['gyroscope_y', 'gyroscope_x', 'gyroscope_z'], windowDuration: int = None, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None)[source]¶ Compute correlation and mean squared error (MSE) of accel and gyro sensors
Parameters: - exclude_col_names (list) – name of the columns on which features should not be computed
- accel_column_names (list) – names of the accel data columns
- gyro_column_names (list) – names of the gyro data columns
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- List[str] (groupByColumnName) – groupby column names, for example, groupby user, col1, col2
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns: DataStream object with all the existing data columns and correlation/MSE features
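Example (a minimal sketch; assumes ds is a DataStream containing the accelerometer and gyroscope columns listed above and that this method is available on the DataStream object):
>>> corr_mse_ds = ds.compute_corr_mse_accel_gyro(windowDuration=10)
>>> corr_mse_ds.show(2)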
-
compute_fourier_features
(self, exclude_col_names: list = [], feature_names=['fft_centroid', 'fft_spread', 'spectral_entropy', 'spectral_entropy_old', 'fft_flux', 'spectral_falloff'], windowDuration: int = None, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None)[source]¶ Transforms data from time domain to frequency domain.
Parameters: - exclude_col_names (list) – name of the columns on which features should not be computed
- feature_names (list) – names of the features. Supported features are fft_centroid, fft_spread, spectral_entropy, spectral_entropy_old, fft_flux, spectral_falloff
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- List[str] (groupByColumnName) – groupby column names, for example, groupby user, col1, col2
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns: DataStream object with all the existing data columns and FFT features
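Example (a minimal sketch; assumes ds is a DataStream of raw sensor values and that this method is available on the DataStream object):
>>> fft_ds = ds.compute_fourier_features(windowDuration=10, feature_names=['fft_centroid', 'fft_spread'])
>>> fft_ds.show(2)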
-
get_candidates
(ds, uper_limit: float = 0.1, threshold: float = 0.5)[source]¶ Get brushing candidates. Data is windowed into potential brushing candidates.
Parameters: - ds (DataStream) –
- uper_limit (float) – threshold for accel. This is used to know how high the hand is
- threshold (float) –
Returns:
-
get_max_features
(ds)[source]¶ This method computes the maximum values of the accel and gyro statistical/FFT features
Parameters: - ds (DataStream) –
Returns: DataStream
-
get_orientation_data
(ds, wrist, ori=1, is_new_device=False, accelerometer_x='accelerometer_x', accelerometer_y='accelerometer_y', accelerometer_z='accelerometer_z', gyroscope_x='gyroscope_x', gyroscope_y='gyroscope_y', gyroscope_z='gyroscope_z')[source]¶ Get the orientation of the hand using accel and gyro data.
Parameters: - ds (DataStream) – DataStream object
- wrist (str) – wrist on which the smartwatch was worn
- ori (int) –
- is_new_device (bool) – this param is for the motionsense smartwatch version
- accelerometer_x, accelerometer_y, accelerometer_z (str) – accel column names
- gyroscope_x, gyroscope_y, gyroscope_z (str) – gyro column names
Returns: DataStream object
-
stress_from_ecg
(ecg_data: cerebralcortex.core.datatypes.datastream.DataStream, sensor_name: str = 'autosense', Fs: int = 64, model_path='./model/stress_ecg_final.p')[source]¶ Compute stress episodes from ecg timeseries data
Parameters: - ecg_data (DataStream) – ecg data
- sensor_name (str) – name of the sensor used to collect ecg data. Currently supports ‘autosense’ only
- Fs (int) – frequency of sensor data
Returns: stress episodes
Return type: DataStream
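Example (a minimal sketch; the stream name is a placeholder, assuming an ECG stream exists in the configured study and stress_from_ecg has been imported from the markers package):
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> ecg_ds = CC.get_stream("ecg--org.md2k.autosense--autosense_chest--chest")
>>> stress_ds = stress_from_ecg(ecg_ds, sensor_name="autosense", Fs=64)
>>> CC.save_stream(stress_ds)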
-
compute_encounters
(data_all_v3, data_all_v4, data_map_stream, data_key_stream, start_time, end_time, ltime=True)[source]¶
Module contents¶
cerebralcortex.plotting package¶
Subpackages¶
-
plot_hist
(ds, user_id: str, x_axis_column=None)[source]¶ histogram plot of timeseries data
Parameters: - ds (DataStream) –
- user_id (str) – uuid of a user
- x_axis_column (str) – x axis column of the plot
-
plot_timeseries
(ds: cerebralcortex.core.datatypes.datastream.DataStream, user_id: str, y_axis_column: str = None)[source]¶ line plot of timeseries data
Parameters: - ds (DataStream) –
- user_id (str) – uuid of a user
- y_axis_column (str) – the x-axis column is hard-coded as the timestamp column; only the y-axis column can be passed as a param
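Example (a minimal sketch; the stream name, user UUID, and column name are placeholders, assuming plot_timeseries has been imported from the plotting package):
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> plot_timeseries(ds, user_id="76cc444c-4fb8-776e-2872-9472b4e66b16", y_axis_column="accelerometer_x")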
-
plot_gps_clusters
(ds, user_id: str, zoom=5)[source]¶ Plots GPS coordinates
Parameters: - ds (DataStream) – datastream object
- user_id (str) – uuid of a user
- zoom (int) – map zoom level (min 0, max 100)
-
plot_comparison
(ds, x_axis_column='stresser_main', usr_id=None, compare_with='all')[source]¶ Parameters: - ds –
- x_axis_column –
- usr_id –
- compare_with –
Submodules¶
cerebralcortex.plotting.util module¶
Module contents¶
cerebralcortex.test_suite package¶
Subpackages¶
-
gen_location_datastream
(user_id, stream_name) → object[source]¶ Create pyspark dataframe with some sample gps data (Memphis, TN, lat, long, alt coordinates)
Parameters: - user_id (str) – id of a user
- stream_name (str) – sample gps stream name
Returns: datastream object of gps location stream with its metadata
Return type:
-
gen_phone_battery_data
() → object[source]¶ Create pyspark dataframe with some sample phone battery data
Returns: pyspark dataframe object with columns: [“timestamp”, “offset”, “battery_level”, “ver”, “user”] Return type: DataFrame
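Example (a minimal sketch; the user id and stream name are placeholders):
>>> battery_df = gen_phone_battery_data()
>>> gps_ds = gen_location_datastream(user_id="00000000-0000-0000-0000-000000000000", stream_name="gps--org.md2k.phonesensor--phone")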
Submodules¶
cerebralcortex.test_suite.join_spark module¶
cerebralcortex.test_suite.test_glucose_metrics module¶
cerebralcortex.test_suite.test_gps_cluster_udf module¶
cerebralcortex.test_suite.test_import_data module¶
cerebralcortex.test_suite.test_main module¶
-
class
TestCerebralCortex
(methodName='runTest')[source]¶ Bases:
unittest.case.TestCase,
cerebralcortex.test_suite.test_nosql_storage.NoSqlStorageTest,
cerebralcortex.test_suite.test_sql_storage.SqlStorageTest
cerebralcortex.test_suite.test_rest_api_server module¶
cerebralcortex.test_suite.tt module¶
Module contents¶
cerebralcortex.util package¶
Submodules¶
cerebralcortex.util.helper_methods module¶
-
get_study_names
(configs_dir_path: str) → List[str][source]¶ Get the names of the studies available under the given configurations directory
Parameters: configs_dir_path (str) – Directory path of cerebralcortex configurations. Returns: list of study names available Return type: list(str) Raises: ValueError
– If configs_dir_path is None or empty. Examples
>>> get_study_names("/directory/path/of/configs/")
Module contents¶
Submodules¶
cerebralcortex.kernel module¶
-
class
Kernel
(configs_dir_path: str = '', cc_configs: dict = None, study_name: str = 'default', new_study: bool = False, enable_spark: bool = True, enable_spark_ui=False)[source]¶ Bases:
object
-
connect
(username: str, password: str, encrypt_password: bool = False) → dict[source]¶ Authenticate a user based on username and password and return an auth token
Parameters: - username (str) – username of a user
- password (str) – password of a user
- encrypt_password (str) – is password encrypted or not. mCerebrum sends encrypted passwords
Raises: ValueError
– User name and password cannot be empty/None. Returns: dict of the form {“status”: bool, “auth_token”: str, “msg”: str}
Return type: dict
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.connect("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True) >>> True
-
create_user
(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict, encrypt_password: bool = False) → bool[source]¶ Create a user in SQL storage if it doesn’t exist
Parameters: - username (str) – Only alphanumeric usernames are allowed with the max length of 25 chars.
- user_password (str) – no size limit on password
- user_role (str) – role of a user
- user_metadata (dict) – metadata of a user
- user_settings (dict) – user settings, mCerebrum configurations of a user
- encrypt_password (bool) – encrypt password if set to true
Returns: True if user is successfully registered or throws any error in case of failure
Return type: bool
Raises: ValueError
– if selected username is not availableException
– if sql query fails
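Example (a minimal sketch; all values are placeholders):
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.create_user(username="demouser1", user_password="some-password", user_role="participant", user_metadata={"study_name": "default"}, user_settings={}, encrypt_password=True)
>>> True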
-
encrypt_user_password
(user_password: str) → str[source]¶ Encrypt password
Parameters: user_password (str) – unencrypted password Raises: ValueError
– password cannot be None or empty.Returns: encrypted password Return type: str
-
gen_random_pass
(string_type: str = 'varchar', size: int = 8) → str[source]¶ Generate a random password
Parameters: - string_type – Accepted parameters are “varchar” and “char”. (Default=”varchar”)
- size – password length (default=8)
Returns: random password
Return type: str
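Example (assuming CC is a Kernel object):
>>> CC.gen_random_pass(string_type="varchar", size=10)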
-
get_stream
(stream_name: str, version: str = 'latest', user_id: str = None, data_type=DataSet.COMPLETE) → cerebralcortex.core.datatypes.datastream.DataStream[source]¶ Retrieve a data-stream with its metadata.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”latest”)
- data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns: contains Data and/or metadata
Return type: DataStream Raises: ValueError
– if stream name is empty or None Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> ds.data # an object of a dataframe >>> ds.metadata # an object of MetaData class >>> ds.get_metadata(version=1) # get the specific version metadata of a stream
-
get_stream_metadata_by_hash
(metadata_hash: uuid) → str[source]¶ metadata_hash values are unique to each stream version. This reverse lookup returns the stream name and metadata for a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid. Returns: [stream_name, metadata] Return type: List Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_stream_metadata_by_hash("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ["name" .....] # stream metadata and other information
-
get_stream_metadata_by_name
(stream_name: str, version: str = 1) → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]¶ Get a list of metadata for all versions available for a stream.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
Returns: Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise.
Return type: Raises: ValueError
– stream_name cannot be None or empty.Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_stream_metadata_by_name("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST", version=1) >>> Metadata # list of MetaData class objects
-
get_stream_metadata_hash
(stream_name: str) → list[source]¶ Get all the metadata_hash associated with a stream name.
Parameters: stream_name (str) – name of a stream Returns: list of all the metadata hashes with name and versions Return type: list Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [["stream_name", "version", "metadata_hash"]]
-
get_stream_name
(metadata_hash: uuid) → str[source]¶ metadata_hash values are unique to each stream version. This reverse lookup returns the stream name for a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid. Returns: name of a stream Return type: str Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68") >>> ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST
-
get_stream_versions
(stream_name: str) → list[source]¶ Returns a list of versions available for a stream
Parameters: stream_name (str) – name of a stream Returns: list of int Return type: list Raises: ValueError
– if stream_name is empty or None Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> [1, 2, 4]
-
get_user_id
(user_name: str) → str[source]¶ Get the user id linked to user_name.
Parameters: user_name (str) – username of a user Returns: user id associated to user_name Return type: str Raises: ValueError
– User name is a required field.Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_user_id("nasir_ali") >>> '76cc444c-4fb8-776e-2872-9472b4e66b16'
-
get_user_metadata
(user_id: str = None, username: str = None) → dict[source]¶ Get user metadata by user_id or by username
Parameters: - user_id (str) – id (uuid) of a user
- username (str) – username of a user
Returns: user metadata
Return type: dict
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_user_metadata(username="nasir_ali") >>> {"study_name":"mperf"........}
-
get_user_name
(user_id: str) → str[source]¶ Get the user name linked to a user id.
Parameters: user_id (str) – id (uuid) of a user Returns: username associated with the user_id Return type: str Raises: ValueError
– User ID is a required field.Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16") >>> 'nasir_ali'
-
get_user_settings
(username: str = None, auth_token: str = None) → dict[source]¶ Get user settings by auth-token or by username. These are user’s mCerebrum settings
Parameters: - username (str) – username of a user
- auth_token (str) – auth-token
Returns: List of dictionaries of user metadata
Return type: list[dict]
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.get_user_settings(username="nasir_ali") >>> [{"mcerebrum":"some-conf"........}]
-
is_auth_token_valid
(username: str, auth_token: str, checktime: bool = False) → bool[source]¶ Validate whether a token is valid or expired based on the token expiry datetime stored in SQL
Parameters: - username (str) – username of a user
- auth_token (str) – token generated by API-Server
- checktime (bool) – setting this to False will only check if the token is available in system. Setting this to true will check if the token is expired based on the token expiry date.
Raises: ValueError
– Auth token and auth-token expiry time cannot be null/empty. Returns: returns True if token is valid or False otherwise.
Return type: bool
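Example (a minimal sketch; the username and token are placeholders, assuming CC is a Kernel object):
>>> CC.is_auth_token_valid("nasir_ali", "some-auth-token", checktime=True)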
-
is_stream
(stream_name: str) → bool[source]¶ Returns true if provided stream exists.
Parameters: stream_name (str) – name of a stream Returns: True if stream_name exist False otherwise Return type: bool Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST") >>> True
-
is_user
(user_id: str = None, user_name: str = None) → bool[source]¶ Checks whether a user exists in the system. Either parameter can be set to verify whether a user exists.
Parameters: - user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: True if a user exists in the system or False otherwise.
Return type: bool
Raises: ValueError
– Both user_id and user_name cannot be None or empty.Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16") >>> True
-
list_streams
() → List[str][source]¶ Get all the available stream names with metadata
Returns: list of available streams metadata Return type: List[str] Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.list_streams()
-
list_users
() → List[dict][source]¶ Get a list of all users part of a study.
Parameters: study_name (str) – name of a study. If no study_name is provided then all users’ list will be returned Raises: ValueError
– Study name is a required field. Returns: Returns an empty list if there is no user associated to the study_name and/or the study_name does not exist. Return type: list[dict] Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default") >>> CC.list_users() >>> [{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}] # [{user_id, user_name}]
-
read_csv
(file_path, stream_name: str, header: bool = False, delimiter: str = ', ', column_names: list = [], timestamp_column_index: int = 0, timein: str = 'milliseconds', metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None) → cerebralcortex.core.datatypes.datastream.DataStream[source]¶ Reads a csv file (compressed or uncompressed), parses it, converts it into the CC DataStream object format, and returns it
Parameters: - file_path (str) – path of the file
- stream_name (str) – name of the stream
- header (bool) – set to True if the CSV contains a header row
- delimiter (str) – separator used in the CSV file. Default is comma
- column_names (list[str]) – list of column names
- timestamp_column_index (int) – index of the timestamp column
- timein (str) – if timestamp is epoch time, provide whether it is in milliseconds or seconds
- metadata (Metadata) – metadata object for the csv file
Returns: DataStream object
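As an illustrative sketch (the file path, stream name, and column names below are placeholders), a headerless CSV with epoch-millisecond timestamps could be loaded as:
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> ds = CC.read_csv("/path/to/data.csv", stream_name="some--sensor--stream", header=False,
...                  column_names=["timestamp", "value"], timestamp_column_index=0, timein="milliseconds")
>>> ds.show(2)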
-
save_stream
(datastream: cerebralcortex.core.datatypes.datastream.DataStream, overwrite=False) → bool[source]¶ Saves a datastream’s raw data in the selected NoSQL storage and its metadata in MySQL.
Parameters: - datastream (DataStream) – a DataStream object
- overwrite (bool) – if set to True, all existing datastream data will be overwritten by the new data
Returns: True if the stream is successfully stored; raises an exception otherwise
Return type: bool
Raises: Exception
– logs or raises an exception if the stream is not stored
Todo
Add functionality to store data in influxdb.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)
-
search_stream
(stream_name)[source]¶ Find all stream names similar to the stream_name argument. For example, passing “location” will return all stream names that contain the word location.
Returns: list of stream names similar to the stream_name argument
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
-
update_auth_token
(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool[source]¶ Update an auth token in the SQL database to keep a user logged in. The auth token’s validity duration can be changed in the configuration files.
Notes
This method is used by the API server to store a newly created auth token.
Parameters: - username (str) – username of a user
- auth_token (str) – issued new auth token
- auth_token_issued_time (datetime) – datetime when the auth token was issued
- auth_token_expiry_time (datetime) – datetime when the token will get expired
Raises: ValueError
– Auth token and auth-token issue/expiry time cannot be None/empty.
Returns: True if the new auth token is set, False otherwise.
Return type: bool
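An illustrative call (normally made by the API server; the token and timestamps below are placeholders):
>>> from datetime import datetime, timedelta
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> issued = datetime.now()
>>> CC.update_auth_token("nasir_ali", "newly-issued-token", issued, issued + timedelta(days=40))
True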
-
Module contents¶
-
class
Kernel
(configs_dir_path: str = '', cc_configs: dict = None, study_name: str = 'default', new_study: bool = False, enable_spark: bool = True, enable_spark_ui=False)[source]¶ Bases:
object
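For example (illustrative; configuration values depend on your deployment), a local Kernel with the bundled default configs and the Spark UI disabled could be created as:
>>> from cerebralcortex.kernel import Kernel
>>> CC = Kernel(cc_configs="default", study_name="default", enable_spark=True, enable_spark_ui=False)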
-
connect
(username: str, password: str, encrypt_password: bool = False) → dict[source]¶ Authenticate a user based on username and password and return an auth token
Parameters: - username (str) – username of a user
- password (str) – password of a user
- encrypt_password (bool) – whether the password is already encrypted. mCerebrum sends encrypted passwords
Raises: ValueError
– User name and password cannot be empty/None.
Returns: {“status”: bool, “auth_token”: str, “msg”: str}
Return type: dict
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.connect("nasir_ali", "2ksdfhoi2r2ljndf823hlkf8234hohwef0234hlkjwer98u234", True)
True
-
create_user
(username: str, user_password: str, user_role: str, user_metadata: dict, user_settings: dict, encrypt_password: bool = False) → bool[source]¶ Create a user in SQL storage if it doesn’t exist
Parameters: - username (str) – Only alphanumeric usernames are allowed with the max length of 25 chars.
- user_password (str) – no size limit on password
- user_role (str) – role of a user
- user_metadata (dict) – metadata of a user
- user_settings (dict) – user settings, mCerebrum configurations of a user
- encrypt_password (bool) – encrypt password if set to true
Returns: True if the user is successfully registered; raises an exception on failure
Return type: bool
Raises: ValueError
– if the selected username is not available
Exception
– if the SQL query fails
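A hypothetical registration call might look like this (the username, password, role, and settings below are placeholders):
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.create_user(username="user01", user_password="a-strong-password", user_role="participant",
...                user_metadata={"study_name": "default"}, user_settings={"mcerebrum": {}},
...                encrypt_password=True)
True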
-
encrypt_user_password
(user_password: str) → str[source]¶ Encrypt password
Parameters: user_password (str) – unencrypted password
Raises: ValueError
– password cannot be None or empty.
Returns: encrypted password
Return type: str
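For illustration (the returned string below is a placeholder; the actual value depends on the encryption used):
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.encrypt_user_password("a-strong-password")
'e3b0c44298fc1c...'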
-
gen_random_pass
(string_type: str = 'varchar', size: int = 8) → str[source]¶ Generate a random password
Parameters: - string_type – Accepted parameters are “varchar” and “char”. (Default=”varchar”)
- size – password length (default=8)
Returns: random password
Return type: str
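For illustration (the output below is a placeholder; the actual value is random):
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.gen_random_pass(string_type="varchar", size=8)
'aB3kd9Xz'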
-
get_stream
(stream_name: str, version: str = 'latest', user_id: str = None, data_type=<DataSet.COMPLETE: (1, )>) → cerebralcortex.core.datatypes.datastream.DataStream[source]¶ Retrieve a data stream with its metadata.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable values are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”latest”)
- data_type (DataSet) – DataSet.COMPLETE returns both Data and Metadata. DataSet.ONLY_DATA returns only Data. DataSet.ONLY_METADATA returns only metadata of a stream. (Default=DataSet.COMPLETE)
Returns: contains Data and/or metadata
Return type: DataStream
Raises: ValueError
– if stream name is empty or None
Note
Please specify a version if you know the exact version of a stream. Getting all the stream data and then filtering versions won’t be efficient.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> ds = CC.get_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
>>> ds.data # an object of a dataframe
>>> ds.metadata # an object of MetaData class
>>> ds.get_metadata(version=1) # get the specific version metadata of a stream
-
get_stream_metadata_by_hash
(metadata_hash: uuid) → str[source]¶ A metadata_hash is unique to each stream version. This reverse lookup returns the stream name and metadata associated with a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns: [stream_name, metadata]
Return type: List
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_stream_metadata_by_hash("00ab666c-afb8-476e-9872-6472b4e66b68")
["name" .....] # stream metadata and other information
-
get_stream_metadata_by_name
(stream_name: str, version: str = 1) → List[cerebralcortex.core.metadata_manager.stream.metadata.Metadata][source]¶ Get a list of metadata for all versions available for a stream.
Parameters: - stream_name (str) – name of a stream
- version (str) – version of a stream. Acceptable parameters are all, latest, or a specific version of a stream (e.g., 2.0) (Default=”all”)
Returns: Returns an empty list if no metadata is available for a stream_name or a list of metadata otherwise.
Return type: list[Metadata]
Raises: ValueError
– stream_name cannot be None or empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_stream_metadata_by_name("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST", version=1)
Metadata # list of MetaData class objects
-
get_stream_metadata_hash
(stream_name: str) → list[source]¶ Get all the metadata hashes associated with a stream name.
Parameters: stream_name (str) – name of a stream
Returns: list of all the metadata hashes with name and versions
Return type: list
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_stream_metadata_hash("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[["stream_name", "version", "metadata_hash"]]
-
get_stream_name
(metadata_hash: uuid) → str[source]¶ A metadata_hash is unique to each stream version. This reverse lookup returns the stream name associated with a metadata_hash.
Parameters: metadata_hash (uuid) – This could be an actual uuid object or a string form of uuid.
Returns: name of a stream
Return type: str
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_stream_name("00ab666c-afb8-476e-9872-6472b4e66b68")
ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST
-
get_stream_versions
(stream_name: str) → list[source]¶ Returns a list of versions available for a stream
Parameters: stream_name (str) – name of a stream
Returns: list of int
Return type: list
Raises: ValueError
– if stream_name is empty or None
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_stream_versions("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
[1, 2, 4]
-
get_user_id
(user_name: str) → str[source]¶ Get the user id linked to user_name.
Parameters: user_name (str) – username of a user
Returns: user id associated with the user_name
Return type: str
Raises: ValueError
– User name is a required field.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_user_id("nasir_ali")
'76cc444c-4fb8-776e-2872-9472b4e66b16'
-
get_user_metadata
(user_id: str = None, username: str = None) → dict[source]¶ Get user metadata by user_id or by username
Parameters: - user_id (str) – id (uuid) of a user
- username (str) – username of a user
Returns: user metadata
Return type: dict
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_user_metadata(username="nasir_ali")
{"study_name":"mperf"........}
-
get_user_name
(user_id: str) → str[source]¶ Get the user name linked to a user id.
Parameters: user_id (str) – id (uuid) of a user
Returns: username associated with the user_id
Return type: str
Raises: ValueError
– User ID is a required field.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_user_name("76cc444c-4fb8-776e-2872-9472b4e66b16")
'nasir_ali'
-
get_user_settings
(username: str = None, auth_token: str = None) → dict[source]¶ Get user settings by auth-token or by username. These are the user’s mCerebrum settings.
Parameters: - username (str) – username of a user
- auth_token (str) – auth-token
Returns: list of dictionaries of user settings
Return type: list[dict]
Todo
Return list of User class object
Raises: ValueError
– User ID/name cannot be empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.get_user_settings(username="nasir_ali")
[{"mcerebrum":"some-conf"........}]
-
is_auth_token_valid
(username: str, auth_token: str, checktime: bool = False) → bool[source]¶ Validate whether a token is valid or expired based on the token expiry datetime stored in SQL
Parameters: - username (str) – username of a user
- auth_token (str) – token generated by API-Server
- checktime (bool) – if set to False, only checks whether the token exists in the system; if set to True, also checks whether the token has expired based on the token expiry date.
Raises: ValueError
– Auth token and auth-token expiry time cannot be null/empty.
Returns: True if the token is valid, False otherwise.
Return type: bool
-
is_stream
(stream_name: str) → bool[source]¶ Returns True if the provided stream exists.
Parameters: stream_name (str) – name of a stream
Returns: True if stream_name exists, False otherwise
Return type: bool
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.is_stream("ACCELEROMETER--org.md2k.motionsense--MOTION_SENSE_HRV--RIGHT_WRIST")
True
-
is_user
(user_id: str = None, user_name: str = None) → bool[source]¶ Checks whether a user exists in the system. Either parameter may be provided to verify whether the user exists.
Parameters: - user_id (str) – id (uuid) of a user
- user_name (str) – username of a user
Returns: True if a user exists in the system or False otherwise.
Return type: bool
Raises: ValueError
– Both user_id and user_name cannot be None or empty.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.is_user(user_id="76cc444c-4fb8-776e-2872-9472b4e66b16")
True
-
list_streams
() → List[str][source]¶ Get all the available stream names with metadata
Returns: list of available streams metadata
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.list_streams()
-
list_users
() → List[dict][source]¶ Get a list of all users that are part of a study.
Parameters: study_name (str) – name of a study. If no study_name is provided, all users’ list will be returned
Raises: ValueError
– Study name is a required field.
Returns: an empty list if there is no user associated with the study_name and/or the study_name does not exist
Return type: list[dict]
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.list_users()
[{"76cc444c-4fb8-776e-2872-9472b4e66b16": "nasir_ali"}] # [{user_id, user_name}]
-
read_csv
(file_path, stream_name: str, header: bool = False, delimiter: str = ', ', column_names: list = [], timestamp_column_index: int = 0, timein: str = 'milliseconds', metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None) → cerebralcortex.core.datatypes.datastream.DataStream[source]¶ Reads a CSV file (compressed or uncompressed), parses it, converts it into the CC DataStream object format, and returns it
Parameters: - file_path (str) – path of the file
- stream_name (str) – name of the stream
- header (bool) – set to True if the CSV contains a header row
- delimiter (str) – separator used in the CSV file. Default is comma
- column_names (list[str]) – list of column names
- timestamp_column_index (int) – index of the timestamp column
- timein (str) – if timestamp is epoch time, provide whether it is in milliseconds or seconds
- metadata (Metadata) – metadata object for the csv file
Returns: DataStream object
-
save_stream
(datastream: cerebralcortex.core.datatypes.datastream.DataStream, overwrite=False) → bool[source]¶ Saves a datastream’s raw data in the selected NoSQL storage and its metadata in MySQL.
Parameters: - datastream (DataStream) – a DataStream object
- overwrite (bool) – if set to True, all existing datastream data will be overwritten by the new data
Returns: True if the stream is successfully stored; raises an exception otherwise
Return type: bool
Raises: Exception
– logs or raises an exception if the stream is not stored
Todo
Add functionality to store data in influxdb.
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> ds = DataStream(dataframe, MetaData)
>>> CC.save_stream(ds)
-
search_stream
(stream_name)[source]¶ Find all stream names similar to the stream_name argument. For example, passing “location” will return all stream names that contain the word location.
Returns: list of stream names similar to the stream_name argument
Return type: List[str]
Examples
>>> CC = Kernel("/directory/path/of/configs/", study_name="default")
>>> CC.search_stream("battery")
["BATTERY--org.md2k.motionsense--MOTION_SENSE_HRV--LEFT_WRIST", "BATTERY--org.md2k.phonesensor--PHONE".....]
-
update_auth_token
(username: str, auth_token: str, auth_token_issued_time: datetime.datetime, auth_token_expiry_time: datetime.datetime) → bool[source]¶ Update an auth token in the SQL database to keep a user logged in. The auth token’s validity duration can be changed in the configuration files.
Notes
This method is used by the API server to store a newly created auth token.
Parameters: - username (str) – username of a user
- auth_token (str) – issued new auth token
- auth_token_issued_time (datetime) – datetime when the auth token was issued
- auth_token_expiry_time (datetime) – datetime when the token will get expired
Raises: ValueError
– Auth token and auth-token issue/expiry time cannot be None/empty.
Returns: True if the new auth token is set, False otherwise.
Return type: bool
-