From c9ea468113c6ca9fb0fe1dffcb19f023dccffb74 Mon Sep 17 00:00:00 2001 From: Monosij Ghosh Date: Thu, 12 Dec 2024 19:09:08 +0530 Subject: [PATCH] TM module training job fixes fixes for training job testing Change-Id: If18610d023738e3a300967767d65767adb7c6327 Signed-off-by: Monosij Ghosh --- request.http | 83 +++++++++++++++--------- trainingmgr/controller/trainingjob_controller.py | 5 +- trainingmgr/handler/async_handler.py | 3 +- trainingmgr/pipeline/mme_mgr.py | 4 +- trainingmgr/schemas/trainingjob_schema.py | 11 +--- trainingmgr/trainingmgr_main.py | 6 +- 6 files changed, 59 insertions(+), 53 deletions(-) diff --git a/request.http b/request.http index 65ba600..ca5837e 100644 --- a/request.http +++ b/request.http @@ -22,6 +22,7 @@ @trainingjob_name = testing_influxdb_101 # @trainingjob_name = test_influx_1 @trainingjob_version = 1 + ### TM: API for training POST http://{{base}}/trainingjobs/{{trainingjob_name}}/training content-type: application/json @@ -41,6 +42,7 @@ content-type: application/json ### TM: Get all the training job GET http://{{base}}/trainingjobs/latest Content-Type: application/json + ### TM: get status GET http://{{base}}/task_status/{{trainingjob_name}} Content-Type: application/json @@ -53,53 +55,61 @@ POST http://{{base}}/featureGroup Content-Type: application/json { - "featureGroupName":"sample_fg_02", - "feature_list":"pdcpBytesDl,pdcpBytesUl", - "datalake_source":"InfluxSource", - "enable_Dme":false, - "Host":"my-release-influxdb.traininghost", - "Port":"8086", - "dmePort":"", - "bucket":"UEData", - "token":"########", - "source_name":"", - "measured_obj_class":"", - "_measurement":"liveCell", - "dbOrg":"primary" + "featuregroup_name": "testing_influxdb_01", + "feature_list": "pdcpBytesDl,pdcpBytesUl", + "datalake_source": "InfluxSource", + "enable_dme": false, + "host": "my-release-influxdb.default", + "port": "8086", + "dme_port": "", + "bucket": "UEData", + "token": "G4xTett9YjSD9m8ShuAF", + "source_name": "", + "measured_obj_class": "", + "measurement": "liveCell", + "db_org": "primary" } -### TM: Get feature group + +### TM: Get all feature groups GET http://{{base}}/featureGroup Content-Type: application/json ### TM: Get training jobs -GET http://{{base}}/trainingjobs/latest +GET http://{{base}}/training-jobs/latest Content-Type: application/json ### TM: Get training jobs GET http://{{base}}/trainingjobs/{{trainingjob_name}}/{{trainingjob_version}} Content-Type: application/json - ### TM: Create training job -POST http://{{base}}/trainingjobs/test_influx_2 +POST http://{{base}}/training-jobs Content-Type: application/json { - "trainingjob_name":"{{trainingjob_name}}", - "is_mme":false, - "model_name":"", - "pipeline_name":"qoe_Pipeline", - "experiment_name":"Default", - "featureGroup_name":"sample_fg_01", - "query_filter":"", - "arguments":{ - "epochs":"1", - "trainingjob_name":"{{trainingjob_name}}" + "modelId":{ + "modelname": "model7", + "modelversion": "1" + }, + "model_location": "", + "training_config": { + "description": "trainingjob for testing", + "dataPipeline": { + "feature_group_name": "testing_influxdb_01", + "query_filter": "", + "arguments": "{'epochs': 1, 'trainingjob_name': 'trainingjob1'}" }, - "enable_versioning":false, - "description":"test", - "pipeline_version":"qoe_Pipeline", - "datalake_source":"InfluxSource" + "trainingPipeline": { + "pipeline_name": "qoe_Pipeline11", + "pipeline_version": "qoe_Pipeline11", + "enable_versioning": false + } + }, + "training_dataset": "", + "validation_dataset": "", + "notification_url": "", + "consumer_rapp_id": "", + "producer_rapp_id": "" } ### tm: get pipelines @@ -136,4 +146,13 @@ Content-Type: application/json "db_org": "primary", "source_name": "" } -} \ No newline at end of file +} + +### Start training by training job ID +POST http://{{base}}/training-jobs/12/training +Content-Type: application/json + + +### Get training job status by ID +GET http://{{base}}/training-jobs/12/status +Content-Type: application/json diff --git a/trainingmgr/controller/trainingjob_controller.py b/trainingmgr/controller/trainingjob_controller.py index a844609..17ae8e8 100644 --- a/trainingmgr/controller/trainingjob_controller.py +++ b/trainingmgr/controller/trainingjob_controller.py @@ -177,10 +177,7 @@ def training(training_job_id): LOGGER.debug("Request for training trainingjob %s ", training_job_id) try: trainingjob = get_training_job(training_job_id) - print(trainingjob) - trainingjob_name = trainingjob.trainingjob_name featuregroup= get_featuregroup_by_name(getField(trainingjob.training_config, "feature_group_name")) - print("featuregroup name is: ",featuregroup.featuregroup_name) feature_list_string = featuregroup.feature_list influxdb_info_dic={} influxdb_info_dic["host"]=featuregroup.host @@ -198,7 +195,7 @@ def training(training_job_id): _measurement, influxdb_info_dic, featuregroup.featuregroup_name) if (de_response.status_code == status.HTTP_200_OK ): LOGGER.debug("Response from data extraction for " + \ - trainingjob_name + " : " + json.dumps(de_response.json())) + str(trainingjob.id) + " : " + json.dumps(de_response.json())) change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION.name, States.IN_PROGRESS.name) diff --git a/trainingmgr/handler/async_handler.py b/trainingmgr/handler/async_handler.py index d9d159e..63e3015 100644 --- a/trainingmgr/handler/async_handler.py +++ b/trainingmgr/handler/async_handler.py @@ -55,7 +55,6 @@ def check_and_notify_feature_engineering_status(APP,db): change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION.name, States.FINISHED.name) change_status_tj(trainingjob.id, Steps.DATA_EXTRACTION_AND_TRAINING.name, States.IN_PROGRESS.name) - kf_response = requests.post( url_pipeline_run, data=json.dumps({"trainingjob_id": trainingjob.id}), @@ -72,7 +71,7 @@ def check_and_notify_feature_engineering_status(APP,db): except Exception as err: LOGGER.error(f"Error processing DATAEXTRACTION_JOBS_CACHE: {str(err)}") with APP.app_context(): - change_state_to_failed(trainingjob.id) + change_state_to_failed(trainingjob) # notification_rapp(trainingjob.id) with LOCK: try: diff --git a/trainingmgr/pipeline/mme_mgr.py b/trainingmgr/pipeline/mme_mgr.py index e1c81af..076cd7b 100644 --- a/trainingmgr/pipeline/mme_mgr.py +++ b/trainingmgr/pipeline/mme_mgr.py @@ -54,7 +54,7 @@ class MmeMgr: This function returns the model information for given modelName and ModelVersion from MME """ try: - url = f'http://{self.mme_ip}:{self.mme_port}/getModelInfo/?modelName={modelName}&modelVersion={modelVersion}' + url = f'http://{self.mme_ip}:{self.mme_port}/models/?model-name={modelName}&model-version={int(modelVersion)}' LOGGER.debug(f"Requesting modelInfo from: {url}") response = requests.get(url) if response.status_code == 200: @@ -64,7 +64,7 @@ class MmeMgr: LOGGER.debug(f"ModelName = {modelName}, ModelVersion = {modelVersion} is not registered on MME") return None else: - err_msg = f"Unexpected response from KFAdapter: {response.status_code}" + err_msg = f"Unexpected response from mme: {response.status_code}" LOGGER.error(err_msg) raise TMException(err_msg) diff --git a/trainingmgr/schemas/trainingjob_schema.py b/trainingmgr/schemas/trainingjob_schema.py index 549b7c5..c297b1e 100644 --- a/trainingmgr/schemas/trainingjob_schema.py +++ b/trainingmgr/schemas/trainingjob_schema.py @@ -34,19 +34,10 @@ class TrainingJobSchema(ma.SQLAlchemyAutoSchema): class Meta: model = TrainingJob load_instance = True - exclude = ("creation_time", "deletion_in_progress", "version", "updation_time","run_id") + exclude = ("creation_time", "deletion_in_progress", "updation_time","run_id") modelId = ma.Nested(ModelSchema) - @validates("trainingjob_name") - def validate_trainingjob_name(self, value): - - if not (3<= len(value) <=50): - raise ValidationError("Training job name length must be between 3 and 50 characters") - - if not PATTERN.fullmatch(value): - raise ValidationError("Training job name must be alphanumeric and underscore only.") - @pre_load def processModelId(self, data, **kwargs): modelname = data['modelId']['modelname'] diff --git a/trainingmgr/trainingmgr_main.py b/trainingmgr/trainingmgr_main.py index 2cd2343..7474876 100644 --- a/trainingmgr/trainingmgr_main.py +++ b/trainingmgr/trainingmgr_main.py @@ -185,15 +185,15 @@ def data_extraction_notification(): response = training_start(TRAININGMGR_CONFIG_OBJ, training_details, trainingjob_id) if ( response.headers['content-type'] != MIMETYPE_JSON or response.status_code != status.HTTP_200_OK ): - err_msg = "Kf adapter invalid content-type or status_code for " + trainingjob_id + err_msg = "Kf adapter invalid content-type or status_code for " + str(trainingjob_id) raise TMException(err_msg) LOGGER.debug("response from kf_adapter for " + \ - trainingjob_id + " : " + json.dumps(response.json())) + str(trainingjob_id) + " : " + json.dumps(response.json())) json_data = response.json() if not check_key_in_dictionary(["run_status", "run_id"], json_data): - err_msg = "Kf adapter invalid response from , key not present ,run_status or run_id for " + trainingjob_id + err_msg = "Kf adapter invalid response from , key not present ,run_status or run_id for " + str(trainingjob_id) Logger.error(err_msg) err_response_code = status.HTTP_400_BAD_REQUEST raise TMException(err_msg) -- 2.16.6