"""
if self.__initialized:
return
+
self.__kf_adapter_port = getenv('KF_ADAPTER_PORT').rstrip() if getenv('KF_ADAPTER_PORT') is not None else None
self.__kf_adapter_ip = getenv('KF_ADAPTER_IP').rstrip() if getenv('KF_ADAPTER_IP') is not None else None
return True
return False
+# Handled by PipelineMgr (To be removed in future)
def fetch_pipeline_info_by_name(training_config_obj, pipe_name):
"""
This function returns the information for a specific pipeline
logger.error(err_msg)
raise TMException(err_msg)
+# Moved to pipelineMgr, To be removed in future
class PipelineInfo:
def __init__(self, pipeline_id, display_name, description, created_at):
self.pipeline_id = pipeline_id
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+import json
+from flask import Blueprint, jsonify, request
+from flask_api import status
+from trainingmgr.common.exceptions_utls import TMException
+from trainingmgr.service.pipeline_service import get_single_pipeline
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+
+pipeline_controller = Blueprint('pipeline_controller', __name__)
+LOGGER = TrainingMgrConfig().logger
+
+@pipeline_controller.route('/pipelines/<pipeline_name>', methods=['GET'])
+def get_pipeline_info_by_name(pipeline_name):
+ LOGGER.debug(f"Your Controller: Request to get information for pipeline: {pipeline_name}")
+ try:
+ pipeline_info = get_single_pipeline(pipeline_name)
+ if pipeline_info:
+ return jsonify({"pipeline_info": pipeline_info}), status.HTTP_200_OK
+ else:
+ return jsonify({"error": f"Pipeline '{pipeline_name}' not found"}), status.HTTP_404_NOT_FOUND
+ except TMException as err:
+ LOGGER.error(f"TrainingManager exception: {str(err)}")
+ return jsonify({"error": str(err)}), status.HTTP_404_NOT_FOUND
+ except Exception as err:
+ LOGGER.error(f"Unexpected error in get_pipeline_info_by_name: {str(err)}")
+ return jsonify({"error": "An unexpected error occurred"}), status.HTTP_500_INTERNAL_SERVER_ERROR
+
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+
+from os import getenv
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+import requests
+from trainingmgr.common.exceptions_utls import TMException
+
+LOGGER = TrainingMgrConfig().logger
+
+# Constants
+MIMETYPE_JSON = "application/json"
+ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter doesn't sends json type response"
+
+
+class PipelineMgr:
+ __instance = None
+
+ def __new__(cls):
+ if cls.__instance is None:
+ cls.__instance = super(PipelineMgr, cls).__new__(cls)
+ cls.__instance.__initialized = False
+ return cls.__instance
+
+ def __init__(self):
+ if self.__initialized:
+ return
+
+ self.kf_adapter_ip = TrainingMgrConfig().kf_adapter_ip
+ self.kf_adapter_port = TrainingMgrConfig().kf_adapter_port
+
+ self.__initialized = True
+
+
+ def get_all_pipelines(self):
+ """
+ This function returns the information for all pipelines
+ """
+ try:
+ url = f'http://{self.kf_adapter_ip}:{self.kf_adapter_port}/pipelines'
+ LOGGER.debug(f"Requesting pipelines from: {url}")
+ response = requests.get(url)
+ if response.status_code == 200:
+ if response.headers['content-type'] != MIMETYPE_JSON:
+ err_msg = ERROR_TYPE_KF_ADAPTER_JSON
+ LOGGER.error(err_msg)
+ raise TMException(err_msg)
+
+ return response.json()
+ else:
+ err_msg = f"Unexpected response from KFAdapter: {response.status_code}"
+ LOGGER.error(err_msg)
+ return TMException(err_msg)
+
+ except requests.RequestException as err:
+ err_msg = f"Error communicating with KFAdapter : {str(err)}"
+ LOGGER.error(err_msg)
+ raise TMException(err_msg)
+ except Exception as err:
+ err_msg = f"Unexpected error in get_pipeline_info_by_name: {str(err)}"
+ LOGGER.error(err_msg)
+ raise TMException(err_msg)
\ No newline at end of file
--- /dev/null
+from dataclasses import dataclass, asdict
+from datetime import datetime
+
+@dataclass
+class PipelineInfo:
+ pipeline_id: str
+ display_name: str
+ description: str
+ created_at: datetime
+
+ def to_dict(self):
+ return asdict(self)
\ No newline at end of file
--- /dev/null
+# ==================================================================================
+#
+# Copyright (c) 2024 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# ==================================================================================
+from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
+from trainingmgr.db.pipeline_mgr import PipelineMgr
+from trainingmgr.models.pipeline_info import PipelineInfo
+
+pipelineMgrObj = PipelineMgr()
+
+LOGGER = TrainingMgrConfig().logger
+def get_single_pipeline(pipeline_name):
+ allPipelines = pipelineMgrObj.get_all_pipelines()
+ for pipeline_info in allPipelines.get('pipelines', []):
+ if pipeline_info['display_name'] == pipeline_name:
+
+ return PipelineInfo(
+ pipeline_id=pipeline_info['pipeline_id'],
+ display_name=pipeline_info['display_name'],
+ description=pipeline_info['description'],
+ created_at=pipeline_info['created_at']
+ ).to_dict()
+
+ LOGGER.warning(f"Pipeline '{pipeline_name}' not found")
+ return None
+
+
+
\ No newline at end of file
change_field_value_by_version, delete_trainingjob_version, change_in_progress_to_failed_by_latest_version, \
update_model_download_url, get_all_versions_info_by_name
from trainingmgr.controller.trainingjob_controller import training_job_controller
+from trainingmgr.controller.pipeline_controller import pipeline_controller
from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
APP = Flask(__name__)
from middleware.loggingMiddleware import LoggingMiddleware
APP.wsgi_app = LoggingMiddleware(APP.wsgi_app)
APP.register_blueprint(training_job_controller)
+APP.register_blueprint(pipeline_controller)
PS_DB_OBJ = None
LOGGER = None