From: Swaraj Kumar
Date: Wed, 6 Nov 2024 12:25:04 +0000 (+0530)
Subject: Working on creating a retraining pipeline
X-Git-Tag: 4.0.0~18^2
X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=14c29fb0a9da1af61efa65f4f565b95009cc1389;p=portal%2Faiml-dashboard.git

Working on creating a retraining pipeline

Change-Id: Ia9eec0ee9253a030b33b30481bc6090c26581a14
Signed-off-by: Swaraj Kumar
---

diff --git a/kf-pipelines/qoe-pipeline-retrain-2.ipynb b/kf-pipelines/qoe-pipeline-retrain-2.ipynb
new file mode 100644
index 0000000..958bd32
--- /dev/null
+++ b/kf-pipelines/qoe-pipeline-retrain-2.ipynb
@@ -0,0 +1,285 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": 9,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import kfp\n",
+    "import kfp.dsl as dsl\n",
+    "from kfp.dsl import InputPath, OutputPath\n",
+    "from kfp.dsl import component as component\n",
+    "from kfp import kubernetes"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 10,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "BASE_IMAGE = \"traininghost/pipelineimage:latest\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": 11,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "@component(base_image=BASE_IMAGE, packages_to_install=['requests'])\n",
+    "def train_export_model(trainingjobName: str, epochs: str, version: str):\n",
+    "\n",
+    "    import tensorflow as tf\n",
+    "    from numpy import array\n",
+    "    from tensorflow.keras.models import Sequential\n",
+    "    from tensorflow.keras.layers import Dense\n",
+    "    from tensorflow.keras.layers import Flatten, Dropout, Activation\n",
+    "    from tensorflow.keras.layers import LSTM\n",
+    "    import numpy as np\n",
+    "    import requests\n",
+    "    print(\"numpy version\")\n",
+    "    print(np.__version__)\n",
+    "    import pandas as pd\n",
+    "    import os\n",
+    "    import zipfile\n",
+    "    from featurestoresdk.feature_store_sdk import FeatureStoreSdk\n",
+    "    from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk\n",
+    "\n",
+    "    fs_sdk = FeatureStoreSdk()\n",
+    "    mm_sdk = ModelMetricsSdk()\n",
+    "    print(\"job name is: \", trainingjobName)\n",
+    "    features = fs_sdk.get_features(trainingjobName, ['pdcpBytesDl', 'pdcpBytesUl'])\n",
+    "    print(\"Dataframe:\")\n",
+    "    print(features)\n",
+    "\n",
+    "    features_cellc2b2 = features\n",
+    "    print(features_cellc2b2)\n",
+    "    print('Previous Data Types are --> ', features_cellc2b2.dtypes)\n",
+    "    features_cellc2b2[\"pdcpBytesDl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesDl\"], downcast=\"float\")\n",
+    "    features_cellc2b2[\"pdcpBytesUl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesUl\"], downcast=\"float\")\n",
+    "    print('New Data Types are --> ', features_cellc2b2.dtypes)\n",
+    "\n",
+    "    features_cellc2b2 = features_cellc2b2[['pdcpBytesDl', 'pdcpBytesUl']]\n",
+    "\n",
+    "    def split_series(series, n_past, n_future):\n",
+    "        X, y = list(), list()\n",
+    "        for window_start in range(len(series)):\n",
+    "            past_end = window_start + n_past\n",
+    "            future_end = past_end + n_future\n",
+    "            if future_end > len(series):\n",
+    "                break\n",
+    "            # slicing the past and future parts of the window\n",
+    "            past, future = series[window_start:past_end, :], series[past_end:future_end, :]\n",
+    "            X.append(past)\n",
+    "            y.append(future)\n",
+    "        return np.array(X), np.array(y)\n",
+    "    X, y = split_series(features_cellc2b2.values, 10, 1)\n",
+    "    X = X.reshape((X.shape[0], X.shape[1], X.shape[2]))\n",
+    "    y = y.reshape((y.shape[0], y.shape[2]))\n",
+    "    print(X.shape)\n",
+    "    print(y.shape)\n",
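+    "    # Sanity note on shapes: with n_past=10, n_future=1 and the two KPI\n",
+    "    # columns, split_series gives X.shape == (samples, 10, 2) and, after the\n",
+    "    # reshape above, y.shape == (samples, 2), where samples = len(series) - 10.\n",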
+    "\n",
+    "    print(\"Loading the saved model\")\n",
+    "    print(os.listdir(os.getcwd()))\n",
+    "\n",
+    "    # Download the model zip file\n",
+    "    model_url = f\"http://tm.traininghost:32002/model/{trainingjobName}/{version}/Model.zip\"\n",
+    "    print(f\"Downloading model from: {model_url}\")\n",
+    "    response = requests.get(model_url)\n",
+    "\n",
+    "    print(\"Response generated: \" + str(response))\n",
+    "\n",
+    "    # Check if the request was successful\n",
+    "    if response.status_code == 200:\n",
+    "        local_file_path = 'Model.zip'\n",
+    "        with open(local_file_path, 'wb') as file:\n",
+    "            file.write(response.content)\n",
+    "        print(f'Downloaded file saved to {local_file_path}')\n",
+    "    else:\n",
+    "        # Fail fast: extraction below cannot proceed without the archive\n",
+    "        raise RuntimeError(f'Failed to download the model: HTTP {response.status_code}')\n",
+    "\n",
+    "    print(os.listdir(os.getcwd()))\n",
+    "\n",
+    "    # Extract the zip file\n",
+    "    zip_file_path = \"./Model.zip\"\n",
+    "    extract_to_dir = \"./Model\"\n",
+    "\n",
+    "    if not os.path.exists(extract_to_dir):\n",
+    "        os.makedirs(extract_to_dir)\n",
+    "\n",
+    "    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:\n",
+    "        zip_ref.extractall(extract_to_dir)\n",
+    "\n",
+    "    # Delete the zip file after extraction\n",
+    "    if os.path.exists(zip_file_path):\n",
+    "        os.remove(zip_file_path)\n",
+    "        print(f'Deleted zip file: {zip_file_path}')\n",
+    "    else:\n",
+    "        print(f'Zip file not found: {zip_file_path}')\n",
+    "\n",
+    "    # Load the model in SavedModel format\n",
+    "    model_path = \"./Model/1\"  # Path to the directory containing the saved model\n",
+    "    model = tf.keras.models.load_model(model_path)\n",
+    "    model.compile(loss='mse', optimizer='adam', metrics=['mse'])\n",
+    "    model.summary()\n",
+    "\n",
+    "    # Define a directory to save checkpoints\n",
+    "    checkpoint_dir = \"./checkpoints\"\n",
+    "    if not os.path.exists(checkpoint_dir):\n",
+    "        os.makedirs(checkpoint_dir)\n",
+    "\n",
+    "    # Define a ModelCheckpoint callback\n",
+    "    checkpoint_path = os.path.join(checkpoint_dir, \"model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.h5\")\n",
+    "    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(\n",
+    "        filepath=checkpoint_path,    # Checkpoint file path; these files are not part of the final export\n",
+    "        monitor='val_loss',          # Metric to monitor (training loss could be used instead)\n",
+    "        save_best_only=True,         # Save only the best model based on validation loss\n",
+    "        save_weights_only=False,     # Save the entire model, not just the weights\n",
+    "        mode='min',                  # Lower validation loss is better\n",
+    "        verbose=0                    # Set to 1 to print a message when a new checkpoint is saved\n",
+    "    )\n",
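+    "    # Since the filepath embeds {epoch} and {val_loss}, a new .h5 file is\n",
+    "    # written only when val_loss improves on the best value seen so far;\n",
+    "    # non-improving epochs produce no checkpoint file.\n",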
"execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "@dsl.pipeline(\n", + " name=\"qoe Pipeline retrain\",\n", + " description=\"qoe retrain\",\n", + ")\n", + "def super_model_pipeline( \n", + " trainingjob_name: str, epochs: str, version: str):\n", + " \n", + " trainop=train_export_model(trainingjobName=trainingjob_name, epochs=epochs, version=version)\n", + " trainop.set_caching_options(False)\n", + " kubernetes.set_image_pull_policy(trainop, \"IfNotPresent\")" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "pipeline_func = super_model_pipeline\n", + "file_name = \"qoe_model_pipeline_retrain_2\"\n", + "\n", + "kfp.compiler.Compiler().compile(pipeline_func, \n", + " '{}.yaml'.format(file_name))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "import requests\n", + "pipeline_name=\"qoe_Pipeline_retrain_2\"\n", + "pipeline_file = file_name+'.yaml'\n", + "requests.post(\"http://tm.traininghost:32002/pipelines/{}/upload\".format(pipeline_name), files={'file':open(pipeline_file,'rb')})" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "Experiment details." + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "data": { + "text/html": [ + "Run details." + ], + "text/plain": [ + "" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "from kfp.client import Client\n", + "client = Client(host='http://ml-pipeline-ui.kubeflow:80')\n", + "# client.upload_pipeline_version(pipeline_package_path='pipeline.yaml',pipeline_version_name='v2', pipeline_name='sample-pipeline')\n", + "result = client.create_run_from_pipeline_package('qoe_model_pipeline_retrain_2.yaml', arguments={'trainingjob_name':'testing_influxdb_301', 'epochs':'5', 'version':'1'})" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.6" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +}