cerebralcortex.core.datatypes package

Submodules

cerebralcortex.core.datatypes.datastream module

class DataStream(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)[source]

Bases: pyspark.sql.dataframe.DataFrame

agg(*exprs)[source]

Aggregate on the entire DataStream without groups

Parameters:*exprs
Returns:this will return a new datastream object with blank metadata
Return type:DataStream

Examples

>>> ds.agg({"age": "max"}).collect()
>>> # Below example shows how to use pyspark functions in the agg method
>>> from pyspark.sql import functions as F
>>> ds.agg(F.min(ds.age)).collect()
alias(alias)[source]

Returns a new DataStream with an alias set.

Parameters:alias – string, an alias name to be set for the datastream.
Returns:DataStream object
Return type:object

Examples

>>> df_as1 = df.alias("df_as1")
>>> df_as2 = df.alias("df_as2")
approxQuantile(col, probabilities, relativeError)[source]

Calculates the approximate quantiles of numerical columns of a DataStream.

The result of this algorithm has the following deterministic bound: If the DataStream has N elements and if we request the quantile at probability p up to error err, then the algorithm will return a sample x from the DataStream so that the exact rank of x is close to (p * N). More precisely,

floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).

This method implements a variation of the Greenwald-Khanna algorithm (with some speed optimizations). The algorithm was first presented in Space-efficient Online Computation of Quantile Summaries by Greenwald and Khanna (http://dx.doi.org/10.1145/375663.375670).

Note that null values will be ignored in numerical columns before calculation. For columns only containing null values, an empty list is returned.

Parameters:
  • col (str[list]) – Can be a single column name, or a list of names for multiple columns.
  • probabilities – a list of quantile probabilities. Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
  • relativeError – The relative target precision to achieve (>= 0). If set to zero, the exact quantiles are computed, which could be very expensive. Note that values greater than 1 are accepted but give the same result as 1.
Returns:

the approximate quantiles at the given probabilities. If the input col is a string, the output is a list of floats. If the input col is a list or tuple of strings, the output is also a list, but each element in it is a list of floats, i.e., the output is a list of list of floats.
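
Examples

A minimal sketch; "col_name" is a placeholder column name:

>>> ds.approxQuantile("col_name", [0.25, 0.5, 0.75], 0.05)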

colRegex(colName)[source]

Selects column based on the column name specified as a regex and returns it as Column.

Parameters:colName (str) – column name specified as a regex.
Returns:
Return type:DataStream

Examples

>>> ds.colRegex("colName")
collect()[source]

Collect all the data to the master node and return it as a list of rows.

Returns:all rows of the DataStream
Return type:List

Examples

>>> ds.collect()
compute(udfName, windowDuration: int = None, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None)[source]

Run an algorithm. This method supports running a udf on windowed data.

Parameters:
  • udfName – Name of the algorithm
  • windowDuration (int) – duration of a window in seconds
  • slideDuration (int) – slide duration of a window
  • groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
  • startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream
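
Examples

A hypothetical sketch; "my_algorithm" is a placeholder for the udf/algorithm name and "user" is an assumed grouping column:

>>> ds.compute("my_algorithm", windowDuration=60, groupByColumnName=["user"])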

corr(col1, col2, method=None)[source]

Calculates the correlation of two columns of a DataStream as a double value. Currently only supports the Pearson Correlation Coefficient.

Parameters:
  • col1 (str) – The name of the first column
  • col2 (str) – The name of the second column
  • method (str) – The correlation method. Currently only supports “pearson”
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

Examples

>>> ds.corr("cal1", "col2", "pearson").collect()
count()[source]

Returns the number of rows in this DataStream.

Examples

>>> ds.count()
cov(col1, col2)[source]

Calculate the sample covariance for the given columns, specified by their names, as a double value.

Parameters:
  • col1 (str) – The name of the first column
  • col2 (str) – The name of the second column
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

Examples

>>> ds.cov("cal1", "col2", "pearson").collect()
create_windows(window_length='hour')[source]

Window data into chunks of fixed length (default: hourly windows).

Parameters:window_length (str) – length of a window (e.g., 'hour')
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream
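
Examples

A minimal sketch based on the signature (windows the data into hourly chunks):

>>> ds.create_windows(window_length="hour")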

crossJoin(other)[source]

Returns the cartesian product with another DataStream

Parameters:other – Right side of the cartesian product.
Returns:DataStream object with joined streams

Examples

>>> ds.crossJoin(ds2.select("col_name")).collect()
crosstab(col1, col2)[source]

Computes a pair-wise frequency table of the given columns. Also known as a contingency table. The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero pair frequencies will be returned. The first column of each row will be the distinct values of col1 and the column names will be the distinct values of col2. The name of the first column will be $col1_$col2. Pairs that have no occurrences will have zero as their counts.

Parameters:
  • col1 (str) – The name of the first column. Distinct items will make the first item of each row.
  • col2 (str) – The name of the second column. Distinct items will make the column names of the DataStream.
Returns:

DataStream object

Examples

>>> ds.crosstab("col_1", "col_2")
data

get stream data

Returns:stream data
Return type:DataFrame
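
Examples

A minimal sketch, assuming the underlying data is a pyspark DataFrame:

>>> ds.data.show(5)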

describe(*cols)[source]

Computes basic statistics for numeric and string columns. This includes count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.

Parameters:*cols

Examples

>>> ds.describe(['col_name']).show()
>>> ds.describe().show()
distinct()[source]

Returns a new DataStream containing the distinct rows in this DataStream.

Returns:this will return a new datastream object with blank metadata
Return type:DataStream

Examples

>>> ds.distinct().count()
drop(*cols)[source]

Returns a new Datastream that drops the specified column. This is a no-op if schema doesn’t contain the given column name(s).

Parameters:*cols – a string name of the column to drop, or a Column to drop, or a list of string name of the columns to drop.
Returns:
Return type:Datastream

Examples

>>> ds.drop('col_name')
dropDuplicates(subset=None)[source]

Return a new DataStream with duplicate rows removed, optionally only considering certain columns.

Parameters:subset – optional list of column names to consider.
Returns:
Return type:Datastream

Examples

>>> ds.dropDuplicates().show()
>>> # Example on how to use it with params
>>> ds.dropDuplicates(['col_name1', 'col_name2']).show()
dropna(how='any', thresh=None, subset=None)[source]

Returns a new DataStream omitting rows with null values.

Parameters:
  • how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
  • thresh – int, default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
  • subset – optional list of column names to consider.
Returns:

Return type:

Datastream

Examples

>>> ds.dropna()
exceptAll(other)[source]

Return a new DataStream containing rows in this DataStream but not in another DataStream while preserving duplicates.

Parameters:other – other DataStream object
Returns:
Return type:Datastream

Examples

>>> ds1.exceptAll(ds2).show()
explain(extended=False)[source]

Prints the (logical and physical) plans to the console for debugging purposes.

Parameters:extended – boolean, default False. If False, prints only the physical plan.

Examples

>>> ds.explain()
fillna(value, subset=None)[source]

Replace null values

Parameters:
  • value – int, long, float, string, bool or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, boolean, or string.
  • subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns:

Return type:

Datastream

Examples

>>> ds.fillna(50).show()
>>> ds.fillna({'col1': 50, 'col2': 'unknown'}).show()
filter(condition)[source]

Filters rows using the given condition

Parameters:condition – a Column of types.BooleanType or a string of SQL expression.
Returns:this will return a new datastream object with blank metadata
Return type:DataStream

Examples

>>> ds.filter("age > 3")
>>> ds.filter(ds.age > 3)
filter_user(user_ids: List)[source]

filter data to get only the selected users' data

Parameters:user_ids (List[str]) – list of users’ UUIDs
Returns:this will return a new datastream object with blank metadata
Return type:DataStream
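
Examples

A minimal sketch; the UUID is a placeholder:

>>> ds.filter_user(["00000000-0000-0000-0000-000000000000"])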
filter_version(version: List)[source]

filter data to get only the selected version(s) of the stream data

Parameters:version (List[str]) – list of stream versions
Returns:this will return a new datastream object with blank metadata
Return type:DataStream

Todo

Metadata version should be returned with the data
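
Examples

A minimal sketch; version values are placeholders:

>>> ds.filter_version(["1", "2"])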

first()[source]

Returns the first row as a Row.

Returns:First row of a DataStream

Examples

>>> ds.first()
foreach(f)[source]

Applies the f function to all Rows of this DataStream. This is a shorthand for df.rdd.foreach()

Parameters:f – function
Returns:DataStream object

Examples

>>> def f(person):
...     print(person.name)
>>> ds.foreach(f)
freqItems(cols, support=None)[source]

Finding frequent items for columns, possibly with false positives, using the frequent element count algorithm described by Karp, Schenker, and Papadimitriou (http://dx.doi.org/10.1145/762471.762473).

Returns:
Return type:DataStream

Examples

>>> ds.freqItems(["col_name"])
get_metadata() → cerebralcortex.core.metadata_manager.stream.metadata.Metadata[source]

get stream metadata

Returns:single version of a stream
Return type:Metadata
Raises:Exception – if specified version is not available for the stream
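
Examples

A minimal sketch:

>>> metadata = ds.get_metadata()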
groupby(*cols)[source]

Groups the DataFrame using the specified columns, so we can run aggregation on them. This method will return pyspark.sql.GroupedData object.

Parameters:*cols – list of columns to group by. Each element should be a column name (string) or an expression (Column).

Returns:pyspark.sql.GroupedData object
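
Examples

A sketch assuming a "user" column exists; the returned GroupedData supports the standard pyspark aggregations:

>>> ds.groupby("user").count().show()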

head(n=None)[source]

Returns the first n rows.

Parameters:n (int) – default 1. Number of rows to return.
Returns:If n is greater than 1, return a list of Row. If n is 1, return a single Row.

Notes

This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.

Examples

>>> ds.head(5)
intersect(other)[source]

Return a new DataFrame containing rows only in both this frame and another frame. This is equivalent to INTERSECT in SQL.

Parameters:other (DataStream) – DataStream object
Returns:a new DataStream containing only rows present in both this stream and the other stream

Examples

>>> ds.intersect(other=ds2)
intersectAll(other)[source]

Return a new DataFrame containing rows in both this dataframe and other dataframe while preserving duplicates.

Parameters:other (DataStream) – DataStream object
Returns:a new DataStream containing rows present in both streams, preserving duplicates

Examples

>>> ds.intersectAll(ds2).show()
join(other, on=None, how=None)[source]

Joins with another DataStream, using the given join expression.

Parameters:
  • other (DataStream) – Right side of the join
  • on – a string for the join column name, a list of column names, or a join expression (Column)
  • how (str) – inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.

Returns:DataStream object with joined streams

Examples

>>> ds.join(ds2, 'user', 'outer').show()
join_stress_streams(dataStream, propagation='forward')[source]

Join this stream with a stress stream.

Parameters:
  • dataStream (DataStream) – stream to join with
  • propagation (str) – propagation direction (default 'forward')
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream
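
Examples

A hypothetical sketch based on the signature; ds2 is another DataStream:

>>> ds.join_stress_streams(ds2, propagation="forward")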

limit(num)[source]

Limits the result count to the number specified.

Parameters:num
Returns:
Return type:Datastream
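
Examples

A minimal sketch:

>>> ds.limit(10).show()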
map_stream(window_ds)[source]

Map/join a stream to a windowed stream

Parameters:window_ds (Datastream) – windowed datastream object
Returns:joined/mapped stream
Return type:Datastream
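
Examples

A hypothetical sketch: window one stream, then map another stream onto the windowed result:

>>> window_ds = ds.window(windowDuration=60)
>>> mapped_ds = ds2.map_stream(window_ds)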
metadata

return stream metadata

Returns:
Return type:Metadata
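
Examples

A minimal sketch:

>>> metadata = ds.metadata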
orderBy(*cols)[source]

order by column name

Parameters:*cols
Returns:
Return type:Datastream
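
Examples

A sketch assuming a "timestamp" column exists:

>>> ds.orderBy("timestamp").show()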
printSchema()[source]

Prints out the schema in the tree format.

Examples

>>> ds.printSchema()
repartition(numPartitions, *cols)[source]

Returns a new DataStream partitioned by the given partitioning expressions. The resulting DataStream is hash partitioned.

numPartitions can be an int to specify the target number of partitions or a Column. If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used.

Parameters:
  • numPartitions
  • *cols

Returns:
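
Examples

A minimal sketch; "user" is an assumed column name:

>>> ds.repartition(10)
>>> ds.repartition(8, "user")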

replace(to_replace, value, subset=None)[source]

Returns a new DataStream replacing a value with another value. Values to_replace and value must have the same type and can only be numerics, booleans, or strings. Value can have None. When replacing, the new value will be cast to the type of the existing column. For numeric replacements all values to be replaced should have unique floating point representation. In case of conflicts (for example with {42: -1, 42.0: 1}) an arbitrary replacement will be used.

Parameters:
  • to_replace – bool, int, long, float, string, list or dict. Value to be replaced. If the value is a dict, then value is ignored or can be omitted, and to_replace must be a mapping between a value and a replacement.
  • value – bool, int, long, float, string, list or None. The replacement value must be a bool, int, long, float, string or None. If value is a list, value should be of the same length and type as to_replace. If value is a scalar and to_replace is a sequence, then value is used as a replacement for each item in to_replace.
  • subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns:

Return type:

Datastream

Examples

>>> ds.replace(10, 20).show()
>>> ds.replace('some-str', None).show()
>>> ds.replace(['old_val1', 'old_val2'], ['new_val1', 'new_val2'], 'col_name').show()
select(*cols)[source]

Projects a set of expressions and returns a new DataStream.

Parameters:cols (str) – list of column names (string) or expressions (Column). If one of the column names is '*', that column is expanded to include all columns in the current DataStream.

Returns:this will return a new datastream object with selected columns
Return type:DataStream

Examples

>>> ds.select('*')
>>> ds.select('name', 'age')
>>> ds.select(ds.name, (ds.age + 10).alias('age'))
selectExpr(*expr)[source]

This is a variant of select() that accepts SQL expressions. Projects a set of expressions and returns a new DataStream

Parameters:expr (str) –
Returns:this will return a new datastream object with selected columns
Return type:DataStream

Examples

>>> ds.selectExpr("age * 2")
show(n=20, truncate=True, vertical=False)[source]

Prints the first n rows to the console.

Parameters:
  • n – Number of rows to show.
  • truncate – If set to True, truncate strings longer than 20 characters by default. If set to a number greater than one, truncates long strings to length truncate and aligns cells right.
  • vertical – If set to True, print output rows vertically (one line per column value).

Returns:
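
Examples

A minimal sketch:

>>> ds.show(3)
>>> ds.show(n=3, truncate=False, vertical=True)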

sort(*cols, **kwargs)[source]

Returns a new DataStream sorted by the specified column(s).

Parameters:
  • cols – list of Column or column names to sort by.
  • ascending – boolean or list of boolean (default True). Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the cols.
Returns:

DataStream object

Return type:

object

Examples

>>> ds.sort("col_name", ascending=False)
summary(*statistics)[source]

Computes specified statistics for numeric and string columns. Available statistics are: count, mean, stddev, min, max, and arbitrary approximate percentiles specified as a percentage (e.g., 75%). If no statistics are given, this function computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.

Parameters:*statistics

Examples

>>> ds.summary().show()
>>> ds.summary("count", "min", "25%", "75%", "max").show()
>>> # To do a summary for specific columns first select them:
>>> ds.select("col1", "col2").summary("count").show()
take(num)[source]

Returns the first num rows as a list of Row.

Returns:row(s) of a DataStream
Return type:Row(list)

Examples

>>> ds.take(2)
toPandas()[source]

This method converts pyspark dataframe into pandas dataframe.

Notes

This method will collect all the data on the master node to convert the pyspark dataframe into a pandas dataframe. After the conversion, the DataStream helper methods will not be accessible.

Returns:this will return a new datastream object with blank metadata
Return type:Datastream (Metadata, pandas.DataFrame)

Examples

>>> CC = CerebralCortex("/directory/path/of/configs/")
>>> ds = CC.get_stream("STREAM-NAME")
>>> new_ds = ds.toPandas()
>>> new_ds.data.head()
union(other)[source]

Return a new Datastream containing union of rows in this and another frame.

This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().

Also as standard in SQL, this function resolves columns by position (not by name).

Parameters:other (DataStream) –
Returns:
Return type:Datastream

Examples

>>> ds.union(ds2).collect()
unionByName(other)[source]

Returns a new Datastream containing union of rows in this and another frame.

This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().

The difference between this function and union() is that this function resolves columns by name (not by position):

Parameters:other (DataStream) –
Returns:
Return type:Datastream

Examples

>>> ds.unionByName(ds2).show()
where(condition)[source]

where() is an alias for filter().

Parameters:condition
Returns:
Return type:Datastream

Examples

>>> ds.filter("age > 3").collect()
window(windowDuration: int = None, groupByColumnName: List[str] = [], slideDuration: int = None, startTime=None, preserve_ts=False)[source]

Window data into fixed length chunks. If no columnName is provided then the windowing will be performed on all the columns.

Parameters:
  • windowDuration (int) – duration of a window in seconds
  • groupByColumnName (List[str]) – groupby column names, for example, groupby user, col1, col2
  • slideDuration (int) – slide duration of a window
  • startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
  • preserve_ts (bool) – setting this to True will return timestamps corresponding to each windowed value
Returns:

this will return a new datastream object with blank metadata

Return type:

DataStream

Note

This windowing method will use collect_list to return values for each window. collect_list is not optimized.
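
Examples

A hypothetical sketch: 60-second tumbling windows grouped by an assumed "user" column:

>>> ds.window(windowDuration=60, groupByColumnName=["user"])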

withColumn(colName, col)[source]

Returns a new DataStream by adding a column or replacing the existing column that has the same name. The column expression must be an expression over this DataStream; attempting to add a column from some other DataStream will raise an error.

Parameters:
  • colName (str) – name of the new column.
  • col – a Column expression for the new column.

Examples

>>> ds.withColumn('col_name', ds.col_name + 2)
withColumnRenamed(existing, new)[source]

Returns a new DataStream by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.

Parameters:
  • existing (str) – string, name of the existing column to rename.
  • new (str) – string, new name of the column.

Returns:DataStream object with new column name(s)

Examples

>>> ds.withColumnRenamed('col_name', 'new_col_name')
write()[source]

Interface for saving the content of the non-streaming DataFrame out into external storage.

Returns:DataFrameWriter

New in version 1.4.
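
Examples

A hypothetical sketch assuming the returned writer is a standard pyspark DataFrameWriter; the output path is a placeholder:

>>> ds.write().format("parquet").save("/path/to/output")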

writeStream()[source]

Interface for saving the content of the streaming DataFrame out into external storage.

Note

Evolving.

Returns:DataStreamWriter

New in version 2.0.
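
Examples

A hypothetical sketch assuming the returned writer is a standard pyspark DataStreamWriter; paths are placeholders:

>>> ds.writeStream().format("parquet").option("checkpointLocation", "/path/to/checkpoint").start("/path/to/output")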

get_window(x)[source]
windowing_udf(x)

Module contents

class DataStream(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)[source]

Bases: pyspark.sql.dataframe.DataFrame

This class is re-exported at the package level; see its full documentation under cerebralcortex.core.datatypes.datastream above.