from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
from trainingmgr.common.trainingmgr_util import response_for_training, check_key_in_dictionary,check_trainingjob_data, \
get_one_key, get_metrics, handle_async_feature_engineering_status_exception_case, get_one_word_status, check_trainingjob_data, \
- validate_trainingjob_name, get_pipelines_details, check_feature_group_data, get_feature_group_by_name, edit_feature_group_by_name
+ validate_trainingjob_name, check_feature_group_data, get_feature_group_by_name, edit_feature_group_by_name
from requests.models import Response
from trainingmgr import trainingmgr_main
from trainingmgr.common.tmgr_logger import TMLogger
except TMException as err:
assert str(err) == "The name of training job is invalid."
+@pytest.mark.skip("") #Following fxn has been migrated to PipelineMgr
class Test_get_pipelines_details:
# testing the get_all_pipeline service
def setup_method(self):
from trainingmgr.common.trainingmgr_operations import create_dme_filtered_data_job
from trainingmgr.schemas import ma, TrainingJobSchema , FeatureGroupSchema
from trainingmgr.db.trainingjob_db import get_all_versions_info_by_name
+from trainingmgr.constants.steps import Steps
ERROR_TYPE_KF_ADAPTER_JSON = "Kf adapter doesn't sends json type response"
MIMETYPE_JSON = "application/json"
return States.IN_PROGRESS.name
def get_step_in_progress_state(steps_state):
    """
    Return the first Step (ordered by ascending enum value) whose entry in
    *steps_state* equals States.IN_PROGRESS, or None when no step is
    currently in progress.

    steps_state: dict mapping Step names (str) to state names (str).
    """
    in_progress = States.IN_PROGRESS.name
    return next(
        (step for step in sorted(Steps, key=lambda s: s.value)
         if steps_state[step.name] == in_progress),
        None,
    )
