checker = Check()
try:
db_obj = db_helper([[None]], raise_exception=True, check_success_obj=checker)
- add_update_trainingjob('Testing ', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 'Near RT-RIC', 1, 'InfluxSource', 'Tester'
- ,True, '', '', db_obj, '', 'liveCell', 'UEData')
+ add_update_trainingjob('Testing ', 'qoe-pipeline', 'Default', '*', '{epoches : 1}', '', True, True, 1, 'InfluxSource', 'Tester',
+ db_obj, '', '', '', '', '')
assert checker.finished, 'add_update_trainingjob FAILED, When adding = True'
except Exception as err:
fxn_name = "add_update_trainingjob"
- assert str(err) == "add_update_trainingjob() takes from 12 to 18 positional arguments but 19 were given", 'Negative test {} FAILED when adding = True , Doesnt returned required error'.format(fxn_name)
+ assert str(err) == "add_update_trainingjob() takes from 12 to 16 positional arguments but 17 were given", 'Negative test {} FAILED when adding = True, did not return the required error'.format(fxn_name)
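+ # With _measurement and bucket removed, add_update_trainingjob has 12 required parameters
+ # (description .. ps_db_obj) plus 4 optional ones, so at most 16 positional arguments;
+ # the call above deliberately passes 17 to trigger this TypeError.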
def test_get_all_jobs_latest_status_version(self):
db_obj = db_helper([["usecase_name"]])
attrs_TRAININGMGR_CONFIG_OBJ = {'pipeline.return_value':''}
mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
- @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1',True, ""))
+ @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db',True, ""))
@patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
@patch('trainingmgr.trainingmgr_main.get_model_info', return_value=model_info_json)
@patch('trainingmgr.trainingmgr_main.json.loads',return_value={'timeseries':''})
"description":"uc1",
"pipeline_version":"3",
"datalake_source":"InfluxSource",
- "_measurement":"liveCell",
- "bucket":"UEData",
"is_mme":True,
"model_name": ""
}
attrs_TRAININGMGR_CONFIG_OBJ = {'pipeline.return_value':''}
mocked_TRAININGMGR_CONFIG_OBJ.configure_mock(**attrs_TRAININGMGR_CONFIG_OBJ)
@patch('trainingmgr.trainingmgr_main.validate_trainingjob_name', return_value = False)
- @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1',True, ""))
+ @patch('trainingmgr.trainingmgr_main.check_trainingjob_data', return_value = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db',True, ""))
@patch('trainingmgr.trainingmgr_main.TRAININGMGR_CONFIG_OBJ', return_value = mocked_TRAININGMGR_CONFIG_OBJ)
@patch('trainingmgr.trainingmgr_main.get_model_info', return_value=model_info_json)
@patch('trainingmgr.trainingmgr_main.json.loads',return_value='')
@patch('trainingmgr.common.trainingmgr_util.isinstance',return_value=True)
def test_check_trainingjob_data(self,mock1,mock2):
usecase_name = "usecase8"
- json_data = { "description":"unittest", "featureGroup_name": "group1" , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "_measurement":2 , "bucket":"bucket1", "is_mme":False, "model_name":""}
+ json_data = { "description":"unittest", "featureGroup_name": "group1" , "pipeline_name":"qoe" , "experiment_name":"experiment1" , "arguments":"arguments1" , "query_filter":"query1" , "enable_versioning":True , "target_deployment":"Near RT RIC" , "pipeline_version":1 , "datalake_source":"cassandra db" , "incremental_training":True , "model":"usecase7" , "model_version":1 , "is_mme":False, "model_name":""}
- expected_data = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db', 2, 'bucket1',False, "")
+ expected_data = ("group1", 'unittest', 'qoe', 'experiment1', 'arguments1', 'query1', True, 1, 'cassandra db',False, "")
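+ # expected_data now has 11 elements; _measurement and bucket are no longer part of the
+ # tuple returned by check_trainingjob_data.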
assert check_trainingjob_data(usecase_name, json_data) == expected_data,"data not equal"
def test_negative_check_trainingjob_data_1(self):
"pipeline_name", "experiment_name",
"arguments", "enable_versioning",
"datalake_source", "description",
- "query_filter", "_measurement",
- "bucket", "is_mme", "model_name"], json_data):
+ "query_filter",
+ "is_mme", "model_name"], json_data):
description = json_data["description"]
feature_list = json_data["featureGroup_name"]
enable_versioning = json_data["enable_versioning"]
pipeline_version = json_data["pipeline_version"]
datalake_source = json_data["datalake_source"]
- _measurement = json_data["_measurement"]
- bucket = json_data["bucket"]
is_mme=json_data["is_mme"]
model_name=json_data["model_name"]
else :
str(err)) from None
return (feature_list, description, pipeline_name, experiment_name,
arguments, query_filter, enable_versioning, pipeline_version,
- datalake_source, _measurement, bucket, is_mme, model_name)
+ datalake_source, is_mme, model_name)
def check_feature_group_data(json_data):
"""
conn.close()
-def add_update_trainingjob(description, pipeline_name, experiment_name, feature_list, arguments,
+def add_update_trainingjob(description, pipeline_name, experiment_name, featuregroup_name, arguments,
query_filter, adding, enable_versioning,
pipeline_version, datalake_source, trainingjob_name, ps_db_obj, notification_url="",
- _measurement="", bucket="", is_mme="", model_name="", model_info=""):
+ is_mme="", model_name="", model_info=""):
"""
This function adds a new row or updates an existing row with the given information
"""
version = version + 1
cursor.execute('''INSERT INTO {} VALUES '''.format(tm_table_name) + \
'''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,''' + \
- ''' %s,%s,%s,%s,%s,%s,%s,%s,%s, %s)''',
- (trainingjob_name, description, feature_list, pipeline_name,
+ ''' %s,%s,%s,%s,%s,%s,%s,%s)''',
+ (trainingjob_name, description, featuregroup_name, pipeline_name,
experiment_name, arguments_string, query_filter,
creation_time, run_id, json.dumps(steps_state),
updation_time, version,
enable_versioning, pipeline_version,
datalake_source_string, model_url, notification_url,
- _measurement, bucket, deletion_in_progress, is_mme, model_name, model_info))
+ deletion_in_progress, is_mme, model_name, model_info))
else:
cursor.execute('''update {} set description=%s, feature_list=%s, '''.format(tm_table_name) + \
'''pipeline_name=%s,experiment_name=%s,arguments=%s,''' + \
'''steps_state=%s,''' + \
'''pipeline_version=%s,updation_time=%s,enable_versioning=%s,''' + \
'''datalake_source=%s,''' + \
- '''model_url=%s, notification_url=%s, _measurement=%s, ''' + \
- '''bucket=%s, deletion_in_progress=%s, is_mme=%s, model_name=%s , model_info=%s where ''' + \
+ '''model_url=%s, notification_url=%s, ''' + \
+ '''deletion_in_progress=%s, is_mme=%s, model_name=%s , model_info=%s where ''' + \
'''trainingjob_name=%s and version=%s''',
- (description, feature_list, pipeline_name, experiment_name,
+ (description, featuregroup_name, pipeline_name, experiment_name,
arguments_string, query_filter, creation_time, run_id,
json.dumps(steps_state),
pipeline_version, updation_time, enable_versioning,
datalake_source_string, model_url, notification_url,
- _measurement, bucket, deletion_in_progress, is_mme, model_name, model_info, trainingjob_name, version))
+ deletion_in_progress, is_mme, model_name, model_info, trainingjob_name, version))
else:
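+ # Fresh-insert path: the 21 placeholders below match the 21 bind values
+ # (trainingjob_name .. model_info) now that _measurement and bucket are gone.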
cursor.execute(''' INSERT INTO {} VALUES '''.format(tm_table_name) + \
'''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,''' + \
- '''%s,%s,%s,%s,%s,%s,%s,%s, %s, %s, %s)''',
- (trainingjob_name, description, feature_list, pipeline_name,
+ '''%s,%s,%s,%s,%s,%s,%s,%s, %s)''',
+ (trainingjob_name, description, featuregroup_name, pipeline_name,
experiment_name, arguments_string, query_filter, creation_time,
run_id, json.dumps(steps_state),
updation_time, version, enable_versioning,
pipeline_version, datalake_source_string,
- model_url, notification_url, _measurement, bucket,
+ model_url, notification_url,
deletion_in_progress, is_mme, model_name, model_info))
conn.commit()
cursor.close()
"datalake_source varchar(2000) NOT NULL," + \
"model_url varchar(100) NOT NULL," + \
"notification_url varchar(1000) NOT NULL," + \
- "_measurement varchar(100) NOT NULL," + \
- "bucket varchar(50) NOT NULL," + \
"deletion_in_progress BOOLEAN NOT NULL," + \
"is_mme BOOLEAN NOT NULL," + \
"model_name varchar(128) NOT NULL," + \
else:
(featuregroup_name, description, pipeline_name, experiment_name,
arguments, query_filter, enable_versioning, pipeline_version,
- datalake_source, _measurement, bucket, is_mme, model_name) = \
+ datalake_source, is_mme, model_name) = \
check_trainingjob_data(trainingjob_name, json_data)
model_info=""
if is_mme:
add_update_trainingjob(description, pipeline_name, experiment_name, featuregroup_name,
arguments, query_filter, True, enable_versioning,
pipeline_version, datalake_source, trainingjob_name,
- PS_DB_OBJ, _measurement=_measurement,
- bucket=bucket, is_mme=is_mme, model_name=model_name, model_info=model_info)
+ PS_DB_OBJ, is_mme=is_mme, model_name=model_name, model_info=model_info)
api_response = {"result": "Information stored in database."}
response_code = status.HTTP_201_CREATED
elif(request.method == 'PUT'):