TM module training job fixes 57/13857/3
authorMonosij Ghosh <mono.ghosh@samsung.com>
Thu, 12 Dec 2024 13:39:08 +0000 (19:09 +0530)
committerMonosij Ghosh <mono.ghosh@samsung.com>
Thu, 12 Dec 2024 19:40:42 +0000 (01:10 +0530)
Fixes for training job testing: ID-based endpoints, request samples, and MME model lookup

Change-Id: If18610d023738e3a300967767d65767adb7c6327
Signed-off-by: Monosij Ghosh <mono.ghosh@samsung.com>
request.http
trainingmgr/controller/trainingjob_controller.py
trainingmgr/handler/async_handler.py
trainingmgr/pipeline/mme_mgr.py
trainingmgr/schemas/trainingjob_schema.py
trainingmgr/trainingmgr_main.py

diff --git a/request.http b/request.http
index 65ba600..ca5837e 100644 (file)
@@ -22,6 +22,7 @@
 @trainingjob_name = testing_influxdb_101
 # @trainingjob_name = test_influx_1
 @trainingjob_version = 1
+
 ### TM: API for training
 POST http://{{base}}/trainingjobs/{{trainingjob_name}}/training
 content-type: application/json
@@ -41,6 +42,7 @@ content-type: application/json
 ### TM: Get all the training jobs
 GET http://{{base}}/trainingjobs/latest
 Content-Type: application/json
+
 ### TM: get status
 GET http://{{base}}/task_status/{{trainingjob_name}}
 Content-Type: application/json
@@ -53,53 +55,61 @@ POST http://{{base}}/featureGroup
 Content-Type: application/json
 
 {
-    "featureGroupName":"sample_fg_02",
-    "feature_list":"pdcpBytesDl,pdcpBytesUl",
-    "datalake_source":"InfluxSource",
-    "enable_Dme":false,
-    "Host":"my-release-influxdb.traininghost",
-    "Port":"8086",
-    "dmePort":"",
-    "bucket":"UEData",
-    "token":"########",
-    "source_name":"",
-    "measured_obj_class":"",
-    "_measurement":"liveCell",
-    "dbOrg":"primary"
+    "featuregroup_name": "testing_influxdb_01",
+    "feature_list": "pdcpBytesDl,pdcpBytesUl",
+    "datalake_source": "InfluxSource",
+    "enable_dme": false,
+    "host": "my-release-influxdb.default",
+    "port": "8086",
+    "dme_port": "",
+    "bucket": "UEData",
+    "token": "G4xTett9YjSD9m8ShuAF",
+    "source_name": "",
+    "measured_obj_class": "",
+    "measurement": "liveCell",
+    "db_org": "primary"
 }
-### TM: Get feature group
+
+### TM: Get all feature groups
 GET http://{{base}}/featureGroup
 Content-Type: application/json
 
 ### TM: Get training jobs
-GET http://{{base}}/trainingjobs/latest
+GET http://{{base}}/training-jobs/latest
 Content-Type: application/json
 
 ### TM: Get training job by name and version
 GET http://{{base}}/trainingjobs/{{trainingjob_name}}/{{trainingjob_version}}
 Content-Type: application/json
 
-
 ### TM: Create training job
-POST http://{{base}}/trainingjobs/test_influx_2
+POST http://{{base}}/training-jobs
 Content-Type: application/json
 
 {
-    "trainingjob_name":"{{trainingjob_name}}",
-    "is_mme":false,
-    "model_name":"",
-    "pipeline_name":"qoe_Pipeline",
-    "experiment_name":"Default",
-    "featureGroup_name":"sample_fg_01",
-    "query_filter":"",
-    "arguments":{
-        "epochs":"1",
-        "trainingjob_name":"{{trainingjob_name}}"
+    "modelId":{
+        "modelname": "model7",
+        "modelversion": "1"
+    },
+    "model_location": "",
+    "training_config": {
+        "description": "trainingjob for testing",
+        "dataPipeline": {
+            "feature_group_name": "testing_influxdb_01",
+            "query_filter": "",
+            "arguments": "{'epochs': 1, 'trainingjob_name': 'trainingjob1'}"
         },
-    "enable_versioning":false,
-    "description":"test",
-    "pipeline_version":"qoe_Pipeline",
-    "datalake_source":"InfluxSource"
+        "trainingPipeline": {
+            "pipeline_name": "qoe_Pipeline11",
+            "pipeline_version": "qoe_Pipeline11",
+            "enable_versioning": false
+        }
+    },
+    "training_dataset": "",
+    "validation_dataset": "",
+    "notification_url": "",
+    "consumer_rapp_id": "",
+    "producer_rapp_id": ""
 }
 
 ### TM: Get pipelines
@@ -136,4 +146,13 @@ Content-Type: application/json
         "db_org": "primary",
         "source_name": ""
     }
-}
\ No newline at end of file
+}
+
+### Start training by training job ID
+POST http://{{base}}/training-jobs/12/training
+Content-Type: application/json
+
+
+### Get training job status by ID
+GET http://{{base}}/training-jobs/12/status
+Content-Type: application/json
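
### The ID-based endpoints exercised above can also be driven end to end from Python.
### A minimal sketch, assuming the create call returns the new job's numeric id in an
### "id" field of its JSON body, and that BASE is a placeholder for the {{base}} value
### used in this file:
#
#   import requests
#
#   BASE = "http://localhost:32002"  # assumption: substitute the {{base}} host:port
#
#   payload = {
#       "modelId": {"modelname": "model7", "modelversion": "1"},
#       "model_location": "",
#       "training_config": {
#           "description": "trainingjob for testing",
#           "dataPipeline": {
#               "feature_group_name": "testing_influxdb_01",
#               "query_filter": "",
#               "arguments": "{'epochs': 1, 'trainingjob_name': 'trainingjob1'}"
#           },
#           "trainingPipeline": {
#               "pipeline_name": "qoe_Pipeline11",
#               "pipeline_version": "qoe_Pipeline11",
#               "enable_versioning": False
#           }
#       },
#       "training_dataset": "",
#       "validation_dataset": "",
#       "notification_url": "",
#       "consumer_rapp_id": "",
#       "producer_rapp_id": ""
#   }
#
#   # Create the job, then start training and poll status by its numeric id.
#   resp = requests.post(f"{BASE}/training-jobs", json=payload)
#   resp.raise_for_status()
#   job_id = resp.json()["id"]  # assumption: the create response echoes the new id
#
#   requests.post(f"{BASE}/training-jobs/{job_id}/training")
#   print(requests.get(f"{BASE}/training-jobs/{job_id}/status").json())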
diff --git a/trainingmgr/controller/trainingjob_controller.py b/trainingmgr/controller/trainingjob_controller.py
index a844609..17ae8e8 100644 (file)
@@ -177,10 +177,7 @@ def training(training_job_id):
     LOGGER.debug("Request for training trainingjob  %s ", training_job_id)
     try:
         trainingjob = get_training_job(training_job_id)
-        print(trainingjob)
-        trainingjob_name = trainingjob.trainingjob_name
         featuregroup= get_featuregroup_by_name(getField(trainingjob.training_config, "feature_group_name"))
-        print("featuregroup name is: ",featuregroup.featuregroup_name)
         feature_list_string = featuregroup.feature_list
         influxdb_info_dic={}
         influxdb_info_dic["host"]=featuregroup.host
@@ -198,7 +195,7 @@ def training(training_job_id):
                                         _measurement, influxdb_info_dic, featuregroup.featuregroup_name)
         if (de_response.status_code == status.HTTP_200_OK ):
             LOGGER.debug("Response from data extraction for " + \
-                    trainingjob_name + " : " + json.dumps(de_response.json()))
+                    str(trainingjob.id) + " : " + json.dumps(de_response.json()))
             change_status_tj(trainingjob.id,
                                 Steps.DATA_EXTRACTION.name,
                                 States.IN_PROGRESS.name)
diff --git a/trainingmgr/handler/async_handler.py b/trainingmgr/handler/async_handler.py
index d9d159e..63e3015 100644 (file)
@@ -55,7 +55,6 @@ def check_and_notify_feature_engineering_status(APP,db):
                         
                         change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION.name, States.FINISHED.name)
                         change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION_AND_TRAINING.name, States.IN_PROGRESS.name)
-
                     kf_response = requests.post(
                         url_pipeline_run,
                         data=json.dumps({"trainingjob_id": trainingjob.id}),
@@ -72,7 +71,7 @@ def check_and_notify_feature_engineering_status(APP,db):
             except Exception as err:
                 LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE: {str(err)}")
                 with APP.app_context():
-                    change_state_to_failed(trainingjob.id)
+                    change_state_to_failed(trainingjob)
                     # notification_rapp(trainingjob.id)
                     with LOCK:
                         try:
diff --git a/trainingmgr/pipeline/mme_mgr.py b/trainingmgr/pipeline/mme_mgr.py
index e1c81af..076cd7b 100644 (file)
@@ -54,7 +54,7 @@ class MmeMgr:
             This function returns the model information for the given modelName and modelVersion from the MME
         """
         try:
-            url = f'http://{self.mme_ip}:{self.mme_port}/getModelInfo/?modelName={modelName}&modelVersion={modelVersion}'
+            url = f'http://{self.mme_ip}:{self.mme_port}/models/?model-name={modelName}&model-version={int(modelVersion)}'
             LOGGER.debug(f"Requesting modelInfo from: {url}")
             response = requests.get(url)
             if response.status_code == 200:
@@ -64,7 +64,7 @@ class MmeMgr:
                 LOGGER.debug(f"ModelName = {modelName}, ModelVersion = {modelVersion} is not registered on MME")
                 return None                
             else:
-                err_msg = f"Unexpected response from KFAdapter: {response.status_code}"
+                err_msg = f"Unexpected response from MME: {response.status_code}"
                 LOGGER.error(err_msg)
                 raise TMException(err_msg)
 
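 # The reworked MME lookup above can be exercised on its own. A minimal sketch, assuming
 # the MME host and port below (both placeholders, not values from this change) and that
 # the endpoint returns JSON on 200 and 404 when the model is not registered:
 #
 #   import requests
 #
 #   MME_IP = "modelmgmtservice"   # assumption: MME service host
 #   MME_PORT = 8082               # assumption: MME service port
 #
 #   def get_model_info(model_name, model_version):
 #       # Same query-parameter style as the updated URL in the diff above.
 #       url = f"http://{MME_IP}:{MME_PORT}/models/"
 #       response = requests.get(url, params={"model-name": model_name,
 #                                            "model-version": int(model_version)})
 #       if response.status_code == 200:
 #           return response.json()
 #       if response.status_code == 404:
 #           return None  # model/version not registered on the MME
 #       raise RuntimeError(f"Unexpected response from MME: {response.status_code}")
 #
 #   print(get_model_info("model7", "1"))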
diff --git a/trainingmgr/schemas/trainingjob_schema.py b/trainingmgr/schemas/trainingjob_schema.py
index 549b7c5..c297b1e 100644 (file)
@@ -34,19 +34,10 @@ class TrainingJobSchema(ma.SQLAlchemyAutoSchema):
     class Meta:
         model = TrainingJob
         load_instance = True
-        exclude = ("creation_time", "deletion_in_progress", "version", "updation_time","run_id")
+        exclude = ("creation_time", "deletion_in_progress", "updation_time","run_id")
     
     modelId = ma.Nested(ModelSchema)
 
-    @validates("trainingjob_name")
-    def validate_trainingjob_name(self, value):
-
-        if not (3<= len(value) <=50):
-            raise ValidationError("Training job name length must be between 3 and 50 characters")
-        
-        if not PATTERN.fullmatch(value):
-            raise ValidationError("Training job name must be alphanumeric and underscore only.")
-    
     @pre_load
     def processModelId(self, data, **kwargs):
         modelname = data['modelId']['modelname']
diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py
index 2cd2343..7474876 100644 (file)
@@ -185,15 +185,15 @@ def data_extraction_notification():
         response = training_start(TRAININGMGR_CONFIG_OBJ, training_details, trainingjob_id)
         if ( response.headers['content-type'] != MIMETYPE_JSON 
                 or response.status_code != status.HTTP_200_OK ):
-            err_msg = "Kf adapter invalid content-type or status_code for " + trainingjob_id
+            err_msg = "Kf adapter invalid content-type or status_code for " + str(trainingjob_id)
             raise TMException(err_msg)
         
         LOGGER.debug("response from kf_adapter for " + \
-                    trainingjob_id + " : " + json.dumps(response.json()))
+                    str(trainingjob_id) + " : " + json.dumps(response.json()))
         json_data = response.json()
         
         if not check_key_in_dictionary(["run_status", "run_id"], json_data):
-            err_msg = "Kf adapter invalid response from , key not present ,run_status or  run_id for " + trainingjob_id
+            err_msg = "Kf adapter invalid response, key not present: run_status or run_id for " + str(trainingjob_id)
             LOGGER.error(err_msg)
             err_response_code = status.HTTP_400_BAD_REQUEST
             raise TMException(err_msg)