"outputs": [],
"source": [
"import kfp\n",
- "import kfp.components as components\n",
"import kfp.dsl as dsl\n",
- "from kfp.components import InputPath, OutputPath"
+ "from kfp.dsl import InputPath, OutputPath\n",
+ "from kfp.dsl import component\n",
+ "from kfp import kubernetes"
]
},
{
"metadata": {},
"outputs": [],
"source": [
+ "BASE_IMAGE = \"traininghost/pipelineimage:latest\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "@component(base_image=BASE_IMAGE)\n",
"def train_export_model(trainingjobName: str, epochs: str, version: str):\n",
" \n",
"    import tensorflow as tf\n",
+ "    import pandas as pd\n",
+ "    import numpy as np\n",
" \n",
" fs_sdk = FeatureStoreSdk()\n",
" mm_sdk = ModelMetricsSdk()\n",
- " \n",
- " features = fs_sdk.get_features(trainingjobName, ['measTimeStampRf', 'nrCellIdentity', 'pdcpBytesDl','pdcpBytesUl'])\n",
+ " print(\"job name is: \", trainingjobName)\n",
+ " features = fs_sdk.get_features(trainingjobName, ['pdcpBytesDl','pdcpBytesUl'])\n",
" print(\"Dataframe:\")\n",
" print(features)\n",
"\n",
- " features_cellc2b2 = features[features['nrCellIdentity'] == \"c2/B2\"]\n",
- " print(\"Dataframe for cell : c2/B2\")\n",
+ " features_cellc2b2 = features\n",
" print(features_cellc2b2)\n",
" print('Previous Data Types are --> ', features_cellc2b2.dtypes)\n",
" features_cellc2b2[\"pdcpBytesDl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesDl\"], downcast=\"float\")\n",
" data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))})\n",
" \n",
" mm_sdk.upload_metrics(data, trainingjobName, version)\n",
- " mm_sdk.upload_model(\"./\", trainingjobName, version)"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 3,
- "metadata": {},
- "outputs": [],
- "source": [
- "BASE_IMAGE = \"traininghost/pipelineimage:latest\""
+ " mm_sdk.upload_model(\"./\", trainingjobName, version)\n"
]
},
{
"metadata": {},
"outputs": [],
"source": [
- "def train_and_export(trainingjobName: str, epochs: str, version: str):\n",
- " trainOp = components.func_to_container_op(train_export_model, base_image=BASE_IMAGE)(trainingjobName, epochs,version)\n",
- " # Below line to disable caching of pipeline step\n",
- " trainOp.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
- " trainOp.container.set_image_pull_policy(\"IfNotPresent\")"
+ "@dsl.pipeline(\n",
+ " name=\"qoe Pipeline\",\n",
+ " description=\"qoe\",\n",
+ ")\n",
+ "def super_model_pipeline( \n",
+ " trainingjob_name: str, epochs: str, version: str):\n",
+ " \n",
+ " trainop=train_export_model(trainingjobName=trainingjob_name, epochs=epochs, version=version)\n",
+ " trainop.set_caching_options(False)\n",
+ " kubernetes.set_image_pull_policy(trainop, \"IfNotPresent\")"
]
},
{
"outputs": [],
"source": [
"pipeline_func = super_model_pipeline\n",
- "file_name = \"qoe_model_pipeline\"\n",
+ "file_name = \"qoe_model_pipeline3\"\n",
"\n",
"kfp.compiler.Compiler().compile(pipeline_func, \n",
- " '{}.zip'.format(file_name))"
+ " '{}.yaml'.format(file_name))"
]
},
{
"source": [
"import requests\n",
"pipeline_name=\"qoe Pipeline\"\n",
- "pipeline_file = file_name+'.zip'\n",
+ "pipeline_file = file_name+'.yaml'\n",
"requests.post(\"http://tm.traininghost:32002/pipelines/{}/upload\".format(pipeline_name), files={'file':open(pipeline_file,'rb')})"
]
},