From: minhac.lee Date: Tue, 18 Oct 2022 23:59:11 +0000 (+0900) Subject: Adding utils for kfadapter X-Git-Tag: 1.0.0~20 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=bf245347bbc3ca3755fedee842bad50ee19e17a5;p=aiml-fw%2Fathp%2Ftps%2Fkubeflow-adapter.git Adding utils for kfadapter Issue-Id: AIMLFW-2 Signed-off-by: minhac.lee Change-Id: Iae2fba4f918c3672b6a9e42dd8554440a34d8359 --- diff --git a/kfadapter/kfadapter_util.py b/kfadapter/kfadapter_util.py new file mode 100644 index 0000000..cde187d --- /dev/null +++ b/kfadapter/kfadapter_util.py @@ -0,0 +1,175 @@ +"""kfadapter_util.py. + +This module is for all the utility functions to be used +by the main and other modules + +""" +# ================================================================================== +# +# Copyright (c) 2022 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== +from random import choices +import traceback +import string +import time +import json +import requests +from flask_api import status +import kfadapter_conf + +class BadRequest(Exception): + """ + This is a class for throwing custom exception when local error occurs + + Attributes: + message: Custom message for the exception occured + status: Error code for the exception + payload: Custom payload for diagnostic purpose + """ + def __init__(self, message, sts_code=status.HTTP_400_BAD_REQUEST, payload=None): + """ + The constructor for BadRequest class. + + Parameters: + message: Custom message for the exception occured + status: Error code for the exception + payload: Custom payload for diagnostic purpose + """ + super().__init__() + self.message = message + self.status = sts_code + self.payload = payload + +def keys_match(dict1, dict2) -> bool: + """ + check all keys from dict2 are present in dict1 or not + """ + for key in dict2.keys(): + if key not in dict1: + return False + return True + +def run_finished(run_status: string) -> bool: + """ + Function for letting the caller know if run has finished + based on run_status string reported from KubeFlow + + Args: + run_status: status returned for run from KubeFlow + + Returns: + true or false to signify whether run is finished or not + + """ + return run_status in {'Succeeded', 'Failed', 'Error', 'Skipped', 'Terminated'} + +def random_suffix(): + """ + Function for generating random suffix + + Args: None + + Returns: random suffix + + """ + return ''.join(choices(string.ascii_lowercase + string.digits, k=10)) + +def wait_status_thread(name, kfc_kfconnect): + """ + Thread Function for notify the status of all pipeline run + to training manager + + Args: + kfc_config: KfAdapter_config object + + Returns:None + + """ + #pylint: disable=unused-argument + #pylint: disable=maybe-no-member + kfc_config = kfadapter_conf.KfConfiguration.get_instance() + logger = kfc_config.logger + while True: + with kfadapter_conf.LOCK: + dict_copy = kfadapter_conf.TRAINING_DICT.copy() + for i in dict_copy: + try: + run = kfc_kfconnect.get_kf_run(i) + run_status = run.run.status + except: # pylint: disable=bare-except + run_status = "Manual reconcile" + + if run_finished(run_status) or run_status == "Manual reconcile": + run_dict = {} + run_dict['run_id'] = i + run_dict['run_status'] = run_status + run_dict['trainingjob_name'] = kfadapter_conf.TRAINING_DICT[i] + logger.info("POSTING to training manager") + logger.info(run_dict) + payload = json.dumps(run_dict) + url = "http://"+kfc_config.trainingmgr_dict['trainingmgr_host']+":"+\ + kfc_config.trainingmgr_dict['trainingmgr_port']\ + + "/trainingjob/pipelineNotification" + logger.debug(url) + headers = {'content-type': 'application/json', 'Accept-Charset': 'UTF-8'} + try: + response = requests.post(url, data=payload, headers=headers) + logger.info(response.json) + except requests.exceptions.ConnectionError as warn: + logger.warning("REST Server not running at %s", url) + logger.warning(warn) + except: # pylint: disable=bare-except + tbk = traceback.format_exc() + logger.error(tbk) + del kfadapter_conf.TRAINING_DICT[i] + break + time.sleep(1) + time.sleep(kfc_config.run_status_polling_interval_sec) + +def check_list(data, compare_key): + ''' + check compare_key presents in inner list or dictionary and return value for given compare_key + ''' + for value in data: + if isinstance(value, dict): + ret = check_map(value, compare_key) + if ret: + return ret + elif isinstance(value, list): + ret = check_list(value, compare_key) + if ret: + return ret + else: + pass + return None + +def check_map(data, compare_key): + ''' + check compare_key presents in inner list or dictionary or current dictionary and return value + for given compare_key + ''' + for key, value in data.items(): + if key == compare_key: + return value + if isinstance(value, dict): + ret = check_map(value, compare_key) + if ret: + return ret + elif isinstance(value, list): + ret = check_list(value, compare_key) + if ret: + return ret + return None