if re.fullmatch(PATTERN, name):
return True
return False
+
+def fetch_pipeline_info_by_name(training_config_obj, pipe_name):
+ """
+ This function returns the information for a specific pipeline
+ """
+ logger = training_config_obj.logger
+ try:
+ kf_adapter_ip = training_config_obj.kf_adapter_ip
+ kf_adapter_port = training_config_obj.kf_adapter_port
+ if kf_adapter_ip is not None and kf_adapter_port is not None:
+ url = f'http://{kf_adapter_ip}:{kf_adapter_port}/pipelines'
+
+ logger.debug(f"Requesting pipelines from: {url}")
+ response = requests.get(url)
+
+ if response.status_code == 200:
+ if response.headers['content-type'] != MIMETYPE_JSON:
+ err_msg = ERROR_TYPE_KF_ADAPTER_JSON
+ logger.error(err_msg)
+ raise TMException(err_msg)
+
+ pipelines_data = response.json()
+
+ for pipeline_info in pipelines_data.get('pipelines', []):
+ if pipeline_info['display_name'] == pipe_name:
+ return PipelineInfo(
+ pipeline_id=pipeline_info['pipeline_id'],
+ display_name=pipeline_info['display_name'],
+ description=pipeline_info['description'],
+ created_at=pipeline_info['created_at']
+ )
+
+ logger.warning(f"Pipeline '{pipe_name}' not found")
+ return None
+ else:
+ err_msg = f"Unexpected response from KFAdapter: {response.status_code}"
+ logger.error(err_msg)
+ return TMException(err_msg)
+
+ except requests.RequestException as err:
+ err_msg = f"Error communicating with KFAdapter : {str(err)}"
+ logger.error(err_msg)
+ raise TMException(err_msg)
+ except Exception as err:
+ err_msg = f"Unexpected error in get_pipeline_info_by_name: {str(err)}"
+ logger.error(err_msg)
+ raise TMException(err_msg)
+
+class PipelineInfo:
+ def __init__(self, pipeline_id, display_name, description, created_at):
+ self.pipeline_id = pipeline_id
+ self.display_name = display_name
+ self.description = description
+ self.created_at = created_at
+
+ def __repr__(self):
+ return (f"PipelineInfo(pipeline_id={self.pipeline_id}, display_name={self.display_name}, "
+ f"description={self.description}, created_at={self.created_at})")
\ No newline at end of file
status=status.HTTP_200_OK,
mimetype=MIMETYPE_JSON)
+@APP.route('/pipelines/<pipe_name>', methods=['GET'])
+def get_pipeline_info_by_name(pipe_name):
+ """
+ Function handling rest endpoint to get information about a specific pipeline.
+ Args in function:
+ pipe_name : str
+ name of pipeline.
+ Args in json:
+ no json required
+ Returns:
+ json:
+ pipeline_info : dict
+ Dictionary containing detailed information about the specified pipeline.
+ status code:
+ HTTP status code 200 if successful, 404 if pipeline not found, or 500 for server errors.
+ Exceptions:
+ all exceptions are provided with exception message and HTTP status code.
+ """
+ api_response = {}
+ LOGGER.debug(f"Request to get information for pipeline: {pipe_name}")
+ response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
+ try:
+ pipeline_info = fetch_pipeline_info_by_name(TRAININGMGR_CONFIG_OBJ, pipe_name)
+ if pipeline_info:
+ api_response = pipeline_info
+ response_code = status.HTTP_200_OK
+ else:
+ api_response = {"error": f"Pipeline '{pipe_name}' not found"}
+ response_code = status.HTTP_404_NOT_FOUND
+
+ except TMException as err:
+ api_response = {"error": str(err)}
+ response_code = status.HTTP_404_NOT_FOUND
+ LOGGER.error(f"TrainingManager exception: {str(err)}")
+ except Exception as err:
+ api_response = {"error": "An unexpected error occurred"}
+ LOGGER.error(f"Unexpected error in get_pipeline_info: {str(err)}")
+
+ return APP.response_class(response=json.dumps(api_response),
+ status=response_code,
+ mimetype=MIMETYPE_JSON)
+
@APP.route('/trainingjob/pipelineNotification', methods=['POST'])
def pipeline_notification():
"""
mimetype=MIMETYPE_JSON)
+
+
@APP.route("/pipelines/<pipeline_name>/versions", methods=['GET'])
def get_versions_for_pipeline(pipeline_name):
"""