Working on creating a retraining pipeline 33/13733/1
authorSwaraj Kumar <swaraj.kumar@samsung.com>
Wed, 6 Nov 2024 12:25:04 +0000 (17:55 +0530)
committerSwaraj Kumar <swaraj.kumar@samsung.com>
Wed, 6 Nov 2024 12:25:04 +0000 (17:55 +0530)
Change-Id: Ia9eec0ee9253a030b33b30481bc6090c26581a14
Signed-off-by: Swaraj Kumar <swaraj.kumar@samsung.com>
kf-pipelines/qoe-pipeline-retrain-2.ipynb [new file with mode: 0644]

diff --git a/kf-pipelines/qoe-pipeline-retrain-2.ipynb b/kf-pipelines/qoe-pipeline-retrain-2.ipynb
new file mode 100644 (file)
index 0000000..958bd32
--- /dev/null
@@ -0,0 +1,285 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import kfp\n",
+    "import kfp.dsl as dsl\n",
+    "from kfp.dsl import InputPath, OutputPath\n",
+    "from kfp.dsl import component as component\n",
+    "from kfp import kubernetes"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# Container image passed as base_image to the train_export_model component.\n",
+    "# NOTE(review): the 'latest' tag is not pinned to a specific image build - consider pinning a version for reproducible runs.\n",
+    "BASE_IMAGE = \"traininghost/pipelineimage:latest\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "@component(base_image=BASE_IMAGE,packages_to_install=['requests'])\n",
+    "def train_export_model(trainingjobName: str, epochs: str, version: str):\n",
+    "    \"\"\"Retrain a previously saved model on the job's features and upload it.\n",
+    "\n",
+    "    Fetches the pdcpBytesDl / pdcpBytesUl features of the training job,\n",
+    "    downloads and unpacks the saved model for the given version, continues\n",
+    "    training it on sliding windows of the features, then uploads one metric\n",
+    "    and the retrained model through the model metrics SDK.\n",
+    "\n",
+    "    Args:\n",
+    "        trainingjobName: Job name used for the feature lookup, the model\n",
+    "            download URL and both uploads.\n",
+    "        epochs: Number of training epochs as a string; converted with int().\n",
+    "        version: Model version used in the download URL and both uploads.\n",
+    "\n",
+    "    Raises:\n",
+    "        ValueError: If there are too few feature rows to build one window.\n",
+    "        RuntimeError: If the model download does not return HTTP 200.\n",
+    "    \"\"\"\n",
+    "    # Imports live inside the function because the component body is what\n",
+    "    # gets executed in the container.\n",
+    "    import os\n",
+    "    import zipfile\n",
+    "\n",
+    "    import numpy as np\n",
+    "    import pandas as pd\n",
+    "    import requests\n",
+    "    import tensorflow as tf\n",
+    "    from featurestoresdk.feature_store_sdk import FeatureStoreSdk\n",
+    "    from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk\n",
+    "\n",
+    "    N_PAST = 10    # rows in each input window\n",
+    "    N_FUTURE = 1   # rows in each target window\n",
+    "    DOWNLOAD_TIMEOUT_S = 120  # avoid hanging the step forever on a dead server\n",
+    "\n",
+    "    print(\"numpy version\")\n",
+    "    print(np.__version__)\n",
+    "\n",
+    "    fs_sdk = FeatureStoreSdk()\n",
+    "    mm_sdk = ModelMetricsSdk()\n",
+    "    print(\"job name is: \", trainingjobName)\n",
+    "    features = fs_sdk.get_features(trainingjobName, ['pdcpBytesDl','pdcpBytesUl'])\n",
+    "    print(\"Dataframe:\")\n",
+    "    print(features)\n",
+    "\n",
+    "    features_cellc2b2 = features\n",
+    "    print(features_cellc2b2)\n",
+    "    print('Previous Data Types are --> ', features_cellc2b2.dtypes)\n",
+    "    features_cellc2b2[\"pdcpBytesDl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesDl\"], downcast=\"float\")\n",
+    "    features_cellc2b2[\"pdcpBytesUl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesUl\"], downcast=\"float\")\n",
+    "    print('New Data Types are --> ', features_cellc2b2.dtypes)\n",
+    "\n",
+    "    features_cellc2b2 = features_cellc2b2[['pdcpBytesDl', 'pdcpBytesUl']]\n",
+    "\n",
+    "    def split_series(series, n_past, n_future):\n",
+    "        \"\"\"Slice a 2-D array into consecutive (past, future) row windows.\n",
+    "\n",
+    "        Returns two arrays: the n_past-row inputs and the n_future-row\n",
+    "        targets that immediately follow each input. Both are empty when\n",
+    "        the series has fewer than n_past + n_future rows.\n",
+    "        \"\"\"\n",
+    "        X, y = list(), list()\n",
+    "        for window_start in range(len(series)):\n",
+    "            past_end = window_start + n_past\n",
+    "            future_end = past_end + n_future\n",
+    "            if future_end > len(series):\n",
+    "                break\n",
+    "            # slicing the past and future parts of the window\n",
+    "            past, future = series[window_start:past_end, :], series[past_end:future_end, :]\n",
+    "            X.append(past)\n",
+    "            y.append(future)\n",
+    "        return np.array(X), np.array(y)\n",
+    "\n",
+    "    X, y = split_series(features_cellc2b2.values, N_PAST, N_FUTURE)\n",
+    "    if len(X) == 0:\n",
+    "        # Without this guard the reshape below fails with an opaque IndexError.\n",
+    "        raise ValueError(\n",
+    "            f\"Need at least {N_PAST + N_FUTURE} feature rows to build a training window, got {len(features_cellc2b2)}\"\n",
+    "        )\n",
+    "    # Drop the length-1 future axis so targets are (samples, features).\n",
+    "    y = y.reshape((y.shape[0], y.shape[2]))\n",
+    "    print(X.shape)\n",
+    "    print(y.shape)\n",
+    "\n",
+    "    print(\"Loading the saved model\")\n",
+    "    print(os.listdir(os.getcwd()))\n",
+    "\n",
+    "    # Download the model zip file\n",
+    "    model_url= f\"http://tm.traininghost:32002/model/{trainingjobName}/{version}/Model.zip\"\n",
+    "    print(f\"Downloading model from :{model_url}\")\n",
+    "    response = requests.get(model_url, timeout=DOWNLOAD_TIMEOUT_S)\n",
+    "\n",
+    "    print(\"Response generated: \" + str(response))\n",
+    "\n",
+    "    # Fail here, with the real cause, instead of later when the zip is missing\n",
+    "    if response.status_code != 200:\n",
+    "        print('Failed to download the file')\n",
+    "        raise RuntimeError(f\"Model download from {model_url} failed with HTTP status {response.status_code}\")\n",
+    "\n",
+    "    local_file_path = 'Model.zip'\n",
+    "    with open(local_file_path, 'wb') as file:\n",
+    "        file.write(response.content)\n",
+    "    print(f'Downloaded file saved to {local_file_path}')\n",
+    "\n",
+    "    print(os.listdir(os.getcwd()))\n",
+    "\n",
+    "    # Extract the zip file\n",
+    "    zip_file_path = \"./Model.zip\"\n",
+    "    extract_to_dir = \"./Model\"\n",
+    "    os.makedirs(extract_to_dir, exist_ok=True)\n",
+    "\n",
+    "    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:\n",
+    "        zip_ref.extractall(extract_to_dir)\n",
+    "\n",
+    "    # Delete the zip file after extraction\n",
+    "    os.remove(zip_file_path)\n",
+    "    print(f'Deleted zip file: {zip_file_path}')\n",
+    "\n",
+    "    # Load the model in SavedModel format\n",
+    "    model_path = \"./Model/1\"  # Path to the directory containing the saved model\n",
+    "    model = tf.keras.models.load_model(model_path)\n",
+    "    model.compile(loss='mse', optimizer='adam', metrics=['mse'])\n",
+    "    model.summary()\n",
+    "\n",
+    "    # Define a directory to save checkpoints\n",
+    "    checkpoint_dir = \"./checkpoints\"\n",
+    "    os.makedirs(checkpoint_dir, exist_ok=True)\n",
+    "\n",
+    "    # Define a ModelCheckpoint callback\n",
+    "    checkpoint_path = os.path.join(checkpoint_dir, \"model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.h5\")\n",
+    "    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(\n",
+    "        filepath=checkpoint_path,   # Checkpoint files stay local; only ./retrain is uploaded below\n",
+    "        monitor='val_loss',         # Monitor validation loss, can be train loss also\n",
+    "        save_best_only=True,        # Save only the best model based on validation loss\n",
+    "        save_weights_only=False,    # Save the entire model, not just weights\n",
+    "        mode='min',                 # Minimizing the validation loss\n",
+    "        verbose=0                   # Set to 1 to print info when a new checkpoint is saved\n",
+    "    )\n",
+    "\n",
+    "    # Train the model with checkpointing\n",
+    "    print(\"Retraining the model with checkpoints...\")\n",
+    "    model.fit(\n",
+    "        X,\n",
+    "        y,\n",
+    "        batch_size=10,\n",
+    "        epochs=int(epochs),\n",
+    "        validation_split=0.2,\n",
+    "        callbacks=[checkpoint_callback]  # Add the callback here\n",
+    "    )\n",
+    "\n",
+    "    yhat = model.predict(X, verbose = 0)\n",
+    "\n",
+    "    retrained_model_path = \"./retrain\"\n",
+    "    os.makedirs(retrained_model_path, exist_ok=True)\n",
+    "\n",
+    "    # Save the retrained model\n",
+    "    model.save(retrained_model_path)\n",
+    "    print(f\"Retrained model saved at {retrained_model_path}\")\n",
+    "\n",
+    "    # 'Accuracy' here is the fraction of predicted values whose absolute error is below 5\n",
+    "    within_tolerance = np.absolute(np.asarray(y) - np.asarray(yhat)) < 5\n",
+    "    data = {'metrics': [{'Accuracy': str(np.mean(within_tolerance))}]}\n",
+    "\n",
+    "    mm_sdk.upload_metrics(data, trainingjobName, version)\n",
+    "    mm_sdk.upload_model(retrained_model_path, trainingjobName, version)\n"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 12,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "@dsl.pipeline(\n",
+    "    name=\"qoe Pipeline retrain\",\n",
+    "    description=\"qoe retrain\",\n",
+    ")\n",
+    "def super_model_pipeline(trainingjob_name: str, epochs: str, version: str):\n",
+    "    \"\"\"Single-step pipeline that forwards its three inputs to train_export_model.\"\"\"\n",
+    "    retrain_task = train_export_model(\n",
+    "        trainingjobName=trainingjob_name,\n",
+    "        epochs=epochs,\n",
+    "        version=version,\n",
+    "    )\n",
+    "    # Caching is switched off for this task.\n",
+    "    retrain_task.set_caching_options(False)\n",
+    "    # Pull the image only when it is not already present.\n",
+    "    kubernetes.set_image_pull_policy(retrain_task, \"IfNotPresent\")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 13,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "pipeline_func = super_model_pipeline\n",
+    "file_name = \"qoe_model_pipeline_retrain_2\"\n",
+    "\n",
+    "# Compile the pipeline definition into <file_name>.yaml\n",
+    "kfp.compiler.Compiler().compile(\n",
+    "    pipeline_func=pipeline_func,\n",
+    "    package_path=f\"{file_name}.yaml\",\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import requests\n",
+    "pipeline_name=\"qoe_Pipeline_retrain_2\"\n",
+    "pipeline_file = file_name+'.yaml'\n",
+    "upload_url = \"http://tm.traininghost:32002/pipelines/{}/upload\".format(pipeline_name)\n",
+    "# Open the package in a with-block so the file handle is closed after the upload\n",
+    "with open(pipeline_file, 'rb') as pipeline_fh:\n",
+    "    upload_response = requests.post(upload_url, files={'file': pipeline_fh})\n",
+    "# Surface a rejected upload instead of silently continuing\n",
+    "upload_response.raise_for_status()\n",
+    "upload_response"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 14,
+   "metadata": {},
+   "outputs": [
+    {
+     "data": {
+      "text/html": [
+       "<a href=\"http://ml-pipeline-ui.kubeflow:80/#/experiments/details/c7714054-ee78-484d-ae94-086901f98693\" target=\"_blank\" >Experiment details</a>."
+      ],
+      "text/plain": [
+       "<IPython.core.display.HTML object>"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    },
+    {
+     "data": {
+      "text/html": [
+       "<a href=\"http://ml-pipeline-ui.kubeflow:80/#/runs/details/86ef1c74-2978-4a1d-9743-831b9b9c9dfe\" target=\"_blank\" >Run details</a>."
+      ],
+      "text/plain": [
+       "<IPython.core.display.HTML object>"
+      ]
+     },
+     "metadata": {},
+     "output_type": "display_data"
+    }
+   ],
+   "source": [
+    "from kfp.client import Client\n",
+    "client = Client(host='http://ml-pipeline-ui.kubeflow:80')\n",
+    "# The package path is derived from file_name so it always matches the file written by the compile step\n",
+    "run_arguments = {'trainingjob_name':'testing_influxdb_301', 'epochs':'5', 'version':'1'}\n",
+    "result = client.create_run_from_pipeline_package('{}.yaml'.format(file_name), arguments=run_arguments)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "Python 3 (ipykernel)",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.9.6"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}