Updating the sample pipelines to address TF 2.20.0
authorSwaraj Kumar <swaraj.kumar@samsung.com>
Fri, 19 Sep 2025 13:17:18 +0000 (18:47 +0530)
committerSwaraj Kumar <swaraj.kumar@samsung.com>
Mon, 22 Sep 2025 06:21:15 +0000 (11:51 +0530)
Sample pipelines qoe_pipeline and qoe_pipeline_retrain for the L release. The training step now saves the model in both the native Keras (.keras) format and the TensorFlow SavedModel format, and uploads both artifacts.
Same pipelines as https://gerrit.o-ran-sc.org/r/c/portal/aiml-dashboard/+/14932

Issue-ID: AIMLFW-246

Change-Id: I51ef7ee94d0696e52b0d74171ec468360f3345e5
Signed-off-by: Swaraj Kumar <swaraj.kumar@samsung.com>
samples/qoe/qoe_pipeline.py
samples/qoe/qoe_pipeline_retrain.py [new file with mode: 0644]

index ed1875e..4ac22d4 100644 (file)
@@ -16,7 +16,6 @@
 #
 # ==================================================================================
 
-
 import kfp
 import kfp.dsl as dsl
 from kfp.dsl import InputPath, OutputPath
@@ -95,15 +94,24 @@ def train_export_model(featurepath: str, epochs: str, modelname: str, modelversi
     
     xx = y
     yy = yhat
-    model_save_filepath = "./"
-    model.export(model_save_filepath)
+    
+    print("Saving models ...")
+    save_directory = './keras_model'
+    if not os.path.exists(save_directory):
+        os.makedirs(save_directory, exist_ok=True)
+        print(f"Created directory: {save_directory}")
+    else:
+        print(f"Directory already exists: {save_directory}")
+        
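+    # model.save() with a .keras path writes the native Keras v3 archive;
+    # model.export() writes a TensorFlow SavedModel directory for serving.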
+    model.save('./keras_model/model.keras')
+    model.export('./saved_model')
     
     import json
     data = {}
     data['metrics'] = []
     data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))})
     
-    #as new artifact after training will always be 1.0.0
+    # the artifact version after a fresh training run is always 1.0.0
     artifactversion="1.0.0"
     url = f"http://modelmgmtservice.traininghost:8082/ai-ml-model-registration/v1/model-registrations/updateArtifact/{modelname}/{modelversion}/{artifactversion}"
     updated_model_info= requests.post(url).json()
@@ -113,14 +121,17 @@ def train_export_model(featurepath: str, epochs: str, modelname: str, modelversi
     trainingjob_id = featurepath.split('_')[-1]
     mm_sdk.upload_metrics(data, trainingjob_id)
     print("Model-metric : ", mm_sdk.get_metrics(trainingjob_id))
-    mm_sdk.upload_model(model_save_filepath, modelname, modelversion, artifactversion)
-    
-
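+    # Upload both artifacts: "<modelname>_keras" carries the .keras file and
+    # "<modelname>" carries the SavedModel directory.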
+    print("uploading keras model to MME")
+    mm_sdk.upload_model("./keras_model", modelname + "_keras", modelversion, artifactversion)
+    print("Saved keras format")
+    mm_sdk.upload_model("./saved_model", modelname, modelversion, artifactversion)
+    print("Saved savedmodel format")
 
 @dsl.pipeline(
     name="qoe Pipeline",
     description="qoe",
 )
 def super_model_pipeline( 
     featurepath: str, epochs: str, modelname: str, modelversion:str):
     
@@ -128,15 +139,16 @@ def super_model_pipeline(
     trainop.set_caching_options(False)
     kubernetes.set_image_pull_policy(trainop, "IfNotPresent")
 
-
 pipeline_func = super_model_pipeline
 file_name = "qoe_model_pipeline"
 
 kfp.compiler.Compiler().compile(pipeline_func,  
   '{}.yaml'.format(file_name))
 
-
 import requests
 pipeline_name="qoe_Pipeline"
 pipeline_file = file_name+'.yaml'
-requests.post("http://tm.traininghost:32002/pipelines/{}/upload".format(pipeline_name), files={'file':open(pipeline_file,'rb')})
\ No newline at end of file
+with open(pipeline_file, 'rb') as f:
+    requests.post("http://tm.traininghost:32002/pipelines/{}/upload".format(pipeline_name), files={'file': f})
diff --git a/samples/qoe/qoe_pipeline_retrain.py b/samples/qoe/qoe_pipeline_retrain.py
new file mode 100644 (file)
index 0000000..1cd2450
--- /dev/null
@@ -0,0 +1,227 @@
+# ==================================================================================
+#
+#       Copyright (c) 2025 Samsung Electronics Co., Ltd. All Rights Reserved.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+# ==================================================================================
+
+import kfp
+import kfp.dsl as dsl
+from kfp.dsl import InputPath, OutputPath
+from kfp.dsl import component as component
+from kfp import kubernetes
+
+BASE_IMAGE = "traininghost/pipelineimage:latest"
+
+@component(base_image=BASE_IMAGE, packages_to_install=['requests'])
+def train_export_model(featurepath: str, epochs: str, modelname: str, modelversion:str):
+    
+    import tensorflow as tf
+    import numpy as np
+    import requests
+    import zipfile
+    print("numpy version")
+    print(np.__version__)
+    import pandas as pd
+    import os
+    from featurestoresdk.feature_store_sdk import FeatureStoreSdk
+    from modelmetricsdk.model_metrics_sdk import ModelMetricsSdk
+    
+    fs_sdk = FeatureStoreSdk()
+    mm_sdk = ModelMetricsSdk()
+    print("featurepath is: ", featurepath)
+    features = fs_sdk.get_features(featurepath, ['pdcpBytesDl','pdcpBytesUl'])
+    print("Dataframe:")
+    print(features)
+
+    features_cellc2b2 = features
+    print(features_cellc2b2)
+    print('Previous Data Types are --> ', features_cellc2b2.dtypes)
+    features_cellc2b2["pdcpBytesDl"] = pd.to_numeric(features_cellc2b2["pdcpBytesDl"], downcast="float")
+    features_cellc2b2["pdcpBytesUl"] = pd.to_numeric(features_cellc2b2["pdcpBytesUl"], downcast="float")
+    print('New Data Types are --> ', features_cellc2b2.dtypes)
+    
+    features_cellc2b2 = features_cellc2b2[['pdcpBytesDl', 'pdcpBytesUl']]
+    
+    def split_series(series, n_past, n_future):
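+        """Slide a window over `series`, returning n_past rows as inputs (X)
+        and the following n_future rows as targets (y)."""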
+        X, y = list(), list()
+        for window_start in range(len(series)):
+            past_end = window_start + n_past
+            future_end = past_end + n_future
+            if future_end > len(series):
+                break
+            # slicing the past and future parts of the window
+            past, future = series[window_start:past_end, :], series[past_end:future_end, :]
+            X.append(past)
+            y.append(future)
+        return np.array(X), np.array(y)
+    X, y = split_series(features_cellc2b2.values, 10, 1)
+    X = X.reshape((X.shape[0], X.shape[1], X.shape[2]))
+    y = y.reshape((y.shape[0], y.shape[2]))
+    print(X.shape)
+    print(y.shape)
+    print("Loading the saved model")
+    print(os.listdir(os.getcwd()))
+    
+
+    url = f"http://modelmgmtservice.traininghost:8082/ai-ml-model-discovery/v1/models/?model-name={modelname}&model-version={modelversion}"
+    modelinfo = requests.get(url).json()[0]
+    artifactversion = modelinfo["modelId"]["artifactVersion"]
+    model_url = ""
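+    # Prefer the registered modelLocation; if it is empty, fall back to the
+    # zipped Keras artifact served by the training manager.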
+    if modelinfo["modelLocation"] != "":
+        model_url = modelinfo["modelLocation"]
+    else:
+        keras_model = modelname + "_keras"
+        model_url = f"http://tm.traininghost:32002/model/{keras_model}/{modelversion}/{artifactversion}/Model.zip"
+    # Download the model zip file
+
+    print(f"Downloading model from :{model_url}")
+    response = requests.get(model_url)
+
+    print("Response generated: " + str(response))
+
+    # Check if the request was successful
+    if response.status_code == 200:
+        local_file_path = 'Model.zip'
+        with open(local_file_path, 'wb') as file:
+            file.write(response.content)
+        print(f'Downloaded file saved to {local_file_path}')
+    else:
+        print(f'Failed to download the file: HTTP {response.status_code}')
+
+    print(os.listdir(os.getcwd()))
+
+    # Extract the zip file
+    zip_file_path = "./Model.zip"
+    extract_to_dir = "./Model"
+
+    if not os.path.exists(extract_to_dir):
+        os.makedirs(extract_to_dir)
+
+    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
+        zip_ref.extractall(extract_to_dir)
+
+    # Delete the zip file after extraction
+    if os.path.exists(zip_file_path):
+        os.remove(zip_file_path)
+        print(f'Deleted zip file: {zip_file_path}')
+    else:
+        print(f'Zip file not found: {zip_file_path}')
+
+    # Path to the directory containing the saved model
+    model_path = f"./Model/{modelversion}/model.keras"
+
+    # Load the model from the native Keras archive
+    model = tf.keras.models.load_model(model_path)
+    
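+    # Recompile so the loaded model gets a fresh optimizer state before retraining.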
+    model.compile(loss='mse', optimizer='adam', metrics=['mse'])
+    model.summary()
+
+    # Define a directory to save checkpoints
+    checkpoint_dir = "./checkpoints"
+    if not os.path.exists(checkpoint_dir):
+        os.makedirs(checkpoint_dir)
+
+    # Define a ModelCheckpoint callback
+    checkpoint_path = os.path.join(checkpoint_dir, "model_epoch_{epoch:02d}_val_loss_{val_loss:.2f}.h5")
+    checkpoint_callback = tf.keras.callbacks.ModelCheckpoint(
+        filepath=checkpoint_path,   # checkpoint file path; intermediate checkpoints are not uploaded
+        monitor='val_loss',         # monitor validation loss (training loss could be used instead)
+        save_best_only=True,        # save only the best model based on validation loss
+        save_weights_only=False,    # save the entire model, not just the weights
+        mode='min',                 # lower validation loss is better
+        verbose=0                   # set to 1 to log when a new checkpoint is saved
+    )
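+    # Checkpoints use the legacy HDF5 (.h5) container, which Keras 3 still
+    # supports for full-model saves; only the final exports below are uploaded.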
+
+    # Train the model with checkpointing
+    print("Retraining the model with checkpoints...")
+    history = model.fit(
+        X, 
+        y, 
+        batch_size=10, 
+        epochs=int(epochs), 
+        validation_split=0.2, 
+        callbacks=[checkpoint_callback]  # Add the callback here
+    )
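+    # 20% of the windows are held out as the validation split that drives checkpoint selection.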
+    
+    yhat = model.predict(X, verbose=0)
+    xx = y
+    yy = yhat
+    
+    print("Saving models ...")
+    save_directory = './retrain/keras_model'
+    if not os.path.exists(save_directory):
+        os.makedirs(save_directory, exist_ok=True)
+        print(f"Created directory: {save_directory}")
+    else:
+        print(f"Directory already exists: {save_directory}")
+        
+    model.save('./retrain/keras_model/model.keras')
+    model.export('./retrain/saved_model')
+
+    import json
+    data = {}
+    data['metrics'] = []
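+    # "Accuracy" here is the fraction of predictions within 5 units of the true value.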
+    data['metrics'].append({'Accuracy': str(np.mean(np.absolute(np.asarray(xx)-np.asarray(yy))<5))})
+
+    # update the artifact version
+    new_artifactversion = ""
+    if modelinfo["modelLocation"] != "":
+        new_artifactversion = "1.1.0"
+    else:
+        major, minor, patch = map(int, artifactversion.split('.'))
+        minor += 1
+        new_artifactversion = f"{major}.{minor}.{patch}"
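+    # e.g. 1.0.0 -> 1.1.0: the minor version is bumped on every retrain; the patch number is kept.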
+    
+    # update the new artifact version in mme
+    url = f"http://modelmgmtservice.traininghost:8082/ai-ml-model-registration/v1/model-registrations/updateArtifact/{modelname}/{modelversion}/{new_artifactversion}"
+    updated_model_info = requests.post(url).json()
+    print(updated_model_info)
+    
+    print("uploading keras model to MME")
+    mm_sdk.upload_model("./retrain/keras_model", modelname + "_keras", modelversion, new_artifactversion)
+    print("Saved keras format")
+    mm_sdk.upload_model("./retrain/saved_model", modelname, modelversion, new_artifactversion)
+    print("Saved savedmodel format")
+
+@dsl.pipeline(
+    name="qoe Pipeline",
+    description="qoe",
+)
+def super_model_pipeline( 
+    featurepath: str, epochs: str, modelname: str, modelversion:str):
+    
+    trainop=train_export_model(featurepath=featurepath, epochs=epochs, modelname=modelname, modelversion=modelversion)
+    trainop.set_caching_options(False)
+    kubernetes.set_image_pull_policy(trainop, "IfNotPresent")
+
+pipeline_func = super_model_pipeline
+file_name = "qoe_model_pipeline_retrain"
+
+kfp.compiler.Compiler().compile(pipeline_func,  
+  '{}.yaml'.format(file_name))
+
+import requests
+pipeline_name="qoe_Pipeline_retrain"
+pipeline_file = file_name+'.yaml'
+with open(pipeline_file, 'rb') as f:
+    requests.post("http://tm.traininghost:32002/pipelines/{}/upload".format(pipeline_name), files={'file': f})