From 99440990746b7669e45ae3940e072d67f153b053 Mon Sep 17 00:00:00 2001 From: rajdeep11 Date: Sat, 7 Dec 2024 13:25:17 +0530 Subject: [PATCH] changes for the new pipeline, modelName and modelVersion and artifactVersion for modelStorage and featurepath for getting features Change-Id: I4a0c7027eca8dfcdb7a4cf2c6c0e500d7007edb7 Signed-off-by: rajdeep11 --- kfadapter/kfadapter_kfconnect.py | 11 ++++++++--- kfadapter/kfadapter_main.py | 16 ++++++++++------ kfadapter/kfadapter_util.py | 2 +- test/test_kfadapter_kfconnect.py | 7 +++++-- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/kfadapter/kfadapter_kfconnect.py b/kfadapter/kfadapter_kfconnect.py index 7cee9e2..d57c8fe 100644 --- a/kfadapter/kfadapter_kfconnect.py +++ b/kfadapter/kfadapter_kfconnect.py @@ -263,12 +263,17 @@ class KfConnect: """ self.logger.debug("run_kf_pipeline Entered") - job_name=arguments["trainingjob_name"] + job_id=arguments["trainingjob_id"] + featuregroup_name = arguments["featuregroup_name"] + featurepath = featuregroup_name+"_"+job_id req_dict={ - "trainingjob_name":job_name, + "featurepath":featurepath, "epochs": arguments["epochs"], - "version": str(arguments["version"]) + "modelname": str(arguments["modelName"]), + "modelversion": str(arguments["modelVersion"]), + "artifactversion":str(arguments["artifactVersion"]) } + self.logger.debug("run_kf_pipeline Arguments: "+str(req_dict)) try: run = self.kfp_client.run_pipeline(exp_id, job_name="testjob"+\ random_suffix(), diff --git a/kfadapter/kfadapter_main.py b/kfadapter/kfadapter_main.py index d3d5913..2e91ded 100644 --- a/kfadapter/kfadapter_main.py +++ b/kfadapter/kfadapter_main.py @@ -328,8 +328,8 @@ def list_pipelines(): return jsonify(pipe_dict), status.HTTP_200_OK -@APP.route('/trainingjobs//execution', methods=['POST']) -def run_pipeline(trainingjob_name): +@APP.route('/trainingjobs//execution', methods=['POST']) +def run_pipeline(trainingjob_id): """Function handling HTTP POST rest endpoint to execute pipeline based on trainingjob name Args: @@ -351,7 +351,7 @@ def run_pipeline(trainingjob_name): """ errcode = None err_string = None - LOGGER.debug("run_pipeline for %s", trainingjob_name) + LOGGER.debug("run_pipeline for %s", trainingjob_id) run_dict = {} try: errcode = status.HTTP_400_BAD_REQUEST @@ -359,7 +359,7 @@ def run_pipeline(trainingjob_name): req = request.json LOGGER.debug(req) if("arguments" in req.keys() and "pipeline_name" in req.keys() and \ - "experiment_name" in req.keys()): + "experiment_name" in req.keys() and "pipeline_version" in req.keys()): arguments = req["arguments"] pipe_name = req["pipeline_name"] exp_name = req["experiment_name"] @@ -380,10 +380,14 @@ def run_pipeline(trainingjob_name): LOGGER.debug("Pipeline ID = " + pipe_id) version_id = KFCONNECT_KF_OBJ.get_kf_pipeline_version_id(pipe_id, pipeline_version_name) + + LOGGER.debug("version id is: "+ version_id) LOGGER.debug("Running pipeline") + run = KFCONNECT_KF_OBJ.run_kf_pipeline(exp.experiment_id, pipe_id, arguments, version_id) + LOGGER.debug("Run ID = %s", run.run_id) - run_dict['trainingjob_name'] = trainingjob_name + run_dict['trainingjob_id'] = trainingjob_id run_dict['run_id'] = run.run_id run_dict['run_name'] = run.display_name run_dict['experiment_name'] = 'Default' @@ -394,7 +398,7 @@ def run_pipeline(trainingjob_name): if run.state == 'PENDING': run_dict['run_status'] = "scheduled" with kfadapter_conf.LOCK: - kfadapter_conf.TRAINING_DICT[run.run_id] = trainingjob_name + kfadapter_conf.TRAINING_DICT[run.run_id] = trainingjob_id else: errcode = status.HTTP_400_BAD_REQUEST err_string = 'Less arguments' diff --git a/kfadapter/kfadapter_util.py b/kfadapter/kfadapter_util.py index 6894de6..55711ac 100644 --- a/kfadapter/kfadapter_util.py +++ b/kfadapter/kfadapter_util.py @@ -119,7 +119,7 @@ def wait_status_thread(name, kfc_kfconnect): run_dict = {} run_dict['run_id'] = i run_dict['run_status'] = run_status - run_dict['trainingjob_name'] = kfadapter_conf.TRAINING_DICT[i] + run_dict['trainingjob_id'] = kfadapter_conf.TRAINING_DICT[i] logger.info("POSTING to training manager") logger.info(run_dict) payload = json.dumps(run_dict) diff --git a/test/test_kfadapter_kfconnect.py b/test/test_kfadapter_kfconnect.py index 6955805..74c0946 100644 --- a/test/test_kfadapter_kfconnect.py +++ b/test/test_kfadapter_kfconnect.py @@ -91,9 +91,12 @@ class Test_KfConnect: def test_run_kf_pipeline(self): assert None == self.__KFCONNECT.run_kf_pipeline(exp_id='exp_id', pipeline_id='pipeline_id', arguments={ - "trainingjob_name":"job_name", + "trainingjob_id":"job_name", "epochs": "epochs", - "version": "version"} + "modelName": "name", + "modelVersion":"version", + "artifactVersion":"version", + "featuregroup_name":"fg_name"} , version_id='version_id') -- 2.16.6