From: rajdeep11 Date: Wed, 26 Jun 2024 11:33:51 +0000 (+0530) Subject: changes in the qoe pipeline code wrt updating kubeflow X-Git-Tag: 2.0.0~2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=14166e5f0916bf8198755bfce7f9d038c23b8524;p=portal%2Faiml-dashboard.git changes in the qoe pipeline code wrt updating kubeflow Issue-id: AIMLFW-86 Change-Id: I4dce0d78029ef94e323f9784aed0896417ead790 Signed-off-by: rajdeep11 --- diff --git a/kf-pipelines/qoe-pipeline.ipynb b/kf-pipelines/qoe-pipeline.ipynb index 5d35415..78ba11b 100644 --- a/kf-pipelines/qoe-pipeline.ipynb +++ b/kf-pipelines/qoe-pipeline.ipynb @@ -7,9 +7,9 @@ "outputs": [], "source": [ "import kfp\n", - "import kfp.components as components\n", "import kfp.dsl as dsl\n", - "from kfp.components import InputPath, OutputPath" + "from kfp.dsl import InputPath, OutputPath\n", + "from kfp.dsl import component as component" ] }, { @@ -18,6 +18,16 @@ "metadata": {}, "outputs": [], "source": [ + "BASE_IMAGE = \"traininghost/pipelineimage:latest\"" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "@component(base_image=BASE_IMAGE)\n", "def train_export_model(trainingjobName: str, epochs: str, version: str):\n", " \n", " import tensorflow as tf\n", @@ -36,13 +46,12 @@ " \n", " fs_sdk = FeatureStoreSdk()\n", " mm_sdk = ModelMetricsSdk()\n", - " \n", - " features = fs_sdk.get_features(trainingjobName, ['measTimeStampRf', 'nrCellIdentity', 'pdcpBytesDl','pdcpBytesUl'])\n", + " print(\"job name is: \", trainingjobName)\n", + " features = fs_sdk.get_features(trainingjobName, ['pdcpBytesDl','pdcpBytesUl'])\n", " print(\"Dataframe:\")\n", " print(features)\n", "\n", - " features_cellc2b2 = features[features['nrCellIdentity'] == \"c2/B2\"]\n", - " print(\"Dataframe for cell : c2/B2\")\n", + " features_cellc2b2 = features\n", " print(features_cellc2b2)\n", " print('Previous Data Types are --> ', features_cellc2b2.dtypes)\n", " features_cellc2b2[\"pdcpBytesDl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesDl\"], downcast=\"float\")\n", @@ -94,16 +103,7 @@ " data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))})\n", " \n", " mm_sdk.upload_metrics(data, trainingjobName, version)\n", - " mm_sdk.upload_model(\"./\", trainingjobName, version)" - ] - }, - { - "cell_type": "code", - "execution_count": 3, - "metadata": {}, - "outputs": [], - "source": [ - "BASE_IMAGE = \"traininghost/pipelineimage:latest\"" + " mm_sdk.upload_model(\"./\", trainingjobName, version)\n" ] }, { @@ -112,11 +112,16 @@ "metadata": {}, "outputs": [], "source": [ - "def train_and_export(trainingjobName: str, epochs: str, version: str):\n", - " trainOp = components.func_to_container_op(train_export_model, base_image=BASE_IMAGE)(trainingjobName, epochs,version)\n", - " # Below line to disable caching of pipeline step\n", - " trainOp.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n", - " trainOp.container.set_image_pull_policy(\"IfNotPresent\")" + "@dsl.pipeline(\n", + " name=\"qoe Pipeline\",\n", + " description=\"qoe\",\n", + ")\n", + "def super_model_pipeline( \n", + " trainingjob_name: str, epochs: str, version: str):\n", + " \n", + " trainop=train_export_model(trainingjobName=trainingjob_name, epochs=epochs, version=version)\n", + " trainop.set_caching_options(False)\n", + " kubernetes.set_image_pull_policy(trainop, \"IfNotPresent\")" ] }, { @@ -142,10 +147,10 @@ "outputs": [], "source": [ "pipeline_func = super_model_pipeline\n", - "file_name = \"qoe_model_pipeline\"\n", + "file_name = \"qoe_model_pipeline3\"\n", "\n", "kfp.compiler.Compiler().compile(pipeline_func, \n", - " '{}.zip'.format(file_name))" + " '{}.yaml'.format(file_name))" ] }, { @@ -167,7 +172,7 @@ "source": [ "import requests\n", "pipeline_name=\"qoe Pipeline\"\n", - "pipeline_file = file_name+'.zip'\n", + "pipeline_file = file_name+'.yaml'\n", "requests.post(\"http://tm.traininghost:32002/pipelines/{}/upload\".format(pipeline_name), files={'file':open(pipeline_file,'rb')})" ] },