# Source code for cerebralcortex.test_suite.test_stream

# Copyright (c) 2019, MD2K Center of Excellence
# - Nasir Ali <nasir.ali08@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from datetime import datetime
from pyspark.sql import functions as F
from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream import Metadata
from cerebralcortex.test_suite.util.data_helper import gen_phone_battery_data, gen_phone_battery_metadata


class DataStreamTest:
    """Test cases for saving a stream and mapping a windowed stream.

    The class declares no base class and defines none of ``CC``,
    ``stream_name`` or ``assertEqual`` itself; the class it is combined with
    must supply them (presumably a ``unittest.TestCase`` subclass that sets
    up a CerebralCortex handle -- confirm against the test runner).
    """

    def test_01_save_stream(self):
        """Save a generated phone-battery stream and expect ``True`` back."""
        data = gen_phone_battery_data()
        metadata = gen_phone_battery_metadata()
        ds = DataStream(data, metadata)
        # dd = ds.filter_user("dfce1e65-2882-395b-a641-93f31748591b")
        result = self.CC.save_stream(ds)
        self.assertEqual(result, True)

    # Disabled tests, kept for reference.
    #
    # def test_02_stream(self):
    #     all_streams = self.CC.list_streams()
    #     searched_streams = self.CC.search_stream(stream_name="battery")
    #
    #     self.assertEqual(len(all_streams),1)
    #     self.assertEqual(all_streams[0].name,self.stream_name)
    #     self.assertEqual(all_streams[0].metadata_hash,self.metadata_hash)
    #
    #     self.assertEqual(len(searched_streams),2)
    #     self.assertEqual(searched_streams[0],self.stream_name)
    #
    # def test_04_test_datafram_operations(self):
    #     ds = self.CC.get_stream(self.stream_name)
    #     avg_ds = ds.compute_average()
    #     data = avg_ds.collect()
    #     self.assertEqual(len(data),17)
    #     self.assertEqual(data[0][2],92.18333333333334)
    #
    #     ds = self.CC.get_stream(self.stream_name)
    #     window_ds = ds.window()
    #     data = window_ds.collect()
    #     self.assertEqual(len(data),17)
    #     self.assertEqual(len(data[0][2]), 60)

    def test_05_map_window_to_stream(self):
        """Window a stream, derive a quality stream and map it back.

        Steps performed:

        1. Fetch ``self.stream_name`` and window it.
        2. Replace the windowed ``battery_level`` column with ``some_val``,
           the first element of each window's value list.
        3. Run a grouped-map pandas UDF per ``user`` on the windowed data
           (without the ``window`` column), print the resulting columns and
           collect the result.
        4. Turn ``some_val`` into a 0/1 ``quality`` flag (1 when above 97),
           map the original stream onto that quality stream, keep the rows
           with ``quality`` equal to 0 and expect 710 of them.
        """
        def get_val(lst):
            # First element of the windowed value list.
            return lst[0]

        sum_vals_udf = F.udf(get_val)
        ds = self.CC.get_stream(self.stream_name)
        win_ds = ds.window()

        # convert window stream as quality stream for next step
        win_df = win_ds.data.withColumn(
            "some_val", sum_vals_udf(win_ds.data["battery_level"])
        ).drop("battery_level")

        # Function-scope imports: only this test needs pandas UDF support.
        from pyspark.sql.functions import pandas_udf, PandasUDFType
        import pandas as pd
        from pyspark.sql.types import StructField, StructType, FloatType

        schema = StructType([
            StructField("mean", FloatType()),
            StructField("val_1", FloatType()),
            StructField("val_2", FloatType()),
        ])

        @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
        def mean_udf(v):
            print(v.dtypes)
            # A single row whose three values line up with the three schema
            # fields. The row must be nested in an outer list: a flat list is
            # read as one column and pandas raises ValueError when it is
            # paired with three column names.
            row = [99.0, 23.0, 1.3]
            return pd.DataFrame([row], columns=['mean', 'val_1', 'val_2'])

        win_ds = DataStream(data=win_ds.data.drop("window"), metadata=Metadata())
        new_ds = win_ds.groupby("user").compute(mean_udf)
        print(new_ds.data.columns)
        # The collected rows are not inspected; the call is kept so the
        # grouped computation is still requested as before.
        new_ds.collect()

        df = win_df.withColumn(
            "quality", F.when(win_df.some_val > 97, 1).otherwise(0)
        ).drop("some_val")
        win_quality_ds = DataStream(data=df, metadata=Metadata())

        mapped_stream = ds.map_stream(win_quality_ds)
        filtered_stream = mapped_stream.filter("quality", "=", 0)
        bad_quality = filtered_stream.collect()
        self.assertEqual(len(bad_quality.data), 710)

    # Disabled test, kept for reference.
    #
    # def test_03_get_stream(self):
    #     """
    #     Test functionality related to get a stream
    #
    #     """
    #     ds = self.CC.get_stream(self.stream_name)
    #     data = ds.data
    #     metadata = ds.metadata[0]
    #
    #     datapoint = data.take(1)
    #
    #     self.assertEqual(datapoint[0][0], datetime(2019, 1, 9, 11, 49, 28))
    #     self.assertEqual(datapoint[0][1], 92)
    #     self.assertEqual(datapoint[0][2], 1)
    #     self.assertEqual(datapoint[0][3], self.user_id)
    #     self.assertEqual(data.count(), 999)
    #
    #     self.assertEqual(len(metadata.data_descriptor), 1)
    #     self.assertEqual(len(metadata.modules), 1)
    #
    #     self.assertEqual(metadata.metadata_hash, self.metadata_hash)
    #     self.assertEqual(metadata.name, self.stream_name)
    #     self.assertEqual(metadata.version, int(self.stream_version))
    #     self.assertEqual(metadata.data_descriptor[0].name, 'battery_level')
    #     self.assertEqual(metadata.data_descriptor[0].type, 'LongType')
    #     self.assertEqual(metadata.data_descriptor[0].attributes.get("description"), 'current battery charge')
    #     self.assertEqual(metadata.modules[0].name, 'battery')
    #     self.assertEqual(metadata.modules[0].version, '1.2.4')
    #     self.assertEqual(metadata.modules[0].authors[0].get("test_user"), 'test_user@test_email.com')