cerebralcortex.core.datatypes package

Submodules

cerebralcortex.core.datatypes.datastream module

class DataStream(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)[source]

Bases: object

collect()[source]

Collect all the data on the master node and return it as a list of rows

Returns:all rows of the dataframe
Return type:List
compute(udfName, timeInterval=None)[source]
compute_average(windowDuration: int = None, colmnName: str = None) → object[source]

Window the data and compute the average of each window, for a single column or for all columns

Parameters:
  • windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over the whole data in the column(s)
  • colmnName (str) – name of the column to average. If colmnName is not provided, the average is computed for all columns (in every window)
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream
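The windowing semantics described for the compute_* methods can be illustrated without Spark. The following is a minimal pure-Python sketch, not the library's implementation: windowed_average, its arguments, the window_start key, and the assumption that rows are dicts with an integer epoch-second timestamp are all hypothetical and chosen only for illustration.

```python
from collections import defaultdict
from statistics import mean


def windowed_average(rows, window_duration=None, column=None):
    """Average one column (or every non-timestamp column) per window.

    Hypothetical helper: `rows` is a list of dicts that each carry an
    integer 'timestamp' in epoch seconds plus numeric value columns.
    """
    columns = [column] if column else [k for k in rows[0] if k != "timestamp"]
    buckets = defaultdict(list)
    for row in rows:
        if window_duration is None:
            # No window duration: all rows fall into a single bucket.
            key = None
        else:
            # Align the timestamp to the start of its fixed-length window.
            key = row["timestamp"] // window_duration * window_duration
        buckets[key].append(row)

    result = []
    for key in sorted(buckets, key=lambda k: -1 if k is None else k):
        out = {"window_start": key}
        for col in columns:
            out[col] = mean(r[col] for r in buckets[key])
        result.append(out)
    return result


rows = [
    {"timestamp": 0, "hr": 60.0, "steps": 2.0},
    {"timestamp": 30, "hr": 70.0, "steps": 4.0},
    {"timestamp": 60, "hr": 80.0, "steps": 6.0},
]
per_minute = windowed_average(rows, window_duration=60, column="hr")
overall = windowed_average(rows)
```

Here per_minute holds one averaged value of hr per 60-second window, while overall averages every column over the whole data, mirroring the two behaviours the parameter descriptions mention.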

compute_max(windowDuration: int = None, colmnName: str = None) → object[source]

Window the data and compute the maximum of each window, for a single column or for all columns

Parameters:
  • windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over the whole data in the column(s)
  • colmnName (str) – name of the column. If colmnName is not provided, the maximum is computed for all columns (in every window)
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

compute_min(windowDuration: int = None, colmnName: str = None) → object[source]

Window the data and compute the minimum of each window, for a single column or for all columns

Parameters:
  • windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over the whole data in the column(s)
  • colmnName (str) – name of the column. If colmnName is not provided, the minimum is computed for all columns (in every window)
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

compute_sqrt(windowDuration: int = None, colmnName: str = None) → object[source]

Window the data and compute the square root of the windowed data, for a single column or for all columns

Parameters:
  • windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over the whole data in the column(s)
  • colmnName (str) – name of the column. If colmnName is not provided, the square root is computed for all columns (in every window)
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

compute_stddev(windowDuration: int = None, colmnName: str = None) → object[source]

Window the data and compute the standard deviation of each window, for a single column or for all columns

Parameters:
  • windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over the whole data in the column(s)
  • colmnName (str) – name of the column. If colmnName is not provided, the standard deviation is computed for all columns (in every window)
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

compute_sum(windowDuration: int = None, colmnName: str = None) → object[source]

Window the data and compute the sum of each window, for a single column or for all columns

Parameters:
  • windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over the whole data in the column(s)
  • colmnName (str) – name of the column. If colmnName is not provided, the sum is computed for all columns (in every window)
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

compute_variance(windowDuration: int = None, colmnName: str = None) → object[source]

Window the data and compute the variance of each window, for a single column or for all columns

Parameters:
  • windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over the whole data in the column(s)
  • colmnName (str) – name of the column. If colmnName is not provided, the variance is computed for all columns (in every window)
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

create_windows(window_length='hour')[source]

Create windows over the data

Parameters:window_length (str) – length of each window; defaults to 'hour'
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

data

Get the stream data

Return type:DataFrame

drop_column(*args, **kwargs)[source]

Calls the default dataframe drop method

Parameters:
  • *args
  • **kwargs
filter(columnName, operator, value)[source]

filter data

Parameters:
  • columnName (str) – name of the column
  • operator (str) – basic operators (e.g., >, <, ==, !=)
  • value (Any) – value to compare against. If columnName is timestamp, provide a Python datetime object
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream
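The operator argument of filter is a string, so some mapping from operator strings to comparisons is needed. A minimal sketch of that idea in plain Python follows; filter_rows and the OPERATORS table are hypothetical names, rows are assumed to be dicts, and only the four operators named above are covered. It does not reflect how the library evaluates the filter internally.

```python
import operator
from datetime import datetime

# Hypothetical lookup table from operator strings to comparison functions.
OPERATORS = {
    ">": operator.gt,
    "<": operator.lt,
    "==": operator.eq,
    "!=": operator.ne,
}


def filter_rows(rows, column_name, op, value):
    """Keep the rows whose `column_name` satisfies `<cell> <op> <value>`."""
    if op not in OPERATORS:
        raise ValueError(f"unsupported operator: {op!r}")
    compare = OPERATORS[op]
    return [row for row in rows if compare(row[column_name], value)]


rows = [
    {"timestamp": datetime(2020, 1, 1, 8, 0), "hr": 60},
    {"timestamp": datetime(2020, 1, 1, 9, 0), "hr": 72},
    {"timestamp": datetime(2020, 1, 1, 10, 0), "hr": 80},
]
high_hr = filter_rows(rows, "hr", ">", 70)
# Timestamp comparisons take a datetime object, as the parameter note says.
after_nine = filter_rows(rows, "timestamp", ">", datetime(2020, 1, 1, 9, 0))
```

Rejecting unknown operator strings up front keeps a typo such as "=>" from silently returning an empty result.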

filter_user(user_ids: List)[source]

Filter data to keep only the selected users’ data

Parameters:user_ids (List[str]) – list of users’ UUIDs
Returns:this will return a new datastream object with blank metadata
Return type:DataStream
filter_version(version: List)[source]

Filter data to keep only the selected stream versions

Parameters:version (List[str]) – list of stream versions
Returns:this will return a new datastream object with blank metadata
Return type:DataStream

Todo

The metadata version should be returned with the data

get_metadata(version: int = None) → cerebralcortex.core.metadata_manager.stream.metadata.Metadata[source]

get stream metadata

Parameters:version (int) – version of a stream
Returns:single version of a stream
Return type:Metadata
Raises:Exception – if specified version is not available for the stream
groupby(*columnName)[source]

Group data by column name

Parameters:columnName (str) – name of the column to group by

Returns:

join(dataStream, propagation='forward')[source]

Join this stream with another stream

Parameters:
  • dataStream (DataStream) – stream to join with
  • propagation (str) – defaults to 'forward'
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

limit(*args, **kwargs)[source]

Calls the default dataframe limit method

Parameters:
  • *args
  • **kwargs
map_stream(window_ds)[source]

Map/join a stream to a windowed stream

Parameters:window_ds (DataStream) – windowed datastream object
Returns:joined/mapped stream
Return type:DataStream
metadata

Get the stream metadata

Returns:stream metadata
Return type:Metadata
plot(y_axis_column=None)[source]
plot_gps_cords(zoom=5)[source]
plot_hist(x_axis_column=None)[source]
plot_stress_bar(x_axis_column='stresser_main')[source]
plot_stress_comparison(x_axis_column='stresser_main', usr_id=None, compare_with='all')[source]
plot_stress_gantt()[source]
plot_stress_pie(x_axis_column='stresser_main')[source]
plot_stress_sankey(cat_cols=['stresser_main', 'stresser_sub'], value_cols='density', title="Stressers' Sankey Diagram")[source]
run_algorithm(udfName, columnNames: List[str] = [], windowDuration: int = 60, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None, preserve_ts=False)[source]

Run an algorithm

Parameters:
  • udfName – Name of the algorithm
  • columnNames (List[str]) – column names on which windowing should be performed. Windowing will be performed on all columns if none is provided
  • windowDuration (int) – duration of a window in seconds
  • slideDuration (int) – slide duration of a window
  • groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
  • startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. The first timestamp in the data is used as startTime if none is provided
  • preserve_ts (bool) – setting this to True will return the timestamps corresponding to each windowed value
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream
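As a rough illustration of what running an algorithm over grouped windows means, the sketch below buckets rows by group-by columns and window start, then hands each bucket's column values to a user function. It is plain Python under stated assumptions: run_on_windows, heart_rate_range, the dict-shaped rows, and the integer epoch-second timestamps are all hypothetical, and slideDuration, startTime and preserve_ts are left out for brevity. The real method runs on Spark and may behave differently.

```python
from collections import defaultdict


def run_on_windows(rows, udf, column_names, window_duration=60, group_by=()):
    """Apply `udf` to the values of `column_names` in each (group, window).

    Hypothetical helper: returns a dict keyed by the group-by values
    followed by the window start.
    """
    buckets = defaultdict(lambda: {name: [] for name in column_names})
    for row in rows:
        window_start = row["timestamp"] // window_duration * window_duration
        key = tuple(row[g] for g in group_by) + (window_start,)
        for name in column_names:
            buckets[key][name].append(row[name])
    # Each selected column is passed to the udf as a list, by keyword.
    return {key: udf(**cols) for key, cols in sorted(buckets.items())}


def heart_rate_range(hr):
    """Example algorithm: spread of heart-rate values inside one window."""
    return max(hr) - min(hr)


rows = [
    {"timestamp": 5, "user": "a", "hr": 60},
    {"timestamp": 50, "user": "a", "hr": 75},
    {"timestamp": 65, "user": "a", "hr": 80},
    {"timestamp": 10, "user": "b", "hr": 90},
]
result = run_on_windows(rows, heart_rate_range, ["hr"], 60, ["user"])
```

Each key of result combines the user with a window start, so the same 60-second window is evaluated separately for every user.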

schema()[source]

Get data schema (e.g., column names and number of columns etc.)

Returns:pyspark dataframe schema object
show(*args, **kwargs)[source]
sort(columnNames: list = [], ascending=True)[source]

Sort data by the given columns in ascending or descending order

Returns:DataStream object
Return type:object
summary()[source]

print the summary of the data

to_pandas()[source]

This method converts pyspark dataframe into pandas dataframe.

Notes

This method collects all the data on the master node to convert the pyspark dataframe into a pandas dataframe. After the conversion, the DataStream object's helper methods will not be accessible.

Returns:this will return a new datastream object with blank metadata
Return type:DataStream (Metadata, pandas.DataFrame)

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = CC.get_stream("STREAM-NAME")
>>> new_ds = ds.to_pandas()
>>> new_ds.data.head()
where(*args, **kwargs)[source]

Calls the default dataframe where method

Parameters:
  • *args
  • **kwargs
window(windowDuration: int = 60, groupByColumnName: List[str] = [], columnName: List[str] = [], slideDuration: int = None, startTime=None, preserve_ts=False)[source]

Window data into fixed length chunks. If no columnName is provided then the windowing will be performed on all the columns.

Parameters:
  • windowDuration (int) – duration of a window in seconds
  • groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
  • columnName (List[str]) – column names on which windowing should be performed. Windowing will be performed on all columns if none is provided
  • slideDuration (int) – slide duration of a window
  • startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. The first timestamp in the data is used as startTime if none is provided
  • preserve_ts (bool) – setting this to True will return the timestamps corresponding to each windowed value
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

Note

This windowing method will use collect_list to return values for each window. collect_list is not optimized.
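The interaction of windowDuration, slideDuration and startTime can be worked through with plain arithmetic. The sketch below is an assumption-laden illustration, not the library's code: window_starts is a hypothetical helper, all values are integer seconds, windows are treated as half-open intervals [start, start + duration), and start_time defaults to 0 rather than to the first timestamp in the data.

```python
def window_starts(ts, window_duration, slide_duration=None, start_time=0):
    """Return the start of every window that contains timestamp `ts`.

    Window starts are aligned to start_time + k * slide. Without a slide
    duration the windows tumble, so each timestamp maps to one window.
    """
    slide = slide_duration or window_duration
    # Latest aligned window start that is at or before ts.
    start = ts - (ts - start_time) % slide
    starts = []
    # Walk backwards while the window [start, start + duration) still covers ts.
    while start > ts - window_duration:
        starts.append(start)
        start -= slide
    return sorted(starts)


# Hourly tumbling windows that begin 15 minutes past the hour:
# 12:30 (45000 s after midnight) falls in the window starting at 12:15.
tumbling = window_starts(45000, window_duration=3600, start_time=900)

# 60-second windows sliding every 30 seconds: t = 125 is covered twice.
sliding = window_starts(125, window_duration=60, slide_duration=30)
```

With a slide shorter than the window, one row lands in several windows, which is why sliding windows multiply the amount of data collected per window.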

get_window(x)[source]
windowing_udf(x)

Module contents

class DataStream(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)[source]

Bases: object
