Source code for cerebralcortex.test_suite.test_nosql_storage

# Copyright (c) 2019, MD2K Center of Excellence
# - Nasir Ali <nasir.ali08@gmail.com>
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

import pandas as pd
from datetime import datetime

from pyspark.sql import functions as F

from cerebralcortex.core.datatypes import DataStream
from cerebralcortex.core.metadata_manager.stream import Metadata
from cerebralcortex.test_suite.util.data_helper import gen_phone_battery_data, gen_phone_battery_metadata

[docs]class NoSqlStorageTest:
[docs] def test_01_save_stream(self): """ Test functionality related to save a stream """ data = gen_phone_battery_data() metadata = gen_phone_battery_metadata() ds = DataStream(data, metadata) ww = ds.window() ww.show() result = self.CC.save_stream(ds) self.assertEqual(result, True)
[docs] def test_02_stream(self): all_streams = self.CC.list_streams() searched_streams = self.CC.search_stream(stream_name="battery") self.assertTrue(len(all_streams)>0) self.assertEqual(len(searched_streams),1) self.assertEqual(searched_streams[0],self.stream_name)
[docs] def test_03_get_stream(self): """ Test functionality related to get a stream """ ds = self.CC.get_stream(self.stream_name) data = ds metadata = ds.metadata datapoint = data.take(1) self.assertEqual(datapoint[0][0], datetime(2019, 1, 9, 11, 50, 30)) self.assertEqual(datapoint[0][2], 91) self.assertEqual(datapoint[0][4], 1) self.assertEqual(datapoint[0][3], self.user_id) self.assertTrue(data.count()> 500) self.assertEqual(len(metadata.data_descriptor), 5) self.assertEqual(len(metadata.modules), 1) self.assertEqual(metadata.get_hash(), self.metadata_hash) self.assertEqual(metadata.name, self.stream_name) self.assertEqual(metadata.version, int(self.stream_version)) self.assertEqual(metadata.modules[0].name, 'battery') self.assertEqual(metadata.modules[0].version, '1.2.4') self.assertEqual(metadata.modules[0].authors[0].get("test_user"), 'test_user@test_email.com')
[docs] def test_04_get_storage_path(self): result = self.CC.RawData._get_storage_path() self.assertTrue(len(result)>0)
[docs] def test_05_path_exist(self): result = self.CC.RawData._path_exist() self.assertEqual(result, True)
[docs] def test_06_ls_dir(self): result = self.CC.RawData._ls_dir() self.assertTrue(len(result)>0)
[docs] def test_07_create_dir(self): result = self.CC.RawData._create_dir() self.assertTrue(len(result)>0)
[docs] def test_08_write_pandas_to_parquet_file(self): test_list = [['a','b','c'], ['AA','BB','CC']] pdf = pd.DataFrame(test_list, columns=['col_A', 'col_B', 'col_C']) result = self.CC.RawData.write_pandas_to_parquet_file(df=pdf, stream_name="pandas-to-parquet-test-stream", stream_version=1, user_id="test_user") self.assertTrue(len(result)>0)
[docs] def test_09_is_study(self): result = self.CC.RawData.is_study() self.assertEqual(result, True)
[docs] def test_10_is_stream(self): result = self.CC.RawData.is_stream(stream_name=self.stream_name) self.assertEqual(result, True)
[docs] def test_11_get_stream_versions(self): result = self.CC.RawData.get_stream_versions(stream_name=self.stream_name) self.assertTrue(len(result)>0)
[docs] def test_12_list_streams(self): result = self.CC.RawData.list_streams() print("Stream names", result) self.assertTrue(len(result)>0)
# def test_list_users(self): # result = self.CC.RawData.list_users(stream_name=self.stream_name) # self.assertTrue(len(result)>0)
[docs] def test_14_search_stream(self): result = self.CC.RawData.search_stream(stream_name="battery") self.assertTrue(len(result)>0)