# limitations under the License.
#
# ==================================================================================
-
import sys
import os
from dotenv import load_dotenv
def test_feature_group(self):
- request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
+ request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, "trainingjob_id": "52", 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
empyty_tasks()
assert response.content_type == 'application/json'
def setup_method(self):
self.client = main.app.test_client(self)
- def test_task_status(self):
+ def test_task_status(self):
+ trainingjob_id = 1
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
- main.task_map["unittest_task"] = Task(request_json ,"Accepted")
+ main.task_map["unittest_task"] = Task(request_json ,"Accepted", trainingjob_id)
response = self.client.get("/task-status/unittest_task")
main.task_map.clear()
assert response.content_type == 'application/json'
#Error test
def test_negative_task_status(self):
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
- main.task_map["unittest_task"] = Task(request_json ,"Error")
+ trainingjob_id = 1
+ main.task_map["unittest_task"] = Task(request_json ,"Error", trainingjob_id)
response = self.client.get("/task-status/unittest_task")
main.task_map.clear()
self.client = main.app.test_client(self)
def test_delete_task_status(self):
+ trainingjob_id = 1
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
- main.task_map["unittest_task"] = Task(request_json ,"Accepted")
+ main.task_map["unittest_task"] = Task(request_json ,"Accepted", trainingjob_id)
response = self.client.delete("/delete-task-status/unittest_task")
main.task_map.clear()
assert response.content_type == 'application/json'
@patch('main.factory.get_batch_pipeline')
def test_negative_async_code_worker_1(self,mock1,mock2):
- main.infinte_loop_config["infinte_run"]= "True"
+ main.infinte_loop_config["infinte_run"]= "False"
main.infinte_loop_config["unit_test_mode"]= "True"
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
#error
def test_negative_async_code_worker_2(self):
- main.infinte_loop_config["infinte_run"]= "True"
+ main.infinte_loop_config["infinte_run"]= "False"
main.infinte_loop_config["unit_test_mode"]= "True"
request_json = {'source': {'InfluxSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')
#error with Cassandra Source
def test_negative_async_code_worker_3(self):
- main.infinte_loop_config["infinte_run"]= "True"
+ main.infinte_loop_config["infinte_run"]= "False"
main.infinte_loop_config["unit_test_mode"]= "True"
request_json = {'source': {'CassandraSource': {'query': 'from(bucket:"UEData") |> range(start: 0, stop: now()) |> filter(fn: (r) => r._measurement == "liveCell") |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")'}}, 'transform': [{'operation': 'SQLTransform', 'FeatureList': '*', 'SQLFilter': ''}], 'sink': {'CassandraSink': {'CollectionName': 'last_check3'}}}
response = self.client.post("/feature-groups", data=json.dumps(request_json), content_type='application/json')