--- /dev/null
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 9,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import kfp\n",
+ "import kfp.dsl as dsl\n",
+ "from kfp.dsl import InputPath, OutputPath\n",
+ "from kfp.dsl import component as component\n",
+ "from kfp import kubernetes"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 10,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "BASE_IMAGE = \"traininghost/pipelineimage:latest\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 11,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "@component(base_image=BASE_IMAGE,packages_to_install=['requests'])\n",
+ "def train_export_model(trainingjobName: str, epochs: str, version: str):\n",
+ " \n",
+ " import tensorflow as tf\n",
+ " from numpy import array\n",
+ " from tensorflow.keras.models import Sequential\n",
+ " from tensorflow.keras.layers import Dense\n",
+ " from tensorflow.keras.layers import Flatten, Dropout, Activation\n",
+ " from tensorflow.keras.layers import LSTM\n",
+ " import numpy as np\n",
+ " import requests\n",
+ " print(\"numpy version\")\n",
+ " print(np.__version__)\n",
+ " import pandas as pd\n",
+ " import os\n",
+ " import zipfile\n",
+ " from featurestoresdk.feature_store_sdk import FeatureStoreSdk\n",
+ " from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk\n",
+ " \n",
+ " fs_sdk = FeatureStoreSdk()\n",
+ " mm_sdk = ModelMetricsSdk()\n",
+ " print(\"job name is: \", trainingjobName)\n",
+ " features = fs_sdk.get_features(trainingjobName, ['pdcpBytesDl','pdcpBytesUl'])\n",
+ " print(\"Dataframe:\")\n",
+ " print(features)\n",
+ "\n",
+ " features_cellc2b2 = features\n",
+ " print(features_cellc2b2)\n",
+ " print('Previous Data Types are --> ', features_cellc2b2.dtypes)\n",
+ " features_cellc2b2[\"pdcpBytesDl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesDl\"], downcast=\"float\")\n",
+ " features_cellc2b2[\"pdcpBytesUl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesUl\"], downcast=\"float\")\n",
+ " print('New Data Types are --> ', features_cellc2b2.dtypes)\n",
+ " \n",
+ " features_cellc2b2 = features_cellc2b2[['pdcpBytesDl', 'pdcpBytesUl']]\n",
+ " \n",
+ " def split_series(series, n_past, n_future):\n",
+ " X, y = list(), list()\n",
+ " for window_start in range(len(series)):\n",
+ " past_end = window_start + n_past\n",
+ " future_end = past_end + n_future\n",
+ " if future_end > len(series):\n",
+ " break\n",
+ " # slicing the past and future parts of the window\n",
+ " past, future = series[window_start:past_end, :], series[past_end:future_end, :]\n",
+ " X.append(past)\n",
+ " y.append(future)\n",
+ " return np.array(X), np.array(y)\n",
+ " X, y = split_series(features_cellc2b2.values,10, 1)\n",
+ " X = X.reshape((X.shape[0], X.shape[1],X.shape[2]))\n",
+ " y = y.reshape((y.shape[0], y.shape[2]))\n",
+ " print(X.shape)\n",
+ " print(y.shape)\n",
+ " \n",
+ " print(\"Loading the saved model\")\n",
+ " print(os.listdir(os.getcwd()))\n",
+ "\n",
+ " # Download the model zip file\n",
+ " model_url= f\"http://tm.traininghost:32002/model/{trainingjobName}/{version}/Model.zip\"\n",
+ " print(f\"Downloading model from :{model_url}\")\n",
+ " response = requests.get(model_url)\n",
+ "\n",
+ " print(\"Response generated: \" + str(response))\n",
+ "\n",
+ " # Check if the request was successful\n",
+ " if response.status_code == 200:\n",
+ " local_file_path = 'Model.zip'\n",
+ " with open(local_file_path, 'wb') as file:\n",
+ " file.write(response.content)\n",
+ " print(f'Downloaded file saved to {local_file_path}')\n",
+ " else:\n",
+ " print('Failed to download the file')\n",
+ "\n",
+ " print(os.listdir(os.getcwd()))\n",
+ "\n",
+ " # Extract the zip file\n",
+ " zip_file_path = \"./Model.zip\"\n",
+ " extract_to_dir = \"./Model\"\n",
+ "\n",
+ " if not os.path.exists(extract_to_dir):\n",
+ " os.makedirs(extract_to_dir)\n",
+ "\n",
+ " with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:\n",
+ " zip_ref.extractall(extract_to_dir)\n",
+ "\n",
+ " # Delete the zip file after extraction\n",
+ " if os.path.exists(zip_file_path):\n",
+ " os.remove(zip_file_path)\n",
+ " print(f'Deleted zip file: {zip_file_path}')\n",
+ " else:\n",
+ " print(f'Zip file not found: {zip_file_path}')\n",
+ "\n",
+ " # Load the model in SavedModel format\n",
+ " model_path = \"./Model/1\" # Path to the directory containing the saved model\n",
+ " model = tf.keras.models.load_model(model_path)\n",
+ " model.compile(loss='mse', optimizer='adam', metrics=['mse'])\n",
+ " model.summary()\n",
+ "\n",
+ " # Define a directory to save checkpoints\n",
+ " checkpoint_dir = \"./checkpoints\"\n",
+ " if not os.path.exists(checkpoint_dir):\n",
+ " os.makedirs(checkpoint_dir)\n",
+ "\n",
+ " # Define a ModelCheckpoint callback\n",
+ " checkpoint_path = os.path.join(checkpoint_dir, \"model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.h5\")\n",
+ " checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(\n",
+ " filepath=checkpoint_path, # Save checkpoint file path, this file is not saved finaly\n",
+ " monitor='val_loss', # Monitor validation loss, can be train loss also \n",
+ " save_best_only=True, # Save only the best model based on validation loss\n",
+ " save_weights_only=False, # Save the entire model, not just weights\n",
+ " mode='min', # Minimizing the validation loss\n",
+ " verbose=0 # set to 1 if want to print info when a new checkpoint is saved\n",
+ " )\n",
+ "\n",
+ " # Train the model with checkpointing\n",
+ " print(\"Retraining the model with checkpoints...\")\n",
+ " history = model.fit(\n",
+ " X, \n",
+ " y, \n",
+ " batch_size=10, \n",
+ " epochs=int(epochs), \n",
+ " validation_split=0.2, \n",
+ " callbacks=[checkpoint_callback] # Add the callback here\n",
+ " )\n",
+ " \n",
+ " yhat = model.predict(X, verbose = 0)\n",
+ " xx = y\n",
+ " yy = yhat\n",
+ " \n",
+ " retrained_model_path = \"./retrain\"\n",
+ " if not os.path.exists(retrained_model_path):\n",
+ " os.makedirs(retrained_model_path)\n",
+ "\n",
+ " # Save the retrained model\n",
+ " model.save(retrained_model_path)\n",
+ " print(f\"Retrained model saved at {retrained_model_path}\")\n",
+ "\n",
+ " import json\n",
+ " data = {}\n",
+ " data['metrics'] = []\n",
+ " data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))})\n",
+ " \n",
+ " mm_sdk.upload_metrics(data, trainingjobName, version)\n",
+ " mm_sdk.upload_model(\"./retrain\", trainingjobName, version)\n"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 12,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "@dsl.pipeline(\n",
+ " name=\"qoe Pipeline retrain\",\n",
+ " description=\"qoe retrain\",\n",
+ ")\n",
+ "def super_model_pipeline( \n",
+ " trainingjob_name: str, epochs: str, version: str):\n",
+ " \n",
+ " trainop=train_export_model(trainingjobName=trainingjob_name, epochs=epochs, version=version)\n",
+ " trainop.set_caching_options(False)\n",
+ " kubernetes.set_image_pull_policy(trainop, \"IfNotPresent\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 13,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pipeline_func = super_model_pipeline\n",
+ "file_name = \"qoe_model_pipeline_retrain_2\"\n",
+ "\n",
+ "kfp.compiler.Compiler().compile(pipeline_func, \n",
+ " '{}.yaml'.format(file_name))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import requests\n",
+ "pipeline_name=\"qoe_Pipeline_retrain_2\"\n",
+ "pipeline_file = file_name+'.yaml'\n",
+ "requests.post(\"http://tm.traininghost:32002/pipelines/{}/upload\".format(pipeline_name), files={'file':open(pipeline_file,'rb')})"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 14,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "<a href=\"http://ml-pipeline-ui.kubeflow:80/#/experiments/details/c7714054-ee78-484d-ae94-086901f98693\" target=\"_blank\" >Experiment details</a>."
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ },
+ {
+ "data": {
+ "text/html": [
+ "<a href=\"http://ml-pipeline-ui.kubeflow:80/#/runs/details/86ef1c74-2978-4a1d-9743-831b9b9c9dfe\" target=\"_blank\" >Run details</a>."
+ ],
+ "text/plain": [
+ "<IPython.core.display.HTML object>"
+ ]
+ },
+ "metadata": {},
+ "output_type": "display_data"
+ }
+ ],
+ "source": [
+ "from kfp.client import Client\n",
+ "client = Client(host='http://ml-pipeline-ui.kubeflow:80')\n",
+ "# client.upload_pipeline_version(pipeline_package_path='pipeline.yaml',pipeline_version_name='v2', pipeline_name='sample-pipeline')\n",
+ "result = client.create_run_from_pipeline_package('qoe_model_pipeline_retrain_2.yaml', arguments={'trainingjob_name':'testing_influxdb_301', 'epochs':'5', 'version':'1'})"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.6"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}