cerebralcortex.core.datatypes package
Submodules
cerebralcortex.core.datatypes.datastream module
-
class DataStream(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)
Bases: object
-
collect()
Collect all of the data to the master node and return it as a list of rows.
Returns: all rows of the dataframe
Return type: List
-
compute_average(windowDuration: int = None, colmnName: str = None) → object
Window the data and compute the average of each window for a single column or for all columns.
Parameters: - windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over all of the data in the column(s)
- colmnName (str) – column to average. If colmnName is not provided, the average is computed for all columns (in every window)
Returns: a new DataStream object with blank metadata
Return type: DataStream
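The windowing behind the compute_* methods can be illustrated in plain Python. This is a minimal sketch of the semantics only, assuming tumbling windows aligned to the Unix epoch and rows given as (epoch_seconds, {column: value}) pairs; windowed_average is a hypothetical helper, not part of the library, which works on pyspark dataframes instead.

```python
from collections import defaultdict
from statistics import mean
from typing import Dict, List, Optional, Tuple

Row = Tuple[int, Dict[str, float]]  # (epoch seconds, {column: value})


def windowed_average(rows: List[Row], windowDuration: Optional[int] = None,
                     colmnName: Optional[str] = None) -> Dict[Optional[int], Dict[str, float]]:
    """Hypothetical helper: average per tumbling window, or over all data.

    Result keys are window start times in epoch seconds, or None when
    windowDuration is not set and the whole column is averaged.
    """
    groups: Dict[Optional[int], Dict[str, List[float]]] = defaultdict(lambda: defaultdict(list))
    for ts, values in rows:
        # Align each timestamp to the start of its (epoch-aligned) window.
        start = None if windowDuration is None else ts - ts % windowDuration
        for col, val in values.items():
            # Without colmnName, every column is averaged.
            if colmnName is None or col == colmnName:
                groups[start][col].append(val)
    return {start: {col: mean(vals) for col, vals in cols.items()}
            for start, cols in groups.items()}


rows = [(0, {"x": 1.0, "y": 10.0}), (30, {"x": 3.0, "y": 20.0}), (60, {"x": 5.0, "y": 30.0})]
print(windowed_average(rows, windowDuration=60))
# {0: {'x': 2.0, 'y': 15.0}, 60: {'x': 5.0, 'y': 30.0}}
print(windowed_average(rows, colmnName="x"))
# {None: {'x': 3.0}}
```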
-
compute_max(windowDuration: int = None, colmnName: str = None) → object
Window the data and compute the maximum of each window for a single column or for all columns.
Parameters: - windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over all of the data in the column(s)
- colmnName (str) – column for which the maximum is computed. If colmnName is not provided, the maximum is computed for all columns (in every window)
Returns: a new DataStream object with blank metadata
Return type: DataStream
-
compute_min(windowDuration: int = None, colmnName: str = None) → object
Window the data and compute the minimum of each window for a single column or for all columns.
Parameters: - windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over all of the data in the column(s)
- colmnName (str) – column for which the minimum is computed. If colmnName is not provided, the minimum is computed for all columns (in every window)
Returns: a new DataStream object with blank metadata
Return type: DataStream
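compute_min and compute_max apply different aggregates to the same windows. The plain-Python sketch below passes the aggregate in as a function, so one bucketing step serves min, max or sum; windowed_stat is hypothetical, it handles a single column of (epoch_seconds, value) pairs, and windows are assumed to be aligned to the Unix epoch.

```python
from collections import defaultdict
from typing import Callable, Dict, Iterable, List, Tuple


def windowed_stat(samples: Iterable[Tuple[int, float]], windowDuration: int,
                  stat: Callable[[List[float]], float]) -> Dict[int, float]:
    """Hypothetical helper: apply `stat` to the values of each tumbling window."""
    buckets: Dict[int, List[float]] = defaultdict(list)
    for ts, value in samples:
        # Bucket by window start (epoch-aligned).
        buckets[ts - ts % windowDuration].append(value)
    return {start: stat(values) for start, values in sorted(buckets.items())}


hr = [(0, 72.0), (20, 80.0), (45, 65.0), (70, 90.0), (100, 88.0)]
print(windowed_stat(hr, 60, min))  # {0: 65.0, 60: 88.0}
print(windowed_stat(hr, 60, max))  # {0: 80.0, 60: 90.0}
```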
-
compute_sqrt(windowDuration: int = None, colmnName: str = None) → object
Window the data and compute the square root of the windowed data for a single column or for all columns.
Parameters: - windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over all of the data in the column(s)
- colmnName (str) – column for which the square root is computed. If colmnName is not provided, the square root is computed for all columns (in every window)
Returns: a new DataStream object with blank metadata
Return type: DataStream
-
compute_stddev(windowDuration: int = None, colmnName: str = None) → object
Window the data and compute the standard deviation of each window for a single column or for all columns.
Parameters: - windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over all of the data in the column(s)
- colmnName (str) – column for which the standard deviation is computed. If colmnName is not provided, the standard deviation is computed for all columns (in every window)
Returns: a new DataStream object with blank metadata
Return type: DataStream
-
compute_sum(windowDuration: int = None, colmnName: str = None) → object
Window the data and compute the sum of each window for a single column or for all columns.
Parameters: - windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over all of the data in the column(s)
- colmnName (str) – column to sum. If colmnName is not provided, the sum is computed for all columns (in every window)
Returns: a new DataStream object with blank metadata
Return type: DataStream
-
compute_variance(windowDuration: int = None, colmnName: str = None) → object
Window the data and compute the variance of each window for a single column or for all columns.
Parameters: - windowDuration (int) – duration of a window in seconds. If it is not set, the statistic is computed over all of the data in the column(s)
- colmnName (str) – column for which the variance is computed. If colmnName is not provided, the variance is computed for all columns (in every window)
Returns: a new DataStream object with blank metadata
Return type: DataStream
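Variance and standard deviation come in sample (divide by n - 1) and population (divide by n) variants. Spark's variance and stddev aggregate functions are aliases for var_samp and stddev_samp; whether compute_variance and compute_stddev rely on them is an assumption here. The plain-Python sketch below computes the sample statistics per epoch-aligned window with the standard statistics module; windowed_variance is hypothetical.

```python
import math
import statistics
from collections import defaultdict
from typing import Dict, List, Tuple


def windowed_variance(samples: List[Tuple[int, float]],
                      windowDuration: int) -> Dict[int, Tuple[float, float]]:
    """Hypothetical helper: (sample variance, sample stddev) per tumbling window.

    statistics.variance needs at least two values, so single-value windows
    are reported as NaN in this sketch.
    """
    buckets: Dict[int, List[float]] = defaultdict(list)
    for ts, value in samples:
        buckets[ts - ts % windowDuration].append(value)
    result = {}
    for start, values in sorted(buckets.items()):
        if len(values) < 2:
            result[start] = (math.nan, math.nan)
        else:
            var = statistics.variance(values)  # sample variance, divides by n - 1
            result[start] = (var, math.sqrt(var))  # stddev is the square root of variance
    return result


samples = [(0, 2.0), (10, 4.0), (20, 6.0), (60, 5.0)]
print(windowed_variance(samples, 60))
```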
-
create_windows(window_length='hour')
Create time windows of length window_length over the data.
Parameters: window_length (str) – length of each window (default 'hour')
Returns: a new DataStream object with blank metadata
Return type: DataStream
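If create_windows labels each row with the clock hour it falls in (an assumption; the exact behaviour of create_windows and windowing_udf is not documented here), the labelling step can be sketched with the standard datetime module. floor_to_window is a hypothetical helper, and its 'day' option is an illustrative extra not taken from the documentation.

```python
from datetime import datetime, timezone


def floor_to_window(ts: datetime, window_length: str = "hour") -> datetime:
    """Hypothetical helper: truncate a timestamp to the start of its window."""
    if window_length == "hour":
        return ts.replace(minute=0, second=0, microsecond=0)
    if window_length == "day":  # illustrative extra option, not from the docs
        return ts.replace(hour=0, minute=0, second=0, microsecond=0)
    raise ValueError(f"unsupported window_length: {window_length!r}")


ts = datetime(2019, 3, 5, 14, 37, 12, tzinfo=timezone.utc)
print(floor_to_window(ts))         # 2019-03-05 14:00:00+00:00
print(floor_to_window(ts, "day"))  # 2019-03-05 00:00:00+00:00
```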
-
data
Get the stream data.
Returns: DataFrame
-
filter(columnName, operator, value)
Filter the data by comparing a column against a value.
Parameters: - columnName (str) – name of the column
- operator (str) – basic comparison operator (e.g., >, <, ==, !=)
- value (Any) – value to compare against; if columnName is the timestamp column, provide a Python datetime object
Returns: a new DataStream object with blank metadata
Return type: DataStream
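filter takes the comparison operator as a string. One way such a string can be turned into a comparison is a lookup table over Python's operator module. The plain-Python sketch below only illustrates the semantics on a list of dict rows (the library applies the comparison to a pyspark column); filter_rows is hypothetical, and its parameter is named op so that it does not shadow the operator module.

```python
import operator
from datetime import datetime
from typing import Any, Dict, List

# Map basic operator strings to comparison functions.
OPERATORS = {
    ">": operator.gt,
    "<": operator.lt,
    ">=": operator.ge,
    "<=": operator.le,
    "==": operator.eq,
    "!=": operator.ne,
}


def filter_rows(rows: List[Dict[str, Any]], columnName: str, op: str,
                value: Any) -> List[Dict[str, Any]]:
    """Hypothetical helper: keep rows where `row[columnName] <op> value` holds."""
    try:
        compare = OPERATORS[op]
    except KeyError:
        raise ValueError(f"unsupported operator: {op!r}") from None
    return [row for row in rows if compare(row[columnName], value)]


rows = [
    {"timestamp": datetime(2019, 1, 1, 10), "hr": 70},
    {"timestamp": datetime(2019, 1, 1, 11), "hr": 95},
]
# Timestamps are compared against a datetime object, as the docs advise.
print(filter_rows(rows, "timestamp", ">", datetime(2019, 1, 1, 10, 30)))
print(filter_rows(rows, "hr", "!=", 70))
```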
-
filter_user(user_ids: List)
Filter the data to keep only the data of the selected users.
Parameters: user_ids (List[str]) – list of users’ UUIDs
Returns: a new DataStream object with blank metadata
Return type: DataStream
-
filter_version(version: List)
Filter the data to keep only the selected stream versions.
Parameters: version (List[str]) – list of stream versions
Returns: a new DataStream object with blank metadata
Return type: DataStream
Todo
The metadata of the selected version(s) should be returned with the data.
-
get_metadata(version: int = None) → cerebralcortex.core.metadata_manager.stream.metadata.Metadata
Get the stream metadata.
Parameters: version (int) – version of the stream
Returns: metadata of a single version of the stream
Return type: Metadata
Raises: Exception – if the specified version is not available for the stream
-
groupby(*columnName)
Group the data by column name(s).
Parameters: columnName (str) – name of the column(s) to group by
Returns:
-
join(dataStream, propagation='forward')
Join this stream with another DataStream.
Parameters: - dataStream (DataStream) – the stream to join with this stream
- propagation (str) – how values are propagated across the joined rows (default 'forward')
Returns: a new DataStream object with blank metadata
Return type: DataStream
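Assuming propagation='forward' means that, after two streams are joined on their timestamps, each missing value is filled with the most recent earlier value of the same stream (a forward fill), the idea can be sketched in plain Python. forward_join is hypothetical and is not how the library implements join.

```python
from typing import Dict, List, Optional, Tuple

Series = List[Tuple[int, float]]  # (epoch seconds, value)


def forward_join(left: Series, right: Series) -> List[Tuple[int, Optional[float], Optional[float]]]:
    """Hypothetical helper: outer-join two series on timestamp, forward-filling gaps.

    A series stays None until its first observation.
    """
    left_map: Dict[int, float] = dict(left)
    right_map: Dict[int, float] = dict(right)
    last_left: Optional[float] = None
    last_right: Optional[float] = None
    joined = []
    for ts in sorted(set(left_map) | set(right_map)):
        # Carry the previous value forward when a series has no sample at ts.
        last_left = left_map.get(ts, last_left)
        last_right = right_map.get(ts, last_right)
        joined.append((ts, last_left, last_right))
    return joined


accel = [(0, 0.1), (2, 0.3), (4, 0.2)]
gps = [(1, 10.0), (4, 12.0)]
for row in forward_join(accel, gps):
    print(row)
```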
-
map_stream(window_ds)
Map (join) a stream onto a windowed stream.
Parameters: window_ds (DataStream) – windowed DataStream object
Returns: the joined/mapped stream
Return type: DataStream
-
plot_stress_sankey(cat_cols=['stresser_main', 'stresser_sub'], value_cols='density', title="Stressers' Sankey Diagram")
-
run_algorithm(udfName, columnNames: List[str] = [], windowDuration: int = 60, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None, preserve_ts=False)
Run an algorithm on windowed data.
Parameters: - udfName – name of the algorithm
- columnNames (List[str]) – column names on which windowing should be performed. If none are provided, windowing is performed on all columns
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- groupByColumnName (List[str]) – group-by column names, for example user, col1, col2
- startTime (datetime) – offset with respect to 1970-01-01 00:00:00 UTC at which window intervals start. For example, to get hourly tumbling windows that start 15 minutes past the hour (12:15-13:15, 13:15-14:15, …), provide a startTime of 15 minutes. If none is provided, the first timestamp of the data is used as startTime
- preserve_ts (bool) – if True, the timestamps corresponding to each windowed value are returned as well
Returns: a new DataStream object with blank metadata
Return type: DataStream
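Judging from its parameters, run_algorithm groups rows by the groupByColumnName columns and by window, then applies the algorithm to the values of each group. The plain-Python sketch below shows that shape under stated assumptions: tumbling windows aligned to the epoch, and a Python callable standing in for the UDF named by udfName. run_per_window and the row layout are hypothetical.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List, Tuple


def run_per_window(rows: List[Dict[str, Any]], algorithm: Callable[[List[Any]], Any],
                   columnName: str, windowDuration: int = 60,
                   groupByColumnName: Tuple[str, ...] = ("user",)) -> Dict[Tuple[Any, ...], Any]:
    """Hypothetical helper: apply `algorithm` to each (group, window) bucket.

    Each row needs a 'timestamp' in epoch seconds; result keys are the
    group-by values followed by the window start time.
    """
    buckets: Dict[Tuple[Any, ...], List[Any]] = defaultdict(list)
    for row in rows:
        ts = row["timestamp"]
        key = tuple(row[c] for c in groupByColumnName) + (ts - ts % windowDuration,)
        buckets[key].append(row[columnName])
    return {key: algorithm(values) for key, values in sorted(buckets.items())}


rows = [
    {"user": "u1", "timestamp": 5, "steps": 10},
    {"user": "u1", "timestamp": 50, "steps": 12},
    {"user": "u1", "timestamp": 65, "steps": 7},
    {"user": "u2", "timestamp": 10, "steps": 3},
]
print(run_per_window(rows, sum, "steps"))
# {('u1', 0): 22, ('u1', 60): 7, ('u2', 0): 3}
```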
-
schema()
Get the data schema (e.g., column names and number of columns).
Returns: pyspark dataframe schema object
-
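sort(columnNames: list = [], ascending=True)
Sort the data by the given columns in ascending (ASC) or descending (DESC) order.
Parameters: - columnNames (list) – columns to sort by
- ascending (bool) – sort in ascending order if True (default), otherwise in descending order
Returns: DataStream object
Return type: object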
-
to_pandas()
Convert the pyspark dataframe into a pandas dataframe.
Notes
This method collects all of the data on the master node to convert the pyspark dataframe into a pandas dataframe, so the data must fit in that node's memory. After the conversion, the DataStream helper methods can no longer be used on the data.
Returns: a new DataStream object with blank metadata
Return type: DataStream (Metadata, pandas.DataFrame)
Examples
>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = CC.get_stream("STREAM-NAME")
>>> new_ds = ds.to_pandas()
>>> new_ds.data.head()
-
window(windowDuration: int = 60, groupByColumnName: List[str] = [], columnName: List[str] = [], slideDuration: int = None, startTime=None, preserve_ts=False)
Window the data into fixed-length chunks. If no columnName is provided, windowing is performed on all columns.
Parameters: - windowDuration (int) – duration of a window in seconds
- groupByColumnName (List[str]) – group-by column names, for example user, col1, col2
- columnName (List[str]) – column names on which windowing should be performed. If none are provided, windowing is performed on all columns
- slideDuration (int) – slide duration of a window
- startTime (datetime) – offset with respect to 1970-01-01 00:00:00 UTC at which window intervals start. For example, to get hourly tumbling windows that start 15 minutes past the hour (12:15-13:15, 13:15-14:15, …), provide a startTime of 15 minutes. If none is provided, the first timestamp of the data is used as startTime
- preserve_ts (bool) – if True, the timestamps corresponding to each windowed value are returned as well
Returns: a new DataStream object with blank metadata
Return type: DataStream
Note
This method uses collect_list to return the values of each window. collect_list gathers every value of a window into a single list, so it can be slow and memory-intensive for large windows.
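Spark's window function (pyspark.sql.functions.window) assigns each row to every window [start, start + windowDuration) that contains its timestamp, with window starts spaced slideDuration apart and shifted by startTime. Whether window passes its parameters straight through to Spark is an assumption. The plain-Python sketch below reproduces that assignment rule for a fixed offset, with all durations in seconds; window_starts is hypothetical.

```python
from typing import List, Optional


def window_starts(ts: int, windowDuration: int, slideDuration: Optional[int] = None,
                  startTime: int = 0) -> List[int]:
    """Hypothetical helper: start times of all windows that contain `ts`.

    Windows are [start, start + windowDuration) with starts spaced
    slideDuration apart and offset by startTime (all in seconds).
    Without slideDuration the windows are tumbling (slide == duration).
    """
    slide = slideDuration or windowDuration
    # Latest window start at or before ts that is aligned to the offset.
    start = ts - (ts - startTime) % slide
    starts = []
    # A window contains ts exactly when start > ts - windowDuration.
    while start > ts - windowDuration:
        starts.append(start)
        start -= slide
    return sorted(starts)


# Hourly tumbling windows starting 15 minutes past the hour (offset 900 s).
print(window_starts(ts=3 * 3600 + 5 * 60, windowDuration=3600, startTime=900))
# [8100]: the 02:15-03:15 window, since 03:05 falls before 03:15
# 60 s windows sliding every 20 s: each timestamp lands in three windows.
print(window_starts(ts=130, windowDuration=60, slideDuration=20))
# [80, 100, 120]
```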
-
-
windowing_udf(x)
Module contents
The package also exposes the DataStream class; its signature and members are identical to those documented for cerebralcortex.core.datatypes.datastream above.