cerebralcortex.core.datatypes package¶
Submodules¶
cerebralcortex.core.datatypes.datastream module¶
-
class
DataStream
(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)[source]¶ Bases:
pyspark.sql.dataframe.DataFrame
-
agg
(*exprs)[source]¶ Aggregate on the entire DataStream without groups
Parameters: *exprs – Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.agg({"age": "max"}).collect() >>> # Below example shows how to use pyspark functions in add method >>> from pyspark.sql import functions as F >>> ds.agg(F.min(ds.age)).collect()
-
alias
(alias)[source]¶ Returns a new DataStream with an alias set.
Parameters: alias – string, an alias name to be set for the datastream. Returns: DataStream object Return type: object Examples
>>> df_as1 = df.alias("df_as1") >>> df_as2 = df.alias("df_as2")
-
approxQuantile
(col, probabilities, relativeError)[source]¶ Calculates the approximate quantiles of numerical columns of a DataStream.
The result of this algorithm has the following deterministic bound: If the DataStream has N elements and if we request the quantile at probability p up to error err, then the algorithm will return a sample x from the DataStream so that the exact rank of x is close to (p * N). More precisely,
floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
This method implements a variation of the Greenwald-Khanna algorithm (with some speed optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
Note that null values will be ignored in numerical columns before calculation. For columns only containing null values, an empty list is returned.
Parameters: - col (str[list]) – Can be a single column name, or a list of names for multiple columns.
- probabilities – a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
- relativeError – The relative target precision to achieve (>= 0). If set to zero, the exact quantiles are computed, which could be very expensive. Note that values greater than 1 are accepted but give the same result as 1.
Returns: the approximate quantiles at the given probabilities. If the input col is a string, the output is a list of floats. If the input col is a list or tuple of strings, the output is also a list, but each element in it is a list of floats, i.e., the output is a list of list of floats.
-
colRegex
(colName)[source]¶ Selects column based on the column name specified as a regex and returns it as Column.
Parameters: colName (str) – column name specified as a regex. Returns: Return type: DataStream Examples
>>> ds.colRegex("colName")
-
collect
()[source]¶ Collect all the data to master node and return list of rows
Returns: rows of all the dataframe Return type: List Examples
>>> ds.collect()
-
compute
(udfName, windowDuration: int = None, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None)[source]¶ Run an algorithm. This method supports running an udf method on windowed data
Parameters: - udfName – Name of the algorithm
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- List[str] (groupByColumnName) – groupby column names, for example, groupby user, col1, col2
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns: this will return a new datastream object with blank metadata
Return type:
-
corr
(col1, col2, method=None)[source]¶ Calculates the correlation of two columns of a DataStream as a double value. Currently only supports the Pearson Correlation Coefficient.
Parameters: - col1 (str) – The name of the first column
- col2 (str) – The name of the second column
- method (str) – The correlation method. Currently only supports “pearson”
Returns: this will return a new datastream object with blank metadata
Return type: Examples
>>> ds.corr("cal1", "col2", "pearson").collect()
-
cov
(col1, col2)[source]¶ Calculate the sample covariance for the given columns, specified by their names, as a double value.
Parameters: - col1 (str) – The name of the first column
- col2 (str) – The name of the second column
Returns: this will return a new datastream object with blank metadata
Return type: Examples
>>> ds.cov("cal1", "col2", "pearson").collect()
-
create_windows
(window_length='hour')[source]¶ filter data
Parameters: - columnName (str) – name of the column
- operator (str) – basic operators (e.g., >, <, ==, !=)
- value (Any) – if the columnName is timestamp, please provide python datatime object
Returns: this will return a new datastream object with blank metadata
Return type:
-
crossJoin
(other)[source]¶ Returns the cartesian product with another DataStream
Parameters: other – Right side of the cartesian product. Returns: DataStream object with joined streams Examples
>>> ds.crossJoin(ds2.select("col_name")).collect()
-
crosstab
(col1, col2)[source]¶ Computes a pair-wise frequency table of the given columns. Also known as a contingency table. The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero pair frequencies will be returned. The first column of each row will be the distinct values of col1 and the column names will be the distinct values of col2. The name of the first column will be $col1_$col2. Pairs that have no occurrences will have zero as their counts.
Parameters: - col1 (str) – The name of the first column. Distinct items will make the first item of each row.
- col2 (str) – The name of the second column. Distinct items will make the column names of the DataStream.
Returns: DataStream object
Examples
>>> ds.crosstab("col_1", "col_2")
-
data
¶ get stream data
Returns (DataFrame):
-
describe
(*cols)[source]¶ Computes basic statistics for numeric and string columns. This include count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.
Parameters: *cols – Examples
>>> ds.describe(['col_name']).show() >>> ds.describe().show()
-
distinct
()[source]¶ Returns a new DataStream containing the distinct rows in this DataStream.
Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.distinct().count()
-
drop
(*cols)[source]¶ Returns a new Datastream that drops the specified column. This is a no-op if schema doesn’t contain the given column name(s).
Parameters: *cols – a string name of the column to drop, or a Column to drop, or a list of string name of the columns to drop. Returns: Return type: Datastream Examples
>>> ds.drop('col_name')
-
dropDuplicates
(subset=None)[source]¶ Return a new DataStream with duplicate rows removed, optionally only considering certain columns.
Parameters: subset – optional list of column names to consider. Returns: Return type: Datastream Examples
>>> ds.dropDuplicates().show() >>> # Example on how to use it with params >>> ds.dropDuplicates(['col_name1', 'col_name2']).show()
-
dropna
(how='any', thresh=None, subset=None)[source]¶ Returns a new DataStream omitting rows with null values.
Parameters: - how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
- thresh – int, default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
- subset – optional list of column names to consider.
Returns: Return type: Datastream
Examples
>>> ds.dropna()
-
exceptAll
(other)[source]¶ Return a new DataStream containing rows in this DataStream but not in another DataStream while preserving duplicates.
Parameters: other – other DataStream object Returns: Return type: Datastream Examples
>>> ds1.exceptAll(ds2).show()
-
explain
(extended=False)[source]¶ Prints the (logical and physical) plans to the console for debugging purpose.
Parameters: extended – boolean, default False. If False, prints only the physical plan. Examples
>>> ds.explain()
-
fillna
(value, subset=None)[source]¶ Replace null values
Parameters: - value – int, long, float, string, bool or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, boolean, or string.
- subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns: Return type: Datastream
Examples
>>> ds.fill(50).show() >>> ds.fill({'col1': 50, 'col2': 'unknown'}).show()
-
filter
(condition)[source]¶ Filters rows using the given condition
Parameters: condition – a Column of types.BooleanType or a string of SQL expression. Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.filter("age > 3") >>> df.filter(df.age > 3)
-
filter_user
(user_ids: List)[source]¶ filter data to get only selective users’ data
Parameters: user_ids (List[str]) – list of users’ UUIDs Returns: this will return a new datastream object with blank metadata Return type: DataStream
-
filter_version
(version: List)[source]¶ filter data to get only selective users’ data
Parameters: version (List[str]) – list of stream versions Returns: this will return a new datastream object with blank metadata Return type: DataStream Todo
Metadata version should be return with the data
-
first
()[source]¶ Returns the first row as a Row.
Returns: First row of a DataStream Examples
>>> ds.first()
-
foreach
(f)[source]¶ Applies the f function to all Row of DataStream. This is a shorthand for df.rdd.foreach()
Parameters: f – function Returns: DataStream object Examples
>>> def f(person): ... print(person.name) >>> ds.foreach(f)
-
freqItems
(cols, support=None)[source]¶ Finding frequent items for columns, possibly with false positives. Using the frequent element count algorithm described in “http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou”.
Returns: Return type: DataStream Examples
>>> ds.freqItems("col-name")
-
get_metadata
() → cerebralcortex.core.metadata_manager.stream.metadata.Metadata[source]¶ get stream metadata
Returns: single version of a stream Return type: Metadata Raises: Exception
– if specified version is not available for the stream
-
groupby
(*cols)[source]¶ Groups the DataFrame using the specified columns, so we can run aggregation on them. This method will return pyspark.sql.GroupedData object.
Parameters: of columns to group by. Each element should be a column name (list) – Returns:
-
head
(n=None)[source]¶ Returns the first n rows.
Parameters: n (int) – default 1. Number of rows to return. Returns: If n is greater than 1, return a list of Row. If n is 1, return a single Row. Notes
This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
Examples
>>> ds.head(5)
-
intersect
(other)[source]¶ Return a new DataFrame containing rows only in both this frame and another frame. This is equivalent to INTERSECT in SQL.
Parameters: other (int) – DataStream object Returns: If n is greater than 1, return a list of Row. If n is 1, return a single Row. Examples
>>> ds.intersect(other=ds2)
-
intersectAll
(other)[source]¶ Return a new DataFrame containing rows in both this dataframe and other dataframe while preserving duplicates.
Parameters: other (int) – DataStream object Returns: If n is greater than 1, return a list of Row. If n is 1, return a single Row. Examples
>>> ds.intersectAll(ds2).show()
-
join
(other, on=None, how=None)[source]¶ Joins with another DataStream, using the given join expression.
Parameters: - other (DataStream) – Right side of the join
- – a string for the join column name, a list of column names, a join expression (on) –
- how (str) – inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
Examples
>>> ds.join(ds2, 'user', 'outer').show()
Returns: DataStream object with joined streams
-
join_stress_streams
(dataStream, propagation='forward')[source]¶ filter data
Parameters: - columnName (str) – name of the column
- operator (str) – basic operators (e.g., >, <, ==, !=)
- value (Any) – if the columnName is timestamp, please provide python datatime object
Returns: this will return a new datastream object with blank metadata
Return type:
-
limit
(num)[source]¶ Limits the result count to the number specified.
Parameters: num – Returns: Return type: Datastream
-
map_stream
(window_ds)[source]¶ Map/join a stream to a windowed stream
Parameters: window_ds (Datastream) – windowed datastream object Returns: joined/mapped stream Return type: Datastream
-
repartition
(numPartitions, *cols)[source]¶ Returns a new DataStream partitioned by the given partitioning expressions. The resulting DataStream is hash partitioned.
numPartitions can be an int to specify the target number of partitions or a Column. If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used.
Parameters: - numPartitions –
- *cols –
Returns:
-
replace
(to_replace, value, subset=None)[source]¶ Returns a new DataStream replacing a value with another value. Values to_replace and value must have the same type and can only be numerics, booleans, or strings. Value can have None. When replacing, the new value will be cast to the type of the existing column. For numeric replacements all values to be replaced should have unique floating point representation. In case of conflicts (for example with {42: -1, 42.0: 1}) and arbitrary replacement will be used.
Parameters: - to_replace – bool, int, long, float, string, list or dict. Value to be replaced. If the value is a dict, then value is ignored or can be omitted, and to_replace must be a mapping between a value and a replacement.
- value – bool, int, long, float, string, list or None. The replacement value must be a bool, int, long, float, string or None. If value is a list, value should be of the same length and type as to_replace. If value is a scalar and to_replace is a sequence, then value is used as a replacement for each item in to_replace.
- subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns: Return type: Datastream
Examples
>>> ds.replace(10, 20).show() >>> ds.replace('some-str', None).show() >>> ds.replace(['old_val1', 'new_val1'], ['old_val2', 'new_val2'], 'col_name').show()
-
select
(*cols)[source]¶ Projects a set of expressions and returns a new DataStream :param cols: list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataStream :type cols: str
Returns: this will return a new datastream object with selected columns Return type: DataStream Examples
>>> ds.select('*') >>> ds.select('name', 'age') >>> ds.select(ds.name, (ds.age + 10).alias('age'))
-
selectExpr
(*expr)[source]¶ This is a variant of select() that accepts SQL expressions. Projects a set of expressions and returns a new DataStream
Parameters: expr (str) – Returns: this will return a new datastream object with selected columns Return type: DataStream Examples
>>> ds.selectExpr("age * 2")
-
show
(n=20, truncate=True, vertical=False)[source]¶ Parameters: - n – Number of rows to show.
- truncate – If set to
True
, truncate strings longer than 20 chars by default. - set to a number greater than one, truncates long strings to length truncate (If) –
- align cells right. (and) –
- vertical – If set to
True
, print output rows vertically (one line - column value) (per) –
Returns:
-
sort
(*cols, **kwargs)[source]¶ Returns a new DataStream sorted by the specified column(s).
Parameters: - cols – list of Column or column names to sort by.
- ascending – boolean or list of boolean (default True). Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the cols.
Returns: DataStream object
Return type: object
Examples
>>> ds.sort("col_name", ascending=False)
-
summary
(*statistics)[source]¶ Computes specified statistics for numeric and string columns. Available statistics are: - count - mean - stddev - min - max - arbitrary approximate percentiles specified as a percentage (eg, 75%) If no statistics are given, this function computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
Parameters: *statistics – Examples
>>> ds.summary().show() >>> ds.summary("count", "min", "25%", "75%", "max").show() >>> # To do a summary for specific columns first select them: >>> ds.select("col1", "col2").summary("count").show()
-
take
(num)[source]¶ Returns the first num rows as a list of Row.
Returns: row(s) of a DataStream Return type: Row(list) Examples
>>> ds.take()
-
toPandas
()[source]¶ This method converts pyspark dataframe into pandas dataframe.
Notes
This method will collect all the data on master node to convert pyspark dataframe into pandas dataframe. After converting to pandas dataframe datastream objects helper methods will not be accessible.
Returns: this will return a new datastream object with blank metadata Return type: Datastream (Metadata, pandas.DataFrame) Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> ds = CC.get_stream("STREAM-NAME") >>> new_ds = ds.toPandas() >>> new_ds.data.head()
-
union
(other)[source]¶ Return a new Datastream containing union of rows in this and another frame.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
Also as standard in SQL, this function resolves columns by position (not by name).
Parameters: other (DataStream) – Returns: Return type: Datastream Examples
>>> ds.union(ds2).collect()
-
unionByName
(other)[source]¶ Returns a new Datastream containing union of rows in this and another frame.
This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
The difference between this function and union() is that this function resolves columns by name (not by position):
Parameters: other (DataStream) – Returns: Return type: Datastream Examples
>>> ds.unionByName(ds2).show()
-
where
(condition)[source]¶ where() is an alias for filter().
Parameters: condition – Returns: Return type: Datastream Examples
>>> ds.filter("age > 3").collect()
-
window
(windowDuration: int = None, groupByColumnName: List[str] = [], slideDuration: int = None, startTime=None, preserve_ts=False)[source]¶ Window data into fixed length chunks. If no columnName is provided then the windowing will be performed on all the columns.
Parameters: - windowDuration (int) – duration of a window in seconds
- List[str] (groupByColumnName) – groupby column names, for example, groupby user, col1, col2
- slideDuration (int) – slide duration of a window
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
- preserve_ts (bool) – setting this to True will return timestamps of corresponding to each windowed value
Returns: this will return a new datastream object with blank metadata
Return type: Note
This windowing method will use collect_list to return values for each window. collect_list is not optimized.
-
withColumn
(colName, col)[source]¶ Returns a new DataStream by adding a column or replacing the existing column that has the same name. The column expression must be an expression over this DataStream; attempting to add a column from some other datastream will raise an error. :param colName: name of the new column. :type colName: str :param col: a Column expression for the new column.
Examples
>>> ds.withColumn('col_name', ds.col_name + 2)
-
withColumnRenamed
(existing, new)[source]¶ Returns a new DataStream by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.
Parameters: - existing (str) – string, name of the existing column to rename.
- new (str) – string, new name of the column.
Examples
>>> ds.withColumnRenamed('col_name', 'new_col_name')
Returns: DataStream object with new column name(s)
-
-
windowing_udf
(x)¶
Module contents¶
-
class
DataStream
(data: object = None, metadata: cerebralcortex.core.metadata_manager.stream.metadata.Metadata = None)[source]¶ Bases:
pyspark.sql.dataframe.DataFrame
-
agg
(*exprs)[source]¶ Aggregate on the entire DataStream without groups
Parameters: *exprs – Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.agg({"age": "max"}).collect() >>> # Below example shows how to use pyspark functions in add method >>> from pyspark.sql import functions as F >>> ds.agg(F.min(ds.age)).collect()
-
alias
(alias)[source]¶ Returns a new DataStream with an alias set.
Parameters: alias – string, an alias name to be set for the datastream. Returns: DataStream object Return type: object Examples
>>> df_as1 = df.alias("df_as1") >>> df_as2 = df.alias("df_as2")
-
approxQuantile
(col, probabilities, relativeError)[source]¶ Calculates the approximate quantiles of numerical columns of a DataStream.
The result of this algorithm has the following deterministic bound: If the DataStream has N elements and if we request the quantile at probability p up to error err, then the algorithm will return a sample x from the DataStream so that the exact rank of x is close to (p * N). More precisely,
floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).
This method implements a variation of the Greenwald-Khanna algorithm (with some speed optimizations). The algorithm was first present in [[http://dx.doi.org/10.1145/375663.375670 Space-efficient Online Computation of Quantile Summaries]] by Greenwald and Khanna.
Note that null values will be ignored in numerical columns before calculation. For columns only containing null values, an empty list is returned.
Parameters: - col (str[list]) – Can be a single column name, or a list of names for multiple columns.
- probabilities – a list of quantile probabilities Each number must belong to [0, 1]. For example 0 is the minimum, 0.5 is the median, 1 is the maximum.
- relativeError – The relative target precision to achieve (>= 0). If set to zero, the exact quantiles are computed, which could be very expensive. Note that values greater than 1 are accepted but give the same result as 1.
Returns: the approximate quantiles at the given probabilities. If the input col is a string, the output is a list of floats. If the input col is a list or tuple of strings, the output is also a list, but each element in it is a list of floats, i.e., the output is a list of list of floats.
-
colRegex
(colName)[source]¶ Selects column based on the column name specified as a regex and returns it as Column.
Parameters: colName (str) – column name specified as a regex. Returns: Return type: DataStream Examples
>>> ds.colRegex("colName")
-
collect
()[source]¶ Collect all the data to master node and return list of rows
Returns: rows of all the dataframe Return type: List Examples
>>> ds.collect()
-
compute
(udfName, windowDuration: int = None, slideDuration: int = None, groupByColumnName: List[str] = [], startTime=None)[source]¶ Run an algorithm. This method supports running an udf method on windowed data
Parameters: - udfName – Name of the algorithm
- windowDuration (int) – duration of a window in seconds
- slideDuration (int) – slide duration of a window
- List[str] (groupByColumnName) – groupby column names, for example, groupby user, col1, col2
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
Returns: this will return a new datastream object with blank metadata
Return type:
-
corr
(col1, col2, method=None)[source]¶ Calculates the correlation of two columns of a DataStream as a double value. Currently only supports the Pearson Correlation Coefficient.
Parameters: - col1 (str) – The name of the first column
- col2 (str) – The name of the second column
- method (str) – The correlation method. Currently only supports “pearson”
Returns: this will return a new datastream object with blank metadata
Return type: Examples
>>> ds.corr("cal1", "col2", "pearson").collect()
-
cov
(col1, col2)[source]¶ Calculate the sample covariance for the given columns, specified by their names, as a double value.
Parameters: - col1 (str) – The name of the first column
- col2 (str) – The name of the second column
Returns: this will return a new datastream object with blank metadata
Return type: Examples
>>> ds.cov("cal1", "col2", "pearson").collect()
-
create_windows
(window_length='hour')[source]¶ filter data
Parameters: - columnName (str) – name of the column
- operator (str) – basic operators (e.g., >, <, ==, !=)
- value (Any) – if the columnName is timestamp, please provide python datatime object
Returns: this will return a new datastream object with blank metadata
Return type:
-
crossJoin
(other)[source]¶ Returns the cartesian product with another DataStream
Parameters: other – Right side of the cartesian product. Returns: DataStream object with joined streams Examples
>>> ds.crossJoin(ds2.select("col_name")).collect()
-
crosstab
(col1, col2)[source]¶ Computes a pair-wise frequency table of the given columns. Also known as a contingency table. The number of distinct values for each column should be less than 1e4. At most 1e6 non-zero pair frequencies will be returned. The first column of each row will be the distinct values of col1 and the column names will be the distinct values of col2. The name of the first column will be $col1_$col2. Pairs that have no occurrences will have zero as their counts.
Parameters: - col1 (str) – The name of the first column. Distinct items will make the first item of each row.
- col2 (str) – The name of the second column. Distinct items will make the column names of the DataStream.
Returns: DataStream object
Examples
>>> ds.crosstab("col_1", "col_2")
-
data
¶ get stream data
Returns (DataFrame):
-
describe
(*cols)[source]¶ Computes basic statistics for numeric and string columns. This include count, mean, stddev, min, and max. If no columns are given, this function computes statistics for all numerical or string columns.
Parameters: *cols – Examples
>>> ds.describe(['col_name']).show() >>> ds.describe().show()
-
distinct
()[source]¶ Returns a new DataStream containing the distinct rows in this DataStream.
Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.distinct().count()
-
drop
(*cols)[source]¶ Returns a new Datastream that drops the specified column. This is a no-op if schema doesn’t contain the given column name(s).
Parameters: *cols – a string name of the column to drop, or a Column to drop, or a list of string name of the columns to drop. Returns: Return type: Datastream Examples
>>> ds.drop('col_name')
-
dropDuplicates
(subset=None)[source]¶ Return a new DataStream with duplicate rows removed, optionally only considering certain columns.
Parameters: subset – optional list of column names to consider. Returns: Return type: Datastream Examples
>>> ds.dropDuplicates().show() >>> # Example on how to use it with params >>> ds.dropDuplicates(['col_name1', 'col_name2']).show()
-
dropna
(how='any', thresh=None, subset=None)[source]¶ Returns a new DataStream omitting rows with null values.
Parameters: - how – ‘any’ or ‘all’. If ‘any’, drop a row if it contains any nulls. If ‘all’, drop a row only if all its values are null.
- thresh – int, default None If specified, drop rows that have less than thresh non-null values. This overwrites the how parameter.
- subset – optional list of column names to consider.
Returns: Return type: Datastream
Examples
>>> ds.dropna()
-
exceptAll
(other)[source]¶ Return a new DataStream containing rows in this DataStream but not in another DataStream while preserving duplicates.
Parameters: other – other DataStream object Returns: Return type: Datastream Examples
>>> ds1.exceptAll(ds2).show()
-
explain
(extended=False)[source]¶ Prints the (logical and physical) plans to the console for debugging purpose.
Parameters: extended – boolean, default False. If False, prints only the physical plan. Examples
>>> ds.explain()
-
fillna
(value, subset=None)[source]¶ Replace null values
Parameters: - value – int, long, float, string, bool or dict. Value to replace null values with. If the value is a dict, then subset is ignored and value must be a mapping from column name (string) to replacement value. The replacement value must be an int, long, float, boolean, or string.
- subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns: Return type: Datastream
Examples
>>> ds.fill(50).show() >>> ds.fill({'col1': 50, 'col2': 'unknown'}).show()
-
filter
(condition)[source]¶ Filters rows using the given condition
Parameters: condition – a Column of types.BooleanType or a string of SQL expression. Returns: this will return a new datastream object with blank metadata Return type: DataStream Examples
>>> ds.filter("age > 3") >>> df.filter(df.age > 3)
-
filter_user
(user_ids: List)[source]¶ filter data to get only selective users’ data
Parameters: user_ids (List[str]) – list of users’ UUIDs Returns: this will return a new datastream object with blank metadata Return type: DataStream
-
filter_version
(version: List)[source]¶ filter data to get only selective users’ data
Parameters: version (List[str]) – list of stream versions Returns: this will return a new datastream object with blank metadata Return type: DataStream Todo
Metadata version should be return with the data
-
first
()[source]¶ Returns the first row as a Row.
Returns: First row of a DataStream Examples
>>> ds.first()
-
foreach
(f)[source]¶ Applies the f function to all Row of DataStream. This is a shorthand for df.rdd.foreach()
Parameters: f – function Returns: DataStream object Examples
>>> def f(person): ... print(person.name) >>> ds.foreach(f)
-
freqItems
(cols, support=None)[source]¶ Finding frequent items for columns, possibly with false positives. Using the frequent element count algorithm described in “http://dx.doi.org/10.1145/762471.762473, proposed by Karp, Schenker, and Papadimitriou”.
Returns: Return type: DataStream Examples
>>> ds.freqItems("col-name")
-
get_metadata
() → cerebralcortex.core.metadata_manager.stream.metadata.Metadata[source]¶ get stream metadata
Returns: single version of a stream Return type: Metadata Raises: Exception
– if specified version is not available for the stream
-
groupby
(*cols)[source]¶ Groups the DataFrame using the specified columns, so we can run aggregation on them. This method will return pyspark.sql.GroupedData object.
Parameters: of columns to group by. Each element should be a column name (list) – Returns:
-
head
(n=None)[source]¶ Returns the first n rows.
Parameters: n (int) – default 1. Number of rows to return. Returns: If n is greater than 1, return a list of Row. If n is 1, return a single Row. Notes
This method should only be used if the resulting array is expected to be small, as all the data is loaded into the driver’s memory.
Examples
>>> ds.head(5)
-
intersect
(other)[source]¶ Return a new DataFrame containing rows only in both this frame and another frame. This is equivalent to INTERSECT in SQL.
Parameters: other (int) – DataStream object Returns: If n is greater than 1, return a list of Row. If n is 1, return a single Row. Examples
>>> ds.intersect(other=ds2)
-
intersectAll
(other)[source]¶ Return a new DataFrame containing rows in both this dataframe and other dataframe while preserving duplicates.
Parameters: other (int) – DataStream object Returns: If n is greater than 1, return a list of Row. If n is 1, return a single Row. Examples
>>> ds.intersectAll(ds2).show()
-
join
(other, on=None, how=None)[source]¶ Joins with another DataStream, using the given join expression.
Parameters: - other (DataStream) – Right side of the join
- – a string for the join column name, a list of column names, a join expression (on) –
- how (str) – inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
Examples
>>> ds.join(ds2, 'user', 'outer').show()
Returns: DataStream object with joined streams
-
join_stress_streams
(dataStream, propagation='forward')[source]¶ filter data
Parameters: - columnName (str) – name of the column
- operator (str) – basic operators (e.g., >, <, ==, !=)
- value (Any) – if the columnName is timestamp, please provide python datatime object
Returns: this will return a new datastream object with blank metadata
Return type:
-
limit
(num)[source]¶ Limits the result count to the number specified.
Parameters: num – Returns: Return type: Datastream
-
map_stream
(window_ds)[source]¶ Map/join a stream to a windowed stream
Parameters: window_ds (Datastream) – windowed datastream object Returns: joined/mapped stream Return type: Datastream
-
repartition
(numPartitions, *cols)[source]¶ Returns a new DataStream partitioned by the given partitioning expressions. The resulting DataStream is hash partitioned.
numPartitions can be an int to specify the target number of partitions or a Column. If it is a Column, it will be used as the first partitioning column. If not specified, the default number of partitions is used.
Parameters: - numPartitions –
- *cols –
Returns:
-
replace
(to_replace, value, subset=None)[source]¶ Returns a new DataStream replacing a value with another value. Values to_replace and value must have the same type and can only be numerics, booleans, or strings. Value can have None. When replacing, the new value will be cast to the type of the existing column. For numeric replacements all values to be replaced should have unique floating point representation. In case of conflicts (for example with {42: -1, 42.0: 1}) and arbitrary replacement will be used.
Parameters: - to_replace – bool, int, long, float, string, list or dict. Value to be replaced. If the value is a dict, then value is ignored or can be omitted, and to_replace must be a mapping between a value and a replacement.
- value – bool, int, long, float, string, list or None. The replacement value must be a bool, int, long, float, string or None. If value is a list, value should be of the same length and type as to_replace. If value is a scalar and to_replace is a sequence, then value is used as a replacement for each item in to_replace.
- subset – optional list of column names to consider. Columns specified in subset that do not have matching data type are ignored. For example, if value is a string, and subset contains a non-string column, then the non-string column is simply ignored.
Returns: Return type: Datastream
Examples
>>> ds.replace(10, 20).show() >>> ds.replace('some-str', None).show() >>> ds.replace(['old_val1', 'new_val1'], ['old_val2', 'new_val2'], 'col_name').show()
-
select
(*cols)[source]¶ Projects a set of expressions and returns a new DataStream :param cols: list of column names (string) or expressions (Column). If one of the column names is ‘*’, that column is expanded to include all columns in the current DataStream :type cols: str
Returns: this will return a new datastream object with selected columns Return type: DataStream Examples
>>> ds.select('*') >>> ds.select('name', 'age') >>> ds.select(ds.name, (ds.age + 10).alias('age'))
-
selectExpr
(*expr)[source]¶ This is a variant of select() that accepts SQL expressions. Projects a set of expressions and returns a new DataStream
Parameters: expr (str) – Returns: this will return a new datastream object with selected columns Return type: DataStream Examples
>>> ds.selectExpr("age * 2")
-
show
(n=20, truncate=True, vertical=False)[source]¶ Parameters: - n – Number of rows to show.
- truncate – If set to
True
, truncate strings longer than 20 chars by default. - set to a number greater than one, truncates long strings to length truncate (If) –
- align cells right. (and) –
- vertical – If set to
True
, print output rows vertically (one line - column value) (per) –
Returns:
-
sort
(*cols, **kwargs)[source]¶ Returns a new DataStream sorted by the specified column(s).
Parameters: - cols – list of Column or column names to sort by.
- ascending – boolean or list of boolean (default True). Sort ascending vs. descending. Specify list for multiple sort orders. If a list is specified, length of the list must equal length of the cols.
Returns: DataStream object
Return type: object
Examples
>>> ds.sort("col_name", ascending=False)
-
summary
(*statistics)[source]¶ Computes specified statistics for numeric and string columns. Available statistics are: - count - mean - stddev - min - max - arbitrary approximate percentiles specified as a percentage (eg, 75%) If no statistics are given, this function computes count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max.
Parameters: *statistics – Examples
>>> ds.summary().show() >>> ds.summary("count", "min", "25%", "75%", "max").show() >>> # To do a summary for specific columns first select them: >>> ds.select("col1", "col2").summary("count").show()
-
take
(num)[source]¶ Returns the first num rows as a list of Row.
Returns: row(s) of a DataStream Return type: Row(list) Examples
>>> ds.take()
-
toPandas
()[source]¶ This method converts pyspark dataframe into pandas dataframe.
Notes
This method will collect all the data on master node to convert pyspark dataframe into pandas dataframe. After converting to pandas dataframe datastream objects helper methods will not be accessible.
Returns: this will return a new datastream object with blank metadata Return type: Datastream (Metadata, pandas.DataFrame) Examples
>>> CC = CerebralCortex("/directory/path/of/configs/") >>> ds = CC.get_stream("STREAM-NAME") >>> new_ds = ds.toPandas() >>> new_ds.data.head()
-
union
(other)[source]¶ Return a new Datastream containing union of rows in this and another frame.
This is equivalent to UNION ALL in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
Also as standard in SQL, this function resolves columns by position (not by name).
Parameters: other (DataStream) – Returns: Return type: Datastream Examples
>>> ds.union(ds2).collect()
-
unionByName
(other)[source]¶ Returns a new Datastream containing union of rows in this and another frame.
This is different from both UNION ALL and UNION DISTINCT in SQL. To do a SQL-style set union (that does deduplication of elements), use this function followed by distinct().
The difference between this function and union() is that this function resolves columns by name (not by position):
Parameters: other (DataStream) – Returns: Return type: Datastream Examples
>>> ds.unionByName(ds2).show()
-
where
(condition)[source]¶ where() is an alias for filter().
Parameters: condition – Returns: Return type: Datastream Examples
>>> ds.filter("age > 3").collect()
-
window
(windowDuration: int = None, groupByColumnName: List[str] = [], slideDuration: int = None, startTime=None, preserve_ts=False)[source]¶ Window data into fixed length chunks. If no columnName is provided then the windowing will be performed on all the columns.
Parameters: - windowDuration (int) – duration of a window in seconds
- List[str] (groupByColumnName) – groupby column names, for example, groupby user, col1, col2
- slideDuration (int) – slide duration of a window
- startTime (datetime) – The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start window intervals. For example, in order to have hourly tumbling windows that start 15 minutes past the hour, e.g. 12:15-13:15, 13:15-14:15… provide startTime as 15 minutes. First time of data will be used as startTime if none is provided
- preserve_ts (bool) – setting this to True will return timestamps of corresponding to each windowed value
Returns: this will return a new datastream object with blank metadata
Return type: Note
This windowing method will use collect_list to return values for each window. collect_list is not optimized.
-
withColumn
(colName, col)[source]¶ Returns a new DataStream by adding a column or replacing the existing column that has the same name. The column expression must be an expression over this DataStream; attempting to add a column from some other datastream will raise an error. :param colName: name of the new column. :type colName: str :param col: a Column expression for the new column.
Examples
>>> ds.withColumn('col_name', ds.col_name + 2)
-
withColumnRenamed
(existing, new)[source]¶ Returns a new DataStream by renaming an existing column. This is a no-op if schema doesn’t contain the given column name.
Parameters: - existing (str) – string, name of the existing column to rename.
- new (str) – string, new name of the column.
Examples
>>> ds.withColumnRenamed('col_name', 'new_col_name')
Returns: DataStream object with new column name(s)
-