"""
self.logger.debug("run_kf_pipeline Entered")
- job_name=arguments["trainingjob_name"]
+ job_id=arguments["trainingjob_id"]
+ featuregroup_name = arguments["featuregroup_name"]
+ featurepath = featuregroup_name+"_"+job_id
req_dict={
- "trainingjob_name":job_name,
+ "featurepath":featurepath,
"epochs": arguments["epochs"],
- "version": str(arguments["version"])
+ "modelname": str(arguments["modelName"]),
+ "modelversion": str(arguments["modelVersion"]),
+ "artifactversion":str(arguments["artifactVersion"])
}
+ self.logger.debug("run_kf_pipeline Arguments: "+str(req_dict))
try:
run = self.kfp_client.run_pipeline(exp_id, job_name="testjob"+\
random_suffix(),
return jsonify(pipe_dict), status.HTTP_200_OK
-@APP.route('/trainingjobs/<trainingjob_name>/execution', methods=['POST'])
-def run_pipeline(trainingjob_name):
+@APP.route('/trainingjobs/<trainingjob_id>/execution', methods=['POST'])
+def run_pipeline(trainingjob_id):
"""Function handling HTTP POST rest endpoint to execute pipeline based on trainingjob name
Args:
"""
errcode = None
err_string = None
- LOGGER.debug("run_pipeline for %s", trainingjob_name)
+ LOGGER.debug("run_pipeline for %s", trainingjob_id)
run_dict = {}
try:
errcode = status.HTTP_400_BAD_REQUEST
req = request.json
LOGGER.debug(req)
if("arguments" in req.keys() and "pipeline_name" in req.keys() and \
- "experiment_name" in req.keys()):
+ "experiment_name" in req.keys() and "pipeline_version" in req.keys()):
arguments = req["arguments"]
pipe_name = req["pipeline_name"]
exp_name = req["experiment_name"]
LOGGER.debug("Pipeline ID = " + pipe_id)
version_id = KFCONNECT_KF_OBJ.get_kf_pipeline_version_id(pipe_id, pipeline_version_name)
+
+ LOGGER.debug("version id is: "+ version_id)
LOGGER.debug("Running pipeline")
+
run = KFCONNECT_KF_OBJ.run_kf_pipeline(exp.experiment_id, pipe_id, arguments, version_id)
+
LOGGER.debug("Run ID = %s", run.run_id)
- run_dict['trainingjob_name'] = trainingjob_name
+ run_dict['trainingjob_id'] = trainingjob_id
run_dict['run_id'] = run.run_id
run_dict['run_name'] = run.display_name
run_dict['experiment_name'] = 'Default'
if run.state == 'PENDING':
run_dict['run_status'] = "scheduled"
with kfadapter_conf.LOCK:
- kfadapter_conf.TRAINING_DICT[run.run_id] = trainingjob_name
+ kfadapter_conf.TRAINING_DICT[run.run_id] = trainingjob_id
else:
errcode = status.HTTP_400_BAD_REQUEST
err_string = 'Less arguments'
def test_run_kf_pipeline(self):
assert None == self.__KFCONNECT.run_kf_pipeline(exp_id='exp_id', pipeline_id='pipeline_id', arguments={
- "trainingjob_name":"job_name",
+ "trainingjob_id":"job_name",
"epochs": "epochs",
- "version": "version"}
+ "modelName": "name",
+ "modelVersion":"version",
+ "artifactVersion":"version",
+ "featuregroup_name":"fg_name"}
, version_id='version_id')