+
def check_trainingjob_data(trainingjob_name, json_data):
"""
This function checks validation for json_data dictionary and return tuple which conatins
isavailable = True
return isavailable
-# Handled by Pipeline_Manager (to be deleted in future)
-def get_pipelines_details(training_config_obj):
- logger=training_config_obj.logger
- try:
- kf_adapter_ip = training_config_obj.kf_adapter_ip
- kf_adapter_port = training_config_obj.kf_adapter_port
- if kf_adapter_ip!=None and kf_adapter_port!=None:
- url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + '/pipelines'
- logger.debug(url)
- response = requests.get(url)
- if response.headers['content-type'] != MIMETYPE_JSON:
- err_smg = ERROR_TYPE_KF_ADAPTER_JSON
- logger.error(err_smg)
- raise TMException(err_smg)
- except Exception as err:
- logger.error(str(err))
- return response.json()
def check_trainingjob_name_and_version(trainingjob_name, version):
    """
    Validate a trainingjob name/version pair.

    Returns True when *trainingjob_name* fully matches the module-level
    PATTERN regex and *version* is a numeric string; False otherwise.
    """
    return bool(re.fullmatch(PATTERN, trainingjob_name)) and version.isnumeric()
-# Handled by PipelineMgr (To be removed in future)
-def fetch_pipeline_info_by_name(training_config_obj, pipe_name):
- """
- This function returns the information for a specific pipeline
- """
- logger = training_config_obj.logger
- try:
- kf_adapter_ip = training_config_obj.kf_adapter_ip
- kf_adapter_port = training_config_obj.kf_adapter_port
- if kf_adapter_ip is not None and kf_adapter_port is not None:
- url = f'http://{kf_adapter_ip}:{kf_adapter_port}/pipelines'
-
- logger.debug(f"Requesting pipelines from: {url}")
- response = requests.get(url)
-
- if response.status_code == 200:
- if response.headers['content-type'] != MIMETYPE_JSON:
- err_msg = ERROR_TYPE_KF_ADAPTER_JSON
- logger.error(err_msg)
- raise TMException(err_msg)
-
- pipelines_data = response.json()
-
- for pipeline_info in pipelines_data.get('pipelines', []):
- if pipeline_info['display_name'] == pipe_name:
- return PipelineInfo(
- pipeline_id=pipeline_info['pipeline_id'],
- display_name=pipeline_info['display_name'],
- description=pipeline_info['description'],
- created_at=pipeline_info['created_at']
- ).to_dict()
-
- logger.warning(f"Pipeline '{pipe_name}' not found")
- return None
- else:
- err_msg = f"Unexpected response from KFAdapter: {response.status_code}"
- logger.error(err_msg)
- return TMException(err_msg)
-
- except requests.RequestException as err:
- err_msg = f"Error communicating with KFAdapter : {str(err)}"
- logger.error(err_msg)
- raise TMException(err_msg)
- except Exception as err:
- err_msg = f"Unexpected error in get_pipeline_info_by_name: {str(err)}"
- logger.error(err_msg)
- raise TMException(err_msg)
-
-# Moved to pipelineMgr, To be removed in future
-class PipelineInfo:
- def __init__(self, pipeline_id, display_name, description, created_at):
- self.pipeline_id = pipeline_id
- self.display_name = display_name
- self.description = description
- self.created_at = created_at
-
- def __repr__(self):
- return (f"PipelineInfo(pipeline_id={self.pipeline_id}, display_name={self.display_name}, "
- f"description={self.description}, created_at={self.created_at})")
-
- def to_dict(self):
- return {
- "pipeline_id":self.pipeline_id,
- "display_name": self.display_name,
- "description": self.description,
- "created_at": self.created_at
- }
-
from flask_api import status
from trainingmgr.common.exceptions_utls import TMException
from trainingmgr.service.pipeline_service import get_single_pipeline, get_all_pipeline_versions, get_all_pipelines, \
- upload_pipeline_service
+ upload_pipeline_service, list_experiments_service
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
import traceback
import re
return jsonify({'result': err.message}), status.HTTP_500_INTERNAL_SERVER_ERROR
except Exception as err:
return jsonify({'result': "Error in uploading Pipeline| Error : " + str(err)}), status.HTTP_500_INTERNAL_SERVER_ERROR
+
+
@pipeline_controller.route("/pipelines/experiments", methods=['GET'])
def get_all_experiment_names():
    """
    REST endpoint: GET /pipelines/experiments.

    Takes no arguments and no request body.

    Returns:
        200 with a JSON list of all experiment names (strings) on success;
        500 with {"Exception": <message>} when the lookup fails.
    """
    LOGGER.debug("request for getting all experiment names is come.")
    try:
        names = list_experiments_service()
        return jsonify(names), status.HTTP_200_OK
    except Exception as err:
        # Surface the failure to the client; details go to the service log.
        LOGGER.error(str(err))
        return jsonify({"Exception": str(err)}), status.HTTP_500_INTERNAL_SERVER_ERROR
\ No newline at end of file
from trainingmgr.common.exceptions_utls import TMException
from flask_api import status
import requests
-
+import json
LOGGER = TrainingMgrConfig().logger
LOGGER.error(err_msg)
raise TMException(err_msg)
-
def get_all_pipeline_versions(self, pipeline_name):
"""
This function returns the version-list for input pipeline
except Exception as err:
err_msg = f"Unexpected error in upload_pipeline_file: {str(err)}"
LOGGER.error(err_msg)
+ raise TMException(err_msg)
+
+ # To check: trainingjob_name needs to be changed to trainingjobId or not
+ def start_training(self, training_details, trainingjob_name):
+ """
+ This function calls kf_adapter module to start pipeline of trainingjob_name training and returns
+ response which is gotten by calling kf adapter module.
+ """
+ try:
+ LOGGER.debug('Calling kf_adapter for pipeline run for '+trainingjob_name)
+ LOGGER.debug('Will send to kf_adapter: '+json.dumps(training_details))
+ url = f'http://{self.kf_adapter_ip}:{self.kf_adapter_port}/trainingjobs/{trainingjob_name}/execution'#NOSONAR
+ LOGGER.debug(url)
+ response = requests.post(url,
+ data=json.dumps(training_details),
+ headers={'content-type': MIMETYPE_JSON,
+ 'Accept-Charset': 'UTF-8'})
+
+ return response
+ except Exception as err:
+ err_msg = f"Unexpected error in start_training: {str(err)}"
+ LOGGER.error(err_msg)
+ raise TMException(err_msg)
+
+ def terminate_training(self, run_id):
+ try:
+ LOGGER.debug('terminate training for run_id : ' + str(run_id))
+ url = f'http://{self.kf_adapter_ip}:{self.kf_adapter_port}/runs/{run_id}'
+ LOGGER.debug("Terminate Training API : " + url)
+ response = requests.delete(url)
+ print("Deletion-Response : ", response)
+ return response
+ except Exception as err:
+ err_msg = f"Unexpected error in terminate_training: {str(err)}"
+ LOGGER.error(err_msg)
+ raise TMException(err_msg)
+
+ def get_experiments(self):
+ try:
+ url = f'http://{self.kf_adapter_ip}:{self.kf_adapter_port}/experiments'
+ LOGGER.debug("Get Experiments API : " + url)
+ response = requests.get(url)
+ if response.headers['content-type'] != MIMETYPE_JSON:
+ raise TMException(ERROR_TYPE_KF_ADAPTER_JSON)
+ return response.json()
+ except Exception as err:
+ err_msg = f"Unexpected error in get_experiments: {str(err)}"
+ LOGGER.error(err_msg)
raise TMException(err_msg)
\ No newline at end of file
#
# ==================================================================================
from trainingmgr.common.trainingmgr_config import TrainingMgrConfig
-from trainingmgr.db.pipeline_mgr import PipelineMgr
+from trainingmgr.pipeline.pipeline_mgr import PipelineMgr
from trainingmgr.models.pipeline_info import PipelineInfo
from werkzeug.utils import secure_filename
import os
LOGGER.debug("Deleting %s", uploaded_file_path)
if uploaded_file_path != None:
os.remove(uploaded_file_path)
+
+
def start_training_service(training_details, trainingjob_name):
    """Delegate a training-start request to the shared PipelineMgr instance."""
    return pipelineMgrObj.start_training(training_details, trainingjob_name)
