self.client = trainingmgr_main.APP.test_client(self)
self.logger = trainingmgr_main.LOGGER
- feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',False,'','','','','','')
+ feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',False,'','','','','','', '')
@patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data2)
@patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
@patch('trainingmgr.trainingmgr_main.add_featuregroup')
"bucket": "",
"token": "",
"source_name": "",
+ "measured_obj_class":"",
"dbOrg": ""
}
expected_response=b'{"result": "Feature Group Created"}'
the_response1.headers={"content-type": "application/json"}
the_response1._content = b''
mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
- feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+ feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
@patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data2)
@patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
@patch('trainingmgr.trainingmgr_main.add_featuregroup')
"bucket": "",
"token": "",
"source_name": "",
+ "measured_obj_class":"",
"dbOrg": ""
}
expected_response=b'{"result": "Feature Group Created"}'
the_response2.headers={"content-type": "application/json"}
the_response2._content = b''
mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
- feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+ feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
@patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data3)
@patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
@patch('trainingmgr.trainingmgr_main.add_featuregroup')
"bucket": "",
"token": "",
"source_name": "",
+ "measured_obj_class":"",
"dbOrg": ""
}
expected_response=b'{"Exception": "Cannot create dme job"}'
assert response.status_code ==status.HTTP_400_BAD_REQUEST, "Return status code not equal"
- feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+ feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
@patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data3)
@patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
@patch('trainingmgr.trainingmgr_main.add_featuregroup',side_effect = Exception('Mocked error'))
"bucket": "",
"token": "",
"source_name": "",
+ "measured_obj_class":"",
"dbOrg": ""
}
expected_response=b'{"Exception": "Failed to create the feature Group "}'
assert response.data==expected_response
assert response.status_code ==status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code not equal"
- feature_group_data3=('testing_hash!@','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+ feature_group_data3=('testing_hash!@','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
@patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data3)
@patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=True)
def test_neagtive_create_featuregroup_3(self, mock1, mock2):
"bucket": "",
"token": "",
"source_name": "",
+ "measured_obj_class":"",
"dbOrg": ""
}
expected_response=b'{"Exception": "Failed to create the feature group since feature group not valid or already present"}'
def test_success(self, mock1):
training_config_obj = DummyVariable()
source_name=""
- db_org=""
- bucket_name=""
- token=""
features=[]
feature_group_name="test"
host="10.0.0.50"
port="31840"
- response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name, token, features, feature_group_name, host, port)
+ measured_obj_class="NRCellDU"
+ response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, features, feature_group_name, host, port, measured_obj_class)
assert response.status_code==status.HTTP_201_CREATED, "create_dme_filtered_data_job failed"
def test_create_url_host_port_fail(self):
training_config_obj = DummyVariable()
source_name=""
- db_org=""
- bucket_name=""
- token=""
features=[]
feature_group_name="test"
+ measured_obj_class="NRCellDU"
host="url error"
port="31840"
try:
- response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name, token, features, feature_group_name, host, port)
+ response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, features, feature_group_name, host, port, measured_obj_class)
assert False
except TMException as err:
assert "URL validation error: " in err.message
"bucket": "",
"token": "",
"source_name": "",
- "dbOrg": ""
+ "dbOrg": "",
+ "measured_obj_class":""
+
}
- expected_data=("test", "", "",False,"","","","","","")
+ expected_data=("test", "", "",False,"","","","","","","")
assert check_feature_group_data(json_data)==expected_data, "data not equal"
@patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=False)
"bucket": "",
"token": "",
"source_name": "",
- "dbOrg": ""
+ "dbOrg": "",
+ "measured_obj_class":""
}
- expected_data=("test", "", "",False,"","","","","","")
+ expected_data=("test", "", "",False,"","","","","","","")
try:
assert check_feature_group_data(json_data)==expected_data, 'data not equal'
assert False
return response
-def create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name,
- token, features, feature_group_name, host, port):
+def create_dme_filtered_data_job(training_config_obj, source_name, features, feature_group_name, host, port, measured_obj_class):
"""
This function calls Non-RT RIC DME APIs for creating filtered PM data jobs.
"""
logger = training_config_obj.logger
-
- job_json = {
- "info_type_id": "json-file-data-from-filestore-to-influx",
+ job_json = {
+ "info_type_id": "PmData",
"job_owner": "console",
- "status_notification_uri": "http://callback.nonrtric:80/post",
- "job_definition": { "db-url":"http://influxdb2.nonrtric:8086",
- "db-org":db_org,
- "db-bucket":bucket_name,
- "db-token": token,
- "filterType":"pmdata",
- "filter":
- {"sourceNames":[source_name],
- "measTypes":features}}}
+ "job_definition": {
+ "filter":{
+ "sourceNames":[source_name],
+ "measObjInstIds": [],
+ "measTypeSpecs": [{
+ "measuredObjClass": measured_obj_class,
+ "measTypes":features
+ }],
+ "measuredEntityDns": []
+ },
+ "deliveryInfo": {
+ "topic": "pmreports",
+ "bootStrapServers": "kafka-1-kafka-bootstrap.nonrtric:9097"
+ }
+ }
+ }
+
headers = {'Content-type': MIMETYPE_JSON}
logger.debug(url)
logger.debug(json.dumps(job_json))
response = requests.put(url, data=json.dumps(job_json), headers=headers)
-
return response
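# Illustrative sketch only, not part of this change; argument values are borrowed from the unit tests.
# A hypothetical call such as
#   create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, "", ["pdcpBytesDl", "pdcpBytesUl"],
#                                "test", "10.0.0.50", "31840", "NRCellDU")
# would PUT a "PmData" job whose filter carries
#   "measTypeSpecs": [{"measuredObjClass": "NRCellDU", "measTypes": ["pdcpBytesDl", "pdcpBytesUl"]}]
# and whose deliveryInfo points the output at the "pmreports" Kafka topic rather than at InfluxDB.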
def delete_dme_filtered_data_job(training_config_obj, feature_group_name, host, port):
try:
if check_key_in_dictionary(["featureGroupName", "feature_list", \
"datalake_source", "enable_Dme", "DmeHost",
- "DmePort", "bucket", "token", "source_name"], json_data):
+ "DmePort", "bucket", "token", "source_name", "measured_obj_class"], json_data):
feature_group_name=json_data["featureGroupName"]
features=json_data["feature_list"]
datalake_source=json_data["datalake_source"]
token=json_data["token"]
source_name=json_data["source_name"]
db_org=json_data["dbOrg"]
+ measured_obj_class = json_data["measured_obj_class"]
else :
raise TMException("check_featuregroup_data- supplied data doesn't have" + \
except Exception as err:
raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
- return (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name,db_org)
+ return (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name, db_org, measured_obj_class)
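# Illustrative mapping (mirrors the unit tests): a request body with "featureGroupName": "test"
# and the remaining keys empty/False yields the 11-element tuple
#   ("test", "", "", False, "", "", "", "", "", "", "")
# where the trailing element is the new measured_obj_class value.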
def get_one_key(dictionary):
'''
if conn is not None:
conn.close()
-def add_featuregroup(feature_group_name, feature_list, datalake_source, enable_dme, ps_db_obj , dmehost="", dmeport="", bucket="", token="", source_name="",db_org=""):
+def add_featuregroup(feature_group_name, feature_list, datalake_source, enable_dme, ps_db_obj, measured_obj_class="", dmehost="", dmeport="", bucket="", token="", source_name="", db_org=""):
"""
This function adds a new row or updates an existing row with the given information
"""
try:
cursor.execute(''' INSERT INTO {} VALUES '''.format(fg_table_name) +
- '''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
- (feature_group_name, feature_list, datalake_source, enable_dme, dmehost, dmeport, bucket, token, source_name, db_org))
+ '''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
+ (feature_group_name, feature_list, datalake_source, enable_dme, dmehost, measured_obj_class, dmeport, bucket, token, source_name, db_org))
conn.commit()
cursor.close()
except Exception as err:
"datalake_source varchar(2000) NOT NULL," + \
"enable_dme BOOLEAN NOT NULL," + \
"DmeHost varchar(128) NOT NULL," + \
+ "measured_obj_class varchar(128) NOT NULL," + \
"DmePort varchar(128) NOT NULL," + \
"bucket varchar(128) NOT NULL," + \
"token varchar(2000) NOT NULL," + \
try:
json_data=request.json
- (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name,db_org)=check_feature_group_data(json_data)
+ (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name, db_org, measured_obj_class)=check_feature_group_data(json_data)
# check the data conformance
LOGGER.debug("the db info is : ", get_feature_group_by_name_db(PS_DB_OBJ, feature_group_name))
else:
# the features are stored in string format in the db and have to be passed to the DME as a list of features. Hence the conversion.
features_list = features.split(",")
- add_featuregroup(feature_group_name, features, datalake_source, enable_dme, PS_DB_OBJ,dme_host, dme_port, bucket, token, source_name,db_org )
+ add_featuregroup(feature_group_name, features, datalake_source, enable_dme, PS_DB_OBJ, measured_obj_class, dme_host, dme_port, bucket, token, source_name, db_org)
if enable_dme == True :
- response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, db_org, bucket, token, features_list, feature_group_name, dme_host, dme_port)
+ response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, features_list, feature_group_name, dme_host, dme_port, measured_obj_class)
if response.status_code != 201:
api_response={"Exception": "Cannot create dme job"}
delete_feature_group_by_name(PS_DB_OBJ, feature_group_name)