Refactor code to the new create_dme request format 44/11544/4 1.1.1
author rajdeep11 <rajdeep.sin@samsung.com>
Thu, 3 Aug 2023 15:16:26 +0000 (20:46 +0530)
committer Rajdeep Singh <rajdeep.sin@samsung.com>
Fri, 4 Aug 2023 11:12:33 +0000 (11:12 +0000)
Issue-Id: AIMLFW-50

Change-Id: Ia1ba0b425973aaf1f18bf69283d8854c2af4f6d9
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
tests/test_tm_apis.py
tests/test_trainingmgr_operations.py
tests/test_trainingmgr_util.py
trainingmgr/common/trainingmgr_operations.py
trainingmgr/common/trainingmgr_util.py
trainingmgr/db/common_db_fun.py
trainingmgr/db/trainingmgr_ps_db.py
trainingmgr/trainingmgr_main.py

tests/test_tm_apis.py
index f9b0983..88a8893 100644
@@ -851,7 +851,7 @@ class Test_create_featuregroup:
         self.client = trainingmgr_main.APP.test_client(self)
         self.logger = trainingmgr_main.LOGGER
     
-    feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',False,'','','','','','')
+    feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',False,'','','','','','','')
     @patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data2)
     @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
     @patch('trainingmgr.trainingmgr_main.add_featuregroup')
@@ -866,6 +866,7 @@ class Test_create_featuregroup:
                             "bucket": "",
                             "token": "",
                             "source_name": "",
+                            "measured_obj_class":"",
                             "dbOrg": ""
                                 }
         expected_response=b'{"result": "Feature Group Created"}'
@@ -880,7 +881,7 @@ class Test_create_featuregroup:
     the_response1.headers={"content-type": "application/json"}
     the_response1._content = b''
     mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
-    feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+    feature_group_data2=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
     @patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data2)
     @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
     @patch('trainingmgr.trainingmgr_main.add_featuregroup')
@@ -898,6 +899,7 @@ class Test_create_featuregroup:
                             "bucket": "",
                             "token": "",
                             "source_name": "",
+                            "measured_obj_class":"",
                             "dbOrg": ""
                                 }
         expected_response=b'{"result": "Feature Group Created"}'
@@ -912,7 +914,7 @@ class Test_create_featuregroup:
     the_response2.headers={"content-type": "application/json"}
     the_response2._content = b''
     mocked_TRAININGMGR_CONFIG_OBJ=mock.Mock(name="TRAININGMGR_CONFIG_OBJ")
-    feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+    feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
     @patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data3)
     @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
     @patch('trainingmgr.trainingmgr_main.add_featuregroup')
@@ -930,6 +932,7 @@ class Test_create_featuregroup:
                             "bucket": "",
                             "token": "",
                             "source_name": "",
+                            "measured_obj_class":"",
                             "dbOrg": ""
                                 }
         expected_response=b'{"Exception": "Cannot create dme job"}'
@@ -940,7 +943,7 @@ class Test_create_featuregroup:
         assert response.status_code ==status.HTTP_400_BAD_REQUEST, "Return status code not equal"
 
 
-    feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+    feature_group_data3=('testing_hash','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
     @patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data3)
     @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=False)
     @patch('trainingmgr.trainingmgr_main.add_featuregroup',side_effect = Exception('Mocked error'))
@@ -956,6 +959,7 @@ class Test_create_featuregroup:
                             "bucket": "",
                             "token": "",
                             "source_name": "",
+                            "measured_obj_class":"",
                             "dbOrg": ""
                                 }
         expected_response=b'{"Exception": "Failed to create the feature Group "}'
@@ -965,7 +969,7 @@ class Test_create_featuregroup:
         assert response.data==expected_response
         assert response.status_code ==status.HTTP_500_INTERNAL_SERVER_ERROR, "Return status code not equal"  
 
-    feature_group_data3=('testing_hash!@','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','')
+    feature_group_data3=('testing_hash!@','pdcpBytesDl,pdcpBytesUl','InfluxSource',True,'127.0.0.1','31823','pm-bucket','','','','')
     @patch('trainingmgr.trainingmgr_main.check_feature_group_data', return_value=feature_group_data3)
     @patch('trainingmgr.trainingmgr_main.get_feature_group_by_name_db', return_value=True)
     def test_neagtive_create_featuregroup_3(self, mock1, mock2):
@@ -979,6 +983,7 @@ class Test_create_featuregroup:
                             "bucket": "",
                             "token": "",
                             "source_name": "",
+                            "measured_obj_class":"",
                             "dbOrg": ""
                                 }
         expected_response=b'{"Exception": "Failed to create the feature group since feature group not valid or already present"}'
tests/test_trainingmgr_operations.py
index e9bcb04..eaf93f3 100644
@@ -141,28 +141,24 @@ class Test_create_dme_filtered_data_job:
     def test_success(self, mock1):
         training_config_obj = DummyVariable()
         source_name=""
-        db_org=""
-        bucket_name=""
-        token=""
         features=[]
         feature_group_name="test"
         host="10.0.0.50"
         port="31840"
-        response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name, token, features, feature_group_name, host, port)
+        measured_obj_class="NRCellDU"
+        response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, features, feature_group_name, host, port, measured_obj_class)
         assert response.status_code==status.HTTP_201_CREATED, "create_dme_filtered_data_job failed"
 
     def test_create_url_host_port_fail(self):
         training_config_obj = DummyVariable()
         source_name=""
-        db_org=""
-        bucket_name=""
-        token=""
         features=[]
         feature_group_name="test"
+        measured_obj_class="NRCellDU"
         host="url error"
         port="31840"
         try:
-            response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name, token, features, feature_group_name, host, port)
+            response=trainingmgr_operations.create_dme_filtered_data_job(training_config_obj, source_name, features, feature_group_name, host, port, measured_obj_class)
             assert False
         except TMException as err:
             assert "URL validation error: " in err.message
tests/test_trainingmgr_util.py
index a34314c..b4c519a 100644
@@ -564,9 +564,10 @@ class Test_check_feature_group_data:
                             "bucket": "",
                             "token": "",
                             "source_name": "",
-                            "dbOrg": ""
+                            "dbOrg": "", 
+                            "measured_obj_class":""
+
                                 }
-        expected_data=("test", "", "",False,"","","","","","")
+        expected_data=("test", "", "",False,"","","","","","","")
         assert check_feature_group_data(json_data)==expected_data, "data not equal"
 
     @patch('trainingmgr.common.trainingmgr_util.check_key_in_dictionary',return_value=False)
@@ -581,9 +582,10 @@ class Test_check_feature_group_data:
                 "bucket": "",
                 "token": "",
                 "source_name": "",
-                "dbOrg": ""
+                "dbOrg": "",
+                "measured_obj_class":""
                     }
-        expected_data=("test", "", "",False,"","","","","","")
+        expected_data=("test", "", "",False,"","","","","","","")
         try:
             assert check_feature_group_data(json_data)==expected_data, 'data not equal'
             assert False
trainingmgr/common/trainingmgr_operations.py
index d45b474..3f35d21 100644
@@ -121,25 +121,30 @@ def training_start(training_config_obj, dict_data, trainingjob_name):
 
     return response
 
-def create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name,
-                                 token, features, feature_group_name, host, port):
+def create_dme_filtered_data_job(training_config_obj, source_name, features, feature_group_name, host, port, measured_obj_class):
     """
     This function calls Non-RT RIC DME APIs for creating filter PM data jobs.
     """
     logger = training_config_obj.logger
-
-    job_json =  {
-        "info_type_id": "json-file-data-from-filestore-to-influx",
+    job_json = {
+        "info_type_id": "PmData",
         "job_owner": "console",
-        "status_notification_uri": "http://callback.nonrtric:80/post",
-        "job_definition": { "db-url":"http://influxdb2.nonrtric:8086",
-        "db-org":db_org,
-        "db-bucket":bucket_name,
-        "db-token": token,
-        "filterType":"pmdata",
-        "filter":
-            {"sourceNames":[source_name],
-            "measTypes":features}}}
+        "job_definition": {
+          "filter":{
+              "sourceNames":[source_name],
+               "measObjInstIds": [],
+               "measTypeSpecs": [{
+                  "measuredObjClass": measured_obj_class,
+                  "measTypes":features
+                }],
+                "measuredEntityDns": []
+          },
+          "deliveryInfo": {
+             "topic": "pmreports",
+             "bootStrapServers": "kafka-1-kafka-bootstrap.nonrtric:9097"
+          }
+
+        }
+    }
+
 
     headers = {'Content-type': MIMETYPE_JSON}
 
@@ -147,7 +152,6 @@ def create_dme_filtered_data_job(training_config_obj, source_name, db_org, bucket_name,
     logger.debug(url)
     logger.debug(json.dumps(job_json))
     response = requests.put(url, data=json.dumps(job_json), headers=headers)
-
     return response
 
 def delete_dme_filtered_data_job(training_config_obj, feature_group_name, host, port):
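The rewritten payload switches the job from the influx-writer info type to the PmData type, with a measTypeSpecs filter and Kafka delivery instead of InfluxDB credentials. Rendered with the fixture values used in the tests (a sketch for illustration, not defaults), the PUT body looks like this:

import json

job_json = {
    "info_type_id": "PmData",
    "job_owner": "console",
    "job_definition": {
        "filter": {
            "sourceNames": [""],
            "measObjInstIds": [],
            "measTypeSpecs": [{
                "measuredObjClass": "NRCellDU",
                "measTypes": ["pdcpBytesDl", "pdcpBytesUl"],
            }],
            "measuredEntityDns": [],
        },
        "deliveryInfo": {
            "topic": "pmreports",
            "bootStrapServers": "kafka-1-kafka-bootstrap.nonrtric:9097",
        },
    },
}
print(json.dumps(job_json, indent=2))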
trainingmgr/common/trainingmgr_util.py
index b8b31cf..37e6153 100644
@@ -167,7 +167,7 @@ def check_feature_group_data(json_data):
     try:
         if check_key_in_dictionary(["featureGroupName", "feature_list", \
                                     "datalake_source", "enable_Dme", "DmeHost", 
-                                    "DmePort", "bucket", "token", "source_name"], json_data):
+                                    "DmePort", "bucket", "token", "source_name", "measured_obj_class"], json_data):
             feature_group_name=json_data["featureGroupName"]
             features=json_data["feature_list"]
             datalake_source=json_data["datalake_source"]
@@ -178,6 +178,7 @@ def check_feature_group_data(json_data):
             token=json_data["token"]
             source_name=json_data["source_name"]
             db_org=json_data["dbOrg"]
+            measured_obj_class = json_data["measured_obj_class"]
             
         else :
             raise TMException("check_featuregroup_data- supplied data doesn't have" + \
@@ -186,7 +187,7 @@ def check_feature_group_data(json_data):
     except Exception as err:
         raise APIException(status.HTTP_400_BAD_REQUEST, str(err)) from None
     
-    return (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name,db_org)
+    return (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name, db_org, measured_obj_class)
 
 def get_one_key(dictionary):
     '''
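A quick usage sketch of the extended helper, using the key set it now checks for. Note that dbOrg is read even though it is not in the checked key list, so a payload missing dbOrg still fails, via the generic except clause that converts the KeyError into a 400 APIException:

from trainingmgr.common.trainingmgr_util import check_feature_group_data

json_data = {
    "featureGroupName": "test",
    "feature_list": "pdcpBytesDl,pdcpBytesUl",
    "datalake_source": "InfluxSource",
    "enable_Dme": True,
    "DmeHost": "127.0.0.1",
    "DmePort": "31823",
    "bucket": "pm-bucket",
    "token": "",
    "source_name": "",
    "dbOrg": "",
    "measured_obj_class": "NRCellDU",
}

# Returns the new 11-value tuple, measured_obj_class last.
(feature_group_name, features, datalake_source, enable_dme, dme_host,
 dme_port, bucket, token, source_name, db_org,
 measured_obj_class) = check_feature_group_data(json_data)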
trainingmgr/db/common_db_fun.py
index 8c906b7..9248158 100644
@@ -558,7 +558,7 @@ def add_update_trainingjob(description, pipeline_name, experiment_name, feature_
         if conn is not None:
             conn.close()
 
-def add_featuregroup(feature_group_name, feature_list, datalake_source, enable_dme, ps_db_obj , dmehost="", dmeport="", bucket="", token="", source_name="",db_org=""):
+def add_featuregroup(feature_group_name, feature_list, datalake_source, enable_dme, ps_db_obj, measured_obj_class="", dmehost="", dmeport="", bucket="", token="", source_name="", db_org=""):
     """
     This function add the new row or update existing row with given information
     """
@@ -569,8 +569,8 @@ def add_featuregroup(feature_group_name, feature_list, datalake_source, enable_d
     
     try:
         cursor.execute(''' INSERT INTO {} VALUES '''.format(fg_table_name) +
-                       '''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
-                       (feature_group_name, feature_list, datalake_source, enable_dme, dmehost, dmeport, bucket, token, source_name, db_org))
+                       '''(%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s)''',
+                       (feature_group_name, feature_list, datalake_source, enable_dme, dmehost, measured_obj_class, dmeport, bucket, token, source_name, db_org))
         conn.commit()
         cursor.close()
     except Exception as err:
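The positional INSERT depends on the column order declared in trainingmgr_ps_db.py (next diff): measured_obj_class sits between DmeHost and DmePort in both the VALUES tuple and the CREATE TABLE. A small sanity sketch; the first two and the last column names are inferred, since they are not visible in this diff:

# Inferred table column order vs. the 11-value tuple passed to execute().
columns = ["feature_group_name", "features", "datalake_source", "enable_dme",
           "DmeHost", "measured_obj_class", "DmePort", "bucket", "token",
           "source_name", "dbOrg"]
values = ("grp1", "pdcpBytesDl,pdcpBytesUl", "InfluxSource", True,
          "127.0.0.1", "NRCellDU", "31823", "pm-bucket", "", "", "")
assert len(columns) == len(values) == 11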
trainingmgr/db/trainingmgr_ps_db.py
index 5e3d2f6..deb6ccd 100644
@@ -131,6 +131,7 @@ class PSDB():
                         "datalake_source varchar(2000) NOT NULL," + \
                         "enable_dme BOOLEAN NOT NULL," + \
                         "DmeHost varchar(128) NOT NULL," + \
+                        "measured_obj_class varchar(128) NOT NULL," + \
                         "DmePort varchar(128) NOT NULL," + \
                         "bucket varchar(128) NOT NULL," + \
                         "token varchar(2000) NOT NULL," + \
trainingmgr/trainingmgr_main.py
index 4215727..749819f 100644
@@ -1320,7 +1320,7 @@ def create_feature_group():
 
     try:
         json_data=request.json
-        (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name,db_org)=check_feature_group_data(json_data)
+        (feature_group_name, features, datalake_source, enable_dme, dme_host, dme_port, bucket, token, source_name, db_org, measured_obj_class)=check_feature_group_data(json_data)
         # check the data conformance
         LOGGER.debug("the db info is : ", get_feature_group_by_name_db(PS_DB_OBJ, feature_group_name))
 
@@ -1332,9 +1332,9 @@ def create_feature_group():
         else:
             # the features are stored in string format in the db, and has to be passed as list of feature to the dme. Hence the conversion.
             features_list = features.split(",")
-            add_featuregroup(feature_group_name, features, datalake_source, enable_dme, PS_DB_OBJ,dme_host, dme_port, bucket, token, source_name,db_org )
+            add_featuregroup(feature_group_name, features, datalake_source, enable_dme, PS_DB_OBJ, measured_obj_class, dme_host, dme_port, bucket, token, source_name, db_org)
             if enable_dme == True :
-                response= create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, db_org, bucket,  token, features_list, feature_group_name,  dme_host, dme_port)
+                response = create_dme_filtered_data_job(TRAININGMGR_CONFIG_OBJ, source_name, features_list, feature_group_name, dme_host, dme_port, measured_obj_class)
                 if response.status_code != 201:
                     api_response={"Exception": "Cannot create dme job"}
                     delete_feature_group_by_name(PS_DB_OBJ, feature_group_name)
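End to end, a client creating a feature group must now send measured_obj_class; when enable_Dme is true it is forwarded into the DME job, and the feature group is rolled back if the DME rejects the job. A hedged example request; the route path and port are assumptions, as they are not visible in this diff:

import requests

payload = {
    "featureGroupName": "grp1",
    "feature_list": "pdcpBytesDl,pdcpBytesUl",
    "datalake_source": "InfluxSource",
    "enable_Dme": True,
    "DmeHost": "127.0.0.1",
    "DmePort": "31823",
    "bucket": "pm-bucket",
    "token": "",
    "source_name": "",
    "dbOrg": "",
    "measured_obj_class": "NRCellDU",
}
# Hypothetical endpoint URL for illustration only.
resp = requests.post("http://localhost:32002/featureGroup", json=payload)
print(resp.status_code, resp.json())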