--- /dev/null
+{
+ "cells": [
+ {
+ "cell_type": "code",
+ "execution_count": 1,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "import kfp\n",
+ "import kfp.components as components\n",
+ "import kfp.dsl as dsl\n",
+ "from kfp.components import InputPath, OutputPath"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 2,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def train_export_model(trainingjobName: str, epochs: str, version: str):\n",
+ " \n",
+ " import tensorflow as tf\n",
+ " from numpy import array\n",
+ " from tensorflow.keras.models import Sequential\n",
+ " from tensorflow.keras.layers import Dense\n",
+ " from tensorflow.keras.layers import Flatten, Dropout, Activation\n",
+ " from tensorflow.keras.layers import LSTM\n",
+ " import numpy as np\n",
+ " print(\"numpy version\")\n",
+ " print(np.__version__)\n",
+ " import pandas as pd\n",
+ " import os\n",
+ " from featurestoresdk.feature_store_sdk import FeatureStoreSdk\n",
+ " from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk\n",
+ " \n",
+ " fs_sdk = FeatureStoreSdk()\n",
+ " mm_sdk = ModelMetricsSdk()\n",
+ " \n",
+ " features = fs_sdk.get_features(trainingjobName, ['measTimeStampRf', 'nrCellIdentity', 'pdcpBytesDl','pdcpBytesUl'])\n",
+ " print(\"Dataframe:\")\n",
+ " print(features)\n",
+ "\n",
+ " features_cellc2b2 = features[features['nrCellIdentity'] == \"c2/B2\"]\n",
+ " print(\"Dataframe for cell : c2/B2\")\n",
+ " print(features_cellc2b2)\n",
+ " print('Previous Data Types are --> ', features_cellc2b2.dtypes)\n",
+ " features_cellc2b2[\"pdcpBytesDl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesDl\"], downcast=\"float\")\n",
+ " features_cellc2b2[\"pdcpBytesUl\"] = pd.to_numeric(features_cellc2b2[\"pdcpBytesUl\"], downcast=\"float\")\n",
+ " print('New Data Types are --> ', features_cellc2b2.dtypes)\n",
+ " \n",
+ " features_cellc2b2 = features_cellc2b2[['pdcpBytesDl', 'pdcpBytesUl']]\n",
+ " \n",
+ " def split_series(series, n_past, n_future):\n",
+ " X, y = list(), list()\n",
+ " for window_start in range(len(series)):\n",
+ " past_end = window_start + n_past\n",
+ " future_end = past_end + n_future\n",
+ " if future_end > len(series):\n",
+ " break\n",
+ " # slicing the past and future parts of the window\n",
+ " past, future = series[window_start:past_end, :], series[past_end:future_end, :]\n",
+ " X.append(past)\n",
+ " y.append(future)\n",
+ " return np.array(X), np.array(y)\n",
+ " X, y = split_series(features_cellc2b2.values,10, 1)\n",
+ " X = X.reshape((X.shape[0], X.shape[1],X.shape[2]))\n",
+ " y = y.reshape((y.shape[0], y.shape[2]))\n",
+ " print(X.shape)\n",
+ " print(y.shape)\n",
+ " \n",
+ " model = Sequential()\n",
+ " model.add(LSTM(units = 150, activation=\"tanh\" ,return_sequences = True, input_shape = (X.shape[1], X.shape[2])))\n",
+ "\n",
+ " model.add(LSTM(units = 150, return_sequences = True,activation=\"tanh\"))\n",
+ "\n",
+ " model.add(LSTM(units = 150,return_sequences = False,activation=\"tanh\" ))\n",
+ "\n",
+ " model.add((Dense(units = X.shape[2])))\n",
+ " \n",
+ " model.compile(loss='mse', optimizer='adam',metrics=['mse'])\n",
+ " model.summary()\n",
+ " \n",
+ " model.fit(X, y, batch_size=10,epochs=int(epochs), validation_split=0.2)\n",
+ " yhat = model.predict(X, verbose = 0)\n",
+ "\n",
+ " \n",
+ " xx = y\n",
+ " yy = yhat\n",
+ " model.save(\"./\")\n",
+ " import json\n",
+ " data = {}\n",
+ " data['metrics'] = []\n",
+ " data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))})\n",
+ " \n",
+ " mm_sdk.upload_metrics(data, trainingjobName, version)\n",
+ " mm_sdk.upload_model(\"./\", trainingjobName, version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 3,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "BASE_IMAGE = \"traininghost/pipelineimage:latest\""
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 4,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "def train_and_export(trainingjobName: str, epochs: str, version: str):\n",
+ " trainOp = components.func_to_container_op(train_export_model, base_image=BASE_IMAGE)(trainingjobName, epochs,version)\n",
+ " # Below line to disable caching of pipeline step\n",
+ " trainOp.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
+ " trainOp.container.set_image_pull_policy(\"IfNotPresent\")"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 5,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "@dsl.pipeline(\n",
+ " name=\"qoe Pipeline\",\n",
+ " description=\"qoe\",\n",
+ ")\n",
+ "def super_model_pipeline( \n",
+ " trainingjob_name: str, epochs: str, version: str):\n",
+ " \n",
+ " train_and_export(trainingjob_name, epochs, version)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [],
+ "source": [
+ "pipeline_func = super_model_pipeline\n",
+ "file_name = \"qoe_model_pipeline\"\n",
+ "\n",
+ "kfp.compiler.Compiler().compile(pipeline_func, \n",
+ " '{}.zip'.format(file_name))"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 7,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/plain": [
+ "<Response [200]>"
+ ]
+ },
+ "execution_count": 7,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "import requests\n",
+ "pipeline_name=\"qoe Pipeline\"\n",
+ "pipeline_file = file_name+'.zip'\n",
+ "requests.post(\"http://tm.traininghost:32002/pipelines/{}/upload\".format(pipeline_name), files={'file':open(pipeline_file,'rb')})"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
+ }
+ ],
+ "metadata": {
+ "kernelspec": {
+ "display_name": "Python 3 (ipykernel)",
+ "language": "python",
+ "name": "python3"
+ },
+ "language_info": {
+ "codemirror_mode": {
+ "name": "ipython",
+ "version": 3
+ },
+ "file_extension": ".py",
+ "mimetype": "text/x-python",
+ "name": "python",
+ "nbconvert_exporter": "python",
+ "pygments_lexer": "ipython3",
+ "version": "3.9.6"
+ }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}