changes for the new pipeline, modelName and modelVersion and 26/13826/4
authorrajdeep11 <rajdeep.sin@samsung.com>
Sat, 7 Dec 2024 07:55:17 +0000 (13:25 +0530)
committerrajdeep11 <rajdeep.sin@samsung.com>
Sat, 7 Dec 2024 08:47:55 +0000 (14:17 +0530)
artifactVersion for modelStorage and featurepath for getting features

Change-Id: I4a0c7027eca8dfcdb7a4cf2c6c0e500d7007edb7
Signed-off-by: rajdeep11 <rajdeep.sin@samsung.com>
kfadapter/kfadapter_kfconnect.py
kfadapter/kfadapter_main.py
kfadapter/kfadapter_util.py
test/test_kfadapter_kfconnect.py

index 7cee9e2..d57c8fe 100644 (file)
@@ -263,12 +263,17 @@ class KfConnect:
 
         """
         self.logger.debug("run_kf_pipeline Entered")
-        job_name=arguments["trainingjob_name"]
+        job_id=arguments["trainingjob_id"]
+        featuregroup_name = arguments["featuregroup_name"]
+        featurepath = featuregroup_name+"_"+job_id
         req_dict={
-            "trainingjob_name":job_name,
+            "featurepath":featurepath,
             "epochs": arguments["epochs"],
-            "version": str(arguments["version"])
+            "modelname": str(arguments["modelName"]),
+            "modelversion": str(arguments["modelVersion"]), 
+            "artifactversion":str(arguments["artifactVersion"])
         }
+        self.logger.debug("run_kf_pipeline Arguments: "+str(req_dict))
         try:
             run = self.kfp_client.run_pipeline(exp_id, job_name="testjob"+\
                                            random_suffix(),
index d3d5913..2e91ded 100644 (file)
@@ -328,8 +328,8 @@ def list_pipelines():
     return jsonify(pipe_dict), status.HTTP_200_OK
 
 
-@APP.route('/trainingjobs/<trainingjob_name>/execution', methods=['POST'])
-def run_pipeline(trainingjob_name):
+@APP.route('/trainingjobs/<trainingjob_id>/execution', methods=['POST'])
+def run_pipeline(trainingjob_id):
     """Function handling HTTP POST rest endpoint to execute pipeline based on trainingjob name
 
     Args:
@@ -351,7 +351,7 @@ def run_pipeline(trainingjob_name):
     """
     errcode = None
     err_string = None
-    LOGGER.debug("run_pipeline for %s", trainingjob_name)
+    LOGGER.debug("run_pipeline for %s", trainingjob_id)
     run_dict = {}
     try:
         errcode = status.HTTP_400_BAD_REQUEST
@@ -359,7 +359,7 @@ def run_pipeline(trainingjob_name):
         req = request.json
         LOGGER.debug(req)
         if("arguments" in req.keys() and "pipeline_name" in req.keys() and \
-                "experiment_name" in req.keys()):
+                "experiment_name" in req.keys() and "pipeline_version" in req.keys()):
             arguments = req["arguments"]
             pipe_name = req["pipeline_name"]
             exp_name = req["experiment_name"]
@@ -380,10 +380,14 @@ def run_pipeline(trainingjob_name):
             LOGGER.debug("Pipeline ID = " + pipe_id)
 
             version_id = KFCONNECT_KF_OBJ.get_kf_pipeline_version_id(pipe_id, pipeline_version_name)
+
+            LOGGER.debug("version id is: "+ version_id)
             LOGGER.debug("Running pipeline")
+
             run = KFCONNECT_KF_OBJ.run_kf_pipeline(exp.experiment_id, pipe_id, arguments, version_id)
+
             LOGGER.debug("Run ID = %s", run.run_id)
-            run_dict['trainingjob_name'] = trainingjob_name
+            run_dict['trainingjob_id'] = trainingjob_id
             run_dict['run_id'] = run.run_id
             run_dict['run_name'] = run.display_name
             run_dict['experiment_name'] = 'Default'
@@ -394,7 +398,7 @@ def run_pipeline(trainingjob_name):
             if run.state == 'PENDING':
                 run_dict['run_status'] = "scheduled"
                 with kfadapter_conf.LOCK:
-                    kfadapter_conf.TRAINING_DICT[run.run_id] = trainingjob_name
+                    kfadapter_conf.TRAINING_DICT[run.run_id] = trainingjob_id
         else:
             errcode = status.HTTP_400_BAD_REQUEST
             err_string = 'Less arguments'
index 6894de6..55711ac 100644 (file)
@@ -119,7 +119,7 @@ def wait_status_thread(name, kfc_kfconnect):
                     run_dict = {}
                     run_dict['run_id'] = i
                     run_dict['run_status'] = run_status
-                    run_dict['trainingjob_name'] = kfadapter_conf.TRAINING_DICT[i]
+                    run_dict['trainingjob_id'] = kfadapter_conf.TRAINING_DICT[i]
                     logger.info("POSTING to training manager")
                     logger.info(run_dict)
                     payload = json.dumps(run_dict)
index 6955805..74c0946 100644 (file)
@@ -91,9 +91,12 @@ class Test_KfConnect:
 
     def test_run_kf_pipeline(self):
         assert None == self.__KFCONNECT.run_kf_pipeline(exp_id='exp_id', pipeline_id='pipeline_id', arguments={
-            "trainingjob_name":"job_name",
+            "trainingjob_id":"job_name",
             "epochs": "epochs",
-            "version": "version"}
+            "modelName": "name", 
+            "modelVersion":"version",
+            "artifactVersion":"version",
+            "featuregroup_name":"fg_name"}
         , version_id='version_id')