+
def terminate_training_service(run_id):
    """Delegate termination of pipeline run *run_id* to the shared PipelineMgr."""
    return pipelineMgrObj.terminate_training(run_id)
+
def list_experiments_service():
    """Return the experiment names known to the KF adapter as a list of str."""
    return list(pipelineMgrObj.get_experiments().keys())
# limitations under the License.
#
# ==================================================================================
+import json
from trainingmgr.db.trainingjob_db import delete_trainingjob_by_id, create_trainingjob, get_trainingjob, get_trainingjob_by_modelId_db, \
-change_steps_state
+change_steps_state, change_field_value
from trainingmgr.common.exceptions_utls import DBException, TMException
from trainingmgr.schemas import TrainingJobSchema
-
+from trainingmgr.common.trainingmgr_util import get_one_word_status, get_step_in_progress_state
+from trainingmgr.constants.steps import Steps
+from trainingmgr.constants.states import States
+from trainingmgr.service.pipeline_service import terminate_training_service
trainingJobSchema = TrainingJobSchema()
trainingJobsSchema = TrainingJobSchema(many=True)
"""
try:
- #TODO: cancel training job from kubeflow training
- return delete_trainingjob_by_id(id=training_job_id)
+ # Signal Deletion in Progress
+ tj = get_trainingjob(training_job_id)
+ # print("Run Id = ", tj.run_id, " -- ", tj.run_id is None)
+ change_field_value(training_job_id, "deletion_in_progress", True)
+ # isDeleted = True
+ isDeleted = delete_trainingjob_by_id(id=training_job_id)
+ if isDeleted:
+ steps_state = json.loads(tj.steps_state.states)
+ overall_status = get_one_word_status(steps_state)
+ if overall_status == States.IN_PROGRESS.name:
+ step_in_progress_state = get_step_in_progress_state(steps_state)
+ if step_in_progress_state == Steps.DATA_EXTRACTION:
+ pass
+ # TODO: Remove the job from DATAEXTRACTION_JOBS_CACHE to signal not to check its status
+ # with LOCK:
+ # DATAEXTRACTION_JOBS_CACHE.pop(trainingjob_name)
+ elif (step_in_progress_state == Steps.TRAINING or (step_in_progress_state == Steps.DATA_EXTRACTION_AND_TRAINING and tj.run_id is not None)):
+ # Signal the Kf-Adapter to terminate the training
+ response = terminate_training_service(tj.run_id)
+ print("Deletion-Response : ", response)
+ return True
+ else:
+ return False
except Exception as err :
raise DBException(f"delete_trainining_job failed with exception : {str(err)}")
+
def get_trainingjob_by_modelId(model_id):
try:
trainingjob = get_trainingjob_by_modelId_db(model_id)
check_key_in_dictionary, get_one_key, \
response_for_training, get_metrics, \
handle_async_feature_engineering_status_exception_case, \
- validate_trainingjob_name, get_pipelines_details, check_feature_group_data, check_trainingjob_name_and_version, check_trainingjob_name_or_featuregroup_name, \
- get_feature_group_by_name, edit_feature_group_by_name, fetch_pipeline_info_by_name
+ validate_trainingjob_name, check_feature_group_data, check_trainingjob_name_and_version, check_trainingjob_name_or_featuregroup_name, \
+ get_feature_group_by_name, edit_feature_group_by_name
from trainingmgr.common.exceptions_utls import APIException,TMException
from trainingmgr.constants.steps import Steps
from trainingmgr.constants.states import States
from trainingmgr.controller.trainingjob_controller import training_job_controller
from trainingmgr.controller.pipeline_controller import pipeline_controller
from trainingmgr.common.trainingConfig_parser import validateTrainingConfig, getField
+from trainingmgr.service.pipeline_service import start_training_service
APP = Flask(__name__)
TRAININGMGR_CONFIG_OBJ = TrainingMgrConfig()
arguments[key] = str(val)
LOGGER.debug(arguments)
    # Experiment name is hard-coded to be Default
- dict_data = {
+ training_details = {
"pipeline_name": getField(trainingjob.training_config, "pipeline_name"), "experiment_name": 'Default',
"arguments": arguments, "pipeline_version": getField(trainingjob.training_config, "pipeline_version")
}
-
- response = training_start(TRAININGMGR_CONFIG_OBJ, dict_data, trainingjob_name)
+
+ response = start_training_service(training_details, trainingjob_name)
if ( response.headers['content-type'] != MIMETYPE_JSON
or response.status_code != status.HTTP_200_OK ):
err_msg = "Kf adapter invalid content-type or status_code for " + trainingjob_name
status=status.HTTP_200_OK,
mimetype=MIMETYPE_JSON)
-# Moved to Pipeline_controller (To be deleted in future)
-@APP.route('/pipelines/<pipe_name>', methods=['GET'])
-def get_pipeline_info_by_name(pipe_name):
- """
- Function handling rest endpoint to get information about a specific pipeline.
- Args in function:
- pipe_name : str
- name of pipeline.
- Args in json:
- no json required
- Returns:
- json:
- pipeline_info : dict
- Dictionary containing detailed information about the specified pipeline.
- status code:
- HTTP status code 200 if successful, 404 if pipeline not found, or 500 for server errors.
- Exceptions:
- all exceptions are provided with exception message and HTTP status code.
- """
- api_response = {}
- LOGGER.debug(f"Request to get information for pipeline: {pipe_name}")
- response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
-
- try:
- pipeline_info = fetch_pipeline_info_by_name(TRAININGMGR_CONFIG_OBJ, pipe_name)
- if pipeline_info:
- api_response = {"pipeline_info":pipeline_info}
- response_code = status.HTTP_200_OK
- else:
- api_response = {"error": f"Pipeline '{pipe_name}' not found"}
- response_code = status.HTTP_404_NOT_FOUND
-
- except TMException as err:
- api_response = {"error": str(err)}
- response_code = status.HTTP_404_NOT_FOUND
- LOGGER.error(f"TrainingManager exception: {str(err)}")
- except Exception as err:
- api_response = {"error": "An unexpected error occurred"}
- LOGGER.error(f"Unexpected error in get_pipeline_info: {str(err)}")
-
- return APP.response_class(response=json.dumps(api_response),
- status=response_code,
- mimetype=MIMETYPE_JSON)
-
+# Will be migrated to pipeline Mgr in next iteration
@APP.route('/trainingjob/pipelineNotification', methods=['POST'])
def pipeline_notification():
"""
status=response_code,
mimetype=MIMETYPE_JSON)
-# Moved to Pipeline_controller (To be deleted in future)
-@APP.route("/pipelines/<pipe_name>/upload", methods=['POST'])
-def upload_pipeline(pipe_name):
- """
- Function handling rest endpoint to upload pipeline.
-
- Args in function:
- pipe_name: str
- name of pipeline
-
- Args in json:
- no json required
-
- but file is required
-
- Returns:
- json:
- result: str
- result message
- status code:
- HTTP status code 200
-
- Exceptions:
- all exception are provided with exception message and HTTP status code.
- """
- LOGGER.debug("Request to upload pipeline.")
- result_string = None
- result_code = None
- uploaded_file_path = None
- try:
- LOGGER.debug(str(request))
- LOGGER.debug(str(request.files))
- if 'file' in request.files:
- uploaded_file = request.files['file']
- else:
- result_string = "Didn't get file"
- raise ValueError("file not found in request.files")
-
- if not check_trainingjob_name_or_featuregroup_name(pipe_name):
- err_msg="the pipeline name is not valid"
- raise TMException(err_msg)
- LOGGER.debug("Uploading received for %s", uploaded_file.filename)
- if uploaded_file.filename != '':
- uploaded_file_path = "/tmp/" + secure_filename(uploaded_file.filename)
- uploaded_file.save(uploaded_file_path)
- LOGGER.debug("File uploaded :%s", uploaded_file_path)
- kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
- kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
- if kf_adapter_ip!=None and kf_adapter_port!=None:
- url = 'http://' + str(kf_adapter_ip) + ':' + str(kf_adapter_port) + \
- '/pipelineIds/' + pipe_name
-
- description = ''
- if 'description' in request.form:
- description = request.form['description']
- if uploaded_file_path != None:
- with open(uploaded_file_path, 'rb') as file:
- files = {'file': file.read()}
-
- resp = requests.post(url, files=files, data={"description": description})
- LOGGER.debug(resp.text)
- if resp.status_code == status.HTTP_200_OK:
- LOGGER.debug("Pipeline uploaded :%s", pipe_name)
- result_string = "Pipeline uploaded " + pipe_name
- result_code = status.HTTP_200_OK
- else:
- LOGGER.error(resp.json()["message"])
- result_string = resp.json()["message"]
- result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- else:
- result_string = "File name not found"
- raise ValueError("filename is not found in request.files")
- except ValueError:
- tbk = traceback.format_exc()
- LOGGER.error(tbk)
- result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- result_string = "Error while uploading pipeline"
- except TMException:
- tbk = traceback.format_exc()
- LOGGER.error(tbk)
- result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- result_string = "Pipeline name is not of valid format"
- except Exception:
- tbk = traceback.format_exc()
- LOGGER.error(tbk)
- result_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- result_string = "Error while uploading pipeline cause"
-
- if uploaded_file_path and os.path.isfile(uploaded_file_path):
- LOGGER.debug("Deleting %s", uploaded_file_path)
- if uploaded_file_path != None:
- os.remove(uploaded_file_path)
-
- LOGGER.debug("Responding to Client with %d %s", result_code, result_string)
- return APP.response_class(response=json.dumps({'result': result_string}),
- status=result_code,
- mimetype=MIMETYPE_JSON)
-
-
-# Moved to Pipeline_controller (To be deleted in future)
-@APP.route("/pipelines/<pipeline_name>/versions", methods=['GET'])
-def get_versions_for_pipeline(pipeline_name):
- """
- Function handling rest endpoint to get versions of given pipeline name.
-
- Args in function:
- pipeline_name : str
- name of pipeline.
-
- Args in json:
- no json required
-
- Returns:
- json:
- versions_list : list
- list containing all versions(as str)
- status code:
- HTTP status code 200
-
- Exceptions:
- all exception are provided with exception message and HTTP status code.
- """
- valid_pipeline=""
- api_response = {}
- LOGGER.debug("Request to get all version for given pipeline(" + pipeline_name + ").")
- response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- try:
- pipelines = get_pipelines_details(TRAININGMGR_CONFIG_OBJ)
- for pipeline in pipelines['pipelines']:
- if pipeline['display_name'] == pipeline_name:
- valid_pipeline = pipeline['display_name']
- break
- if valid_pipeline == "":
- raise TMException("Pipeline name not present")
- kf_adapter_ip = TRAININGMGR_CONFIG_OBJ.kf_adapter_ip
- kf_adapter_port = TRAININGMGR_CONFIG_OBJ.kf_adapter_port
- if kf_adapter_ip!=None and kf_adapter_port!=None :
- url = 'http://' + str(kf_adapter_ip) + ':' + str(
- kf_adapter_port) + '/pipelines/' + valid_pipeline + \
- '/versions'
- LOGGER.debug("URL:" + url)
- response = requests.get(url)
- if response.headers['content-type'] != MIMETYPE_JSON:
- raise TMException(ERROR_TYPE_KF_ADAPTER_JSON)
- api_response = {"versions_list": response.json()['versions_list']}
- response_code = status.HTTP_200_OK
- except Exception as err:
- api_response = {"Exception": str(err)}
- LOGGER.error(str(err))
- return APP.response_class(response=json.dumps(api_response),
- status=response_code,
- mimetype=MIMETYPE_JSON)
-
-# Moved to Pipeline_controller (To be deleted in future)
-@APP.route('/pipelines', methods=['GET'])
-def get_pipelines():
- """
- Function handling rest endpoint to get all pipeline names.
-
- Args in function:
- none
-
- Args in json:
- no json required
-
- Returns:
- json:
- pipeline_names : list
- list containing all pipeline names(as str).
- status code:
- HTTP status code 200
-
- Exceptions:
- all exception are provided with exception message and HTTP status code.
- """
- LOGGER.debug("Request to get all getting all pipeline names.")
- api_response = {}
- response_code = status.HTTP_500_INTERNAL_SERVER_ERROR
- try:
- pipelines = get_pipelines_details(TRAININGMGR_CONFIG_OBJ)
- api_response = pipelines
- response_code = status.HTTP_200_OK
- except Exception as err:
- LOGGER.error(str(err))
- api_response = {"Exception": str(err)}
- return APP.response_class(response=json.dumps(api_response),status=response_code,mimetype=MIMETYPE_JSON)
-
+# Moved to pipelineMgr (to be deleted in future)
@APP.route('/experiments', methods=['GET'])
def get_all_experiment_names():
"""