From fdfa71d2b5ed379e4815c94158d4378606219116 Mon Sep 17 00:00:00 2001 From: Swaraj Kumar Date: Fri, 19 Sep 2025 18:47:18 +0530 Subject: [PATCH] Updating the sample pipelines to address TF 2.20.0 sample piplines qoe_pipeline and qoe_pipeline_retrain pipelines for l release. Same pipelines as https://gerrit.o-ran-sc.org/r/c/portal/aiml-dashboard/+/14932 Issue-ID: AIMLFW-246 Change-Id: I51ef7ee94d0696e52b0d74171ec468360f3345e5 Signed-off-by: Swaraj Kumar --- samples/qoe/qoe_pipeline.py | 32 +++-- samples/qoe/qoe_pipeline_retrain.py | 227 ++++++++++++++++++++++++++++++++++++ 2 files changed, 249 insertions(+), 10 deletions(-) create mode 100644 samples/qoe/qoe_pipeline_retrain.py diff --git a/samples/qoe/qoe_pipeline.py b/samples/qoe/qoe_pipeline.py index ed1875e..4ac22d4 100644 --- a/samples/qoe/qoe_pipeline.py +++ b/samples/qoe/qoe_pipeline.py @@ -16,7 +16,6 @@ # # ================================================================================== - import kfp import kfp.dsl as dsl from kfp.dsl import InputPath, OutputPath @@ -95,15 +94,24 @@ def train_export_model(featurepath: str, epochs: str, modelname: str, modelversi xx = y yy = yhat - model_save_filepath = "./" - model.export(model_save_filepath) + + print("Saving models ...") + save_directory = './keras_model' + if not os.path.exists(save_directory): + os.makedirs(save_directory, exist_ok=True) + print(f"Created directory: {save_directory}") + else: + print(f"Directory already exists: {save_directory}") + + model.save('./keras_model/model.keras') + model.export('./saved_model') import json data = {} data['metrics'] = [] data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))}) - #as new artifact after training will always be 1.0.0 +# as new artifact after training will always be 1.0.0 artifactversion="1.0.0" url = f"http://modelmgmtservice.traininghost:8082/ai-ml-model-registration/v1/model-registrations/updateArtifact/{modelname}/{modelversion}/{artifactversion}" updated_model_info= requests.post(url).json() @@ -113,14 +121,17 @@ def train_export_model(featurepath: str, epochs: str, modelname: str, modelversi trainingjob_id = featurepath.split('_')[-1] mm_sdk.upload_metrics(data, trainingjob_id) print("Model-metric : ", mm_sdk.get_metrics(trainingjob_id)) - mm_sdk.upload_model(model_save_filepath, modelname, modelversion, artifactversion) - - + print("uploading keras model to MME") + mm_sdk.upload_model("./keras_model", modelname + "_keras", modelversion, artifactversion) + print("Saved keras format") + mm_sdk.upload_model("./saved_model", modelname, modelversion, artifactversion) + print("Saved savedmodel format") @dsl.pipeline( name="qoe Pipeline", description="qoe", ) + def super_model_pipeline( featurepath: str, epochs: str, modelname: str, modelversion:str): @@ -128,15 +139,16 @@ def super_model_pipeline( trainop.set_caching_options(False) kubernetes.set_image_pull_policy(trainop, "IfNotPresent") - pipeline_func = super_model_pipeline file_name = "qoe_model_pipeline" kfp.compiler.Compiler().compile(pipeline_func, '{}.yaml'.format(file_name)) - import requests pipeline_name="qoe_Pipeline" pipeline_file = file_name+'.yaml' -requests.post("http://tm.traininghost:32002/pipelines/{}/upload".format(pipeline_name), files={'file':open(pipeline_file,'rb')}) \ No newline at end of file +requests.post("http://tm.traininghost:32002/pipelines/{}/upload".format(pipeline_name), files={'file':open(pipeline_file,'rb')}) + + + diff --git a/samples/qoe/qoe_pipeline_retrain.py b/samples/qoe/qoe_pipeline_retrain.py new file mode 100644 index 0000000..1cd2450 --- /dev/null +++ b/samples/qoe/qoe_pipeline_retrain.py @@ -0,0 +1,227 @@ +# ================================================================================== +# +# Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# ================================================================================== + +import kfp +import kfp.dsl as dsl +from kfp.dsl import InputPath, OutputPath +from kfp.dsl import component as component +from kfp import kubernetes + +BASE_IMAGE = "traininghost/pipelineimage:latest" + +@component(base_image=BASE_IMAGE,packages_to_install=['requests']) +def train_export_model(featurepath: str, epochs: str, modelname: str, modelversion:str): + + import re + import tensorflow as tf + from numpy import array + from tensorflow.keras.models import Sequential + from tensorflow.keras.layers import Dense + from tensorflow.keras.layers import Flatten, Dropout, Activation + from tensorflow.keras.layers import LSTM + import numpy as np + import requests + import zipfile + print("numpy version") + print(np.__version__) + import pandas as pd + import os + from featurestoresdk.feature_store_sdk import FeatureStoreSdk + from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk + + fs_sdk = FeatureStoreSdk() + mm_sdk = ModelMetricsSdk() + print("featurepath is: ", featurepath) + features = fs_sdk.get_features(featurepath, ['pdcpBytesDl','pdcpBytesUl']) + print("Dataframe:") + print(features) + + features_cellc2b2 = features + print(features_cellc2b2) + print('Previous Data Types are --> ', features_cellc2b2.dtypes) + features_cellc2b2["pdcpBytesDl"] = pd.to_numeric(features_cellc2b2["pdcpBytesDl"], downcast="float") + features_cellc2b2["pdcpBytesUl"] = pd.to_numeric(features_cellc2b2["pdcpBytesUl"], downcast="float") + print('New Data Types are --> ', features_cellc2b2.dtypes) + + features_cellc2b2 = features_cellc2b2[['pdcpBytesDl', 'pdcpBytesUl']] + + def split_series(series, n_past, n_future): + X, y = list(), list() + for window_start in range(len(series)): + past_end = window_start + n_past + future_end = past_end + n_future + if future_end > len(series): + break + # slicing the past and future parts of the window + past, future = series[window_start:past_end, :], series[past_end:future_end, :] + X.append(past) + y.append(future) + return np.array(X), np.array(y) + X, y = split_series(features_cellc2b2.values,10, 1) + X = X.reshape((X.shape[0], X.shape[1],X.shape[2])) + y = y.reshape((y.shape[0], y.shape[2])) + print(X.shape) + print(y.shape) + + print("Loading the saved model") + print(os.listdir(os.getcwd())) + + + url = f"http://modelmgmtservice.traininghost:8082/ai-ml-model-discovery/v1/models/?model-name={modelname}&model-version={modelversion}" + modelinfo = requests.get(url).json()[0] + artifactversion = modelinfo["modelId"]["artifactVersion"] + model_url = "" + if modelinfo["modelLocation"] != "": + model_url= modelinfo["modelLocation"] + else : + keras_model= modelname + "_keras" + model_url = f"http://tm.traininghost:32002/model/{keras_model}/{modelversion}/{artifactversion}/Model.zip" + # Download the model zip file + + print(f"Downloading model from :{model_url}") + response = requests.get(model_url) + + print("Response generated: " + str(response)) + + # Check if the request was successful + if response.status_code == 200: + local_file_path = 'Model.zip' + with open(local_file_path, 'wb') as file: + file.write(response.content) + print(f'Downloaded file saved to {local_file_path}') + else: + print('Failed to download the file') + + print(os.listdir(os.getcwd())) + + # Extract the zip file + zip_file_path = "./Model.zip" + extract_to_dir = "./Model" + + if not os.path.exists(extract_to_dir): + os.makedirs(extract_to_dir) + + with zipfile.ZipFile(zip_file_path, 'r') as zip_ref: + zip_ref.extractall(extract_to_dir) + + # Delete the zip file after extraction + if os.path.exists(zip_file_path): + os.remove(zip_file_path) + print(f'Deleted zip file: {zip_file_path}') + else: + print(f'Zip file not found: {zip_file_path}') + + # Path to the directory containing the saved model + model_path = f"./Model/{modelversion}/model.keras" + + # Load the model in SavedModel format + model = tf.keras.models.load_model(model_path) + + model.compile(loss='mse', optimizer='adam', metrics=['mse']) + model.summary() + + # Define a directory to save checkpoints + checkpoint_dir = "./checkpoints" + if not os.path.exists(checkpoint_dir): + os.makedirs(checkpoint_dir) + + # Define a ModelCheckpoint callback + checkpoint_path = os.path.join(checkpoint_dir, "model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.h5") + checkpoint_callback = tf.keras.callbacks.ModelCheckpoint( + filepath=checkpoint_path, # Save checkpoint file path, this file is not saved finaly + monitor='val_loss', # Monitor validation loss, can be train loss also + save_best_only=True, # Save only the best model based on validation loss + save_weights_only=False, # Save the entire model, not just weights + mode='min', # Minimizing the validation loss + verbose=0 # set to 1 if want to print info when a new checkpoint is saved + ) + + # Train the model with checkpointing + print("Retraining the model with checkpoints...") + history = model.fit( + X, + y, + batch_size=10, + epochs=int(epochs), + validation_split=0.2, + callbacks=[checkpoint_callback] # Add the callback here + ) + + yhat = model.predict(X, verbose = 0) + xx = y + yy = yhat + + print("Saving models ...") + save_directory = './retrain/keras_model' + if not os.path.exists(save_directory): + os.makedirs(save_directory, exist_ok=True) + print(f"Created directory: {save_directory}") + else: + print(f"Directory already exists: {save_directory}") + + model.save('./retrain/keras_model/model.keras') + model.export('./retrain/saved_model') + + import json + data = {} + data['metrics'] = [] + data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))}) + +# update artifact version + new_artifactversion ="" + if modelinfo["modelLocation"] != "": + new_artifactversion = "1.1.0" + else: + major, minor , patch= map(int, artifactversion.split('.')) + minor+=1 + new_artifactversion = f"{major}.{minor}.{patch}" + + # update the new artifact version in mme + url = f"http://modelmgmtservice.traininghost:8082/ai-ml-model-registration/v1/model-registrations/updateArtifact/{modelname}/{modelversion}/{new_artifactversion}" + updated_model_info= requests.post(url).json() + print(updated_model_info) + + print("uploading keras model to MME") + mm_sdk.upload_model("./retrain/keras_model", modelname + "_keras", modelversion, new_artifactversion) + print("Saved keras format") + mm_sdk.upload_model("./retrain/saved_model", modelname, modelversion, new_artifactversion) + print("Saved savedmodel format") + +@dsl.pipeline( + name="qoe Pipeline", + description="qoe", +) +def super_model_pipeline( + featurepath: str, epochs: str, modelname: str, modelversion:str): + + trainop=train_export_model(featurepath=featurepath, epochs=epochs, modelname=modelname, modelversion=modelversion) + trainop.set_caching_options(False) + kubernetes.set_image_pull_policy(trainop, "IfNotPresent") + +pipeline_func = super_model_pipeline +file_name = "qoe_model_pipeline_retrain" + +kfp.compiler.Compiler().compile(pipeline_func, + '{}.yaml'.format(file_name)) + +import requests +pipeline_name="qoe_Pipeline_retrain" +pipeline_file = file_name+'.yaml' +requests.post("http://tm.traininghost:32002/pipelines/{}/upload".format(pipeline_name), files={'file':open(pipeline_file,'rb')}) + + -- 2.16.6