From 7231eb8f0dcdeeceadd4807f31479fe88fd8dfcd Mon Sep 17 00:00:00 2001 From: ashishj1729 Date: Wed, 4 Dec 2024 13:42:20 +0530 Subject: [PATCH] Terminate Run functionality Added Change-Id: I8f61557127e285f4288e1ea5a37c958ddbdd1c10 Signed-off-by: ashishj1729 --- kfadapter/kfadapter_kfconnect.py | 12 +++++++++++- kfadapter/kfadapter_main.py | 9 ++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/kfadapter/kfadapter_kfconnect.py b/kfadapter/kfadapter_kfconnect.py index f96c2fe..7cee9e2 100644 --- a/kfadapter/kfadapter_kfconnect.py +++ b/kfadapter/kfadapter_kfconnect.py @@ -279,4 +279,14 @@ class KfConnect: self.logger.error(str(err)) self.logger.debug("run_kf_pipeline Exited") - return run \ No newline at end of file + return run + + + def terminate_kf_pipeline(self, run_id): + print("Terminating Run: run_id: ", run_id) + try: + out = self.kfp_client.terminate_run(run_id) + print("Terminate Run O/p :: ", out) + except Exception as err: + self.logger.error("Terminate Run Error :: ", str(err)) + raise err \ No newline at end of file diff --git a/kfadapter/kfadapter_main.py b/kfadapter/kfadapter_main.py index f129690..d3d5913 100644 --- a/kfadapter/kfadapter_main.py +++ b/kfadapter/kfadapter_main.py @@ -486,9 +486,12 @@ def kf_run(run_id): run_dict = {} try: if request.method == 'DELETE': - LOGGER.error("Method not supported yet") - raise BadRequest("Method not supported yet", status.HTTP_501_NOT_IMPLEMENTED,\ - {'ext': 1}) + LOGGER.debug("Deleting Run_id : " + run_id) + KFCONNECT_KF_OBJ.terminate_kf_pipeline(run_id) + with kfadapter_conf.LOCK: + # Deleting from global-var so that wait_status_thread should not keep checking this run_id + del kfadapter_conf.TRAINING_DICT[run_id] + return {}, status.HTTP_200_OK run_info = KFCONNECT_KF_OBJ.get_kf_run(run_id) run_dict['run_id'] = run_info.run_id -- 2.16.6