Updates for version 0.2.5 of es-rapp [Gerrit change 49/14749/1]
author: saul.gill <saul.gill@est.tech>
Tue, 29 Jul 2025 13:41:14 +0000 (14:41 +0100)
committer: saul.gill <saul.gill@est.tech>
Tue, 29 Jul 2025 13:59:59 +0000 (14:59 +0100)
Makes changes to allow ranpm use case flow
Update version of docker and helm
Alter generated data to match simulator data structure
Use threading to prevent overlapping execution
Expose more config options

Issue-ID: NONRTRIC-1083
Change-Id: Ib05d0d92763d5ec038282823755f5ec38facbd2e
Signed-off-by: saul.gill <saul.gill@est.tech>
.gitignore
sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/Chart.yaml
sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/resources/config/config.json
sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/values.yaml
sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz [new file with mode: 0644]
sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Definitions/asd.yaml
sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Files/Acm/instances/es-instance.json
sample-rapp-generator/es-demo-rapp/src/assist.py
sample-rapp-generator/es-demo-rapp/src/config.json
sample-rapp-generator/es-demo-rapp/src/data.py
sample-rapp-generator/es-demo-rapp/src/main.py

index ac4fcd6..b64e4e5 100755 (executable)
@@ -37,3 +37,5 @@ build/
 
 openapi/sme/
 *.csar
+
+**/__pycache__/
index f7b9238..2fd7fe3 100644 (file)
@@ -32,10 +32,10 @@ type: application
 # This is the chart version. This version number should be incremented each time you make changes
 # to the chart and its templates, including the app version.
 # Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.1.0
+version: 0.2.5
 
 # This is the version number of the application being deployed. This version number should be
 # incremented each time you make changes to the application. Versions are not expected to
 # follow Semantic Versioning. They should reflect the version the application is using.
 # It is recommended to use it with quotes.
-appVersion: "1.16.0"
+appVersion: "0.2.5"
index 2fbd20f..e473291 100644 (file)
@@ -28,7 +28,8 @@
     "password": "{{ .Values.influxdb.password }}",
     "example_data_file": "example_data.json",
     "database": "{{ .Values.influxdb.bucket }}",
-    "measurement": "CellReports",
+    "time_range": "{{ .Values.influxdb.timeRange }}",
+    "measurements": {{ .Values.influxdb.measurements | toJson }},
     "ssl": false,
     "address": "http://localhost:8086"
   }
index 1cc7102..890579a 100644 (file)
@@ -25,7 +25,7 @@ image:
   repository: "nexus3.onap.org:10001/estdemoimages/es-rapp"
   pullPolicy: Always
   # Overrides the image tag whose default is the chart appVersion.
-  tag: "0.1.1"
+  tag: "0.2.5"
 
 imagePullSecrets: []
 nameOverride: "energy-saving-rapp"
@@ -102,13 +102,18 @@ environment:
   smeDiscoveryEndpoint: "http://sme-discovery.default.svc.cluster.local:8080/service-apis/v1/allServiceAPIs"
 
 influxdb:
-  token: "WcrfLfqC63uCxfDa15C1sb6WtG5fhzrS"
+  token: "hIVMgY7vn3M772PSk3yrI2IjeWzybPw0"
   user: "admin"
   password: "mySuP3rS3cr3tT0keN"
-  bucket: "pm-bucket"
+  bucket: "pm-logg-bucket"
   org: "est"
   apiName: "influxdb2-http"
   resourceName: "root"
+  timeRange: "-6h"
+  measurements:
+    - "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1"
+    - "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1"
+    - "o-ran-pm"
 
 
 ncmp:
@@ -127,5 +132,5 @@ teiv:
 
 appStartup:
   command: ["/bin/sh", "-c"]
-  args: [ "python main.py --generate_db_data=True --use_sme_db=True --random_predictions=False" ]
+  args: [ "python main.py --generate_db_data=False --use_sme_db=True --random_predictions=False" ]
 
diff --git a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz
new file mode 100644 (file)
index 0000000..c4b3413
Binary files /dev/null and b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz differ
index 727004b..31ac972 100755 (executable)
@@ -22,7 +22,7 @@ topology_template:
       artifacts:\r
         energy-saving:\r
           type: tosca.artifacts.asd.deploymentItem\r
-          file: "Artifacts/Deployment/HELM/energy-saving-rapp-0.1.0.tgz"\r
+          file: "Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz"\r
           properties:\r
             artifact_type: "helm_chart"\r
             target_server: "chartmuseum"\r
index 4b5e18f..9184b9f 100644 (file)
@@ -15,7 +15,7 @@
         "chart": {
           "chartId": {
             "name": "energy-saving-rapp",
-            "version": "0.1.0"
+            "version": "0.2.5"
           },
           "namespace": "nonrtric",
           "releaseName": "energy-saving-rapp",
index 27d6fe8..311f681 100644 (file)
@@ -59,6 +59,17 @@ class ASSIST(object):
     def send_request_to_server(self, json_data, randomize=False):
 
         if(not randomize):
+
+            if isinstance(json_data, dict) and "instances" in json_data:
+                json_data["instances"] = [
+                    [
+                        [float(val) for val in sublist]
+                        for sublist in instance
+                    ]
+                    for instance in json_data["instances"]
+                ]
+
+
             with open('input.json', 'w') as f:
                 json.dump(json_data, f)
 
@@ -69,8 +80,7 @@ class ASSIST(object):
             host = self.kserve_url.split('/')[2].split(':')[0]
             headers = {'Host': host }
 
-            with open('input.json', 'rb') as f:
-                response = requests.post(url, headers=headers, data=f)
+            response = requests.post(url, headers=headers, json=json_data)
             logger.info("Prediction result")
             logger.info(response.text)
             return response.status_code, response.text
index c39467d..33d2c4a 100644 (file)
     "odufunction_id": "GNBDUFunction-001"
   },
   "DB": {
-    "host": "localhost",
-    "port": 8086,
-    "bucket": "pm-bucket",
+    "host": "10.101.3.89",
+    "port": 31812,
+    "bucket": "pm-logg-bucket",
     "org": "est",
-    "token": "WcrfLfqC63uCxfDa15C1sb6WtG5fhzrS",
+    "token": "hIVMgY7vn3M772PSk3yrI2IjeWzybPw0",
     "user": "admin",
     "password": "mySuP3rS3cr3tT0keN",
     "example_data_file": "example_data.json",
-    "database": "pm-bucket",
-    "measurement": "CellReports",
+    "database": "pm-logg-bucket",
+    "time_range": "-30d",
+    "measurements": [
+        "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1",
+        "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1",
+        "o-ran-pm"
+    ],
     "ssl": false,
-    "address": "http://localhost:8086"
+    "address": "http://10.101.3.89:31812"
   }
 }
\ No newline at end of file
index 74662ec..0b70154 100644 (file)
@@ -47,6 +47,8 @@ class DATABASE(object):
         self.influx_invoker_id = None
         self.influx_api_name = None
         self.influx_resource_name = None
+        self.time_range = None
+        self.measurements = None
         self.config()
 
         # Set pandas options to display all rows and columns
@@ -94,7 +96,7 @@ class DATABASE(object):
             time.sleep(120)
 
     # Query information
-    def read_data(self, train=False, valid=False, limit=False):
+    def read_data_old(self, train=False, valid=False, limit=False):
 
         self.data = None
         query = 'from(bucket:"{}")'.format(self.bucket)
@@ -107,6 +109,29 @@ class DATABASE(object):
         self.data = result
         return result
 
+    def read_data(self, train=False, valid=False, limit=False):
+        self.data = None
+        query = 'from(bucket:"{}")'.format(self.bucket)
+
+        time_range = getattr(self, 'time_range', '-10m')
+        query += f'|> range(start: {time_range}) '
+
+        measurements = getattr(self, 'measurements', ['o-ran-pm'])
+        if isinstance(measurements, str):
+            measurements = [measurements]
+
+        measurement_filters = [f'r["_measurement"] == "{m}"' for m in measurements]
+        query += f' |> filter(fn: (r) => {" or ".join(measurement_filters)})'
+
+        query += ' |> filter(fn: (r) => r["_field"] == "CellID" or r["_field"] == "DRB.UEThpUl" or r["_field"] == "RRU.PrbUsedUl" or r["_field"] == "PEE.AvgPower") '
+        # Keep _measurement in the rowKey to preserve it
+        query += ' |> pivot(rowKey: ["_time", "_measurement"], columnKey: ["_field"], valueColumn: "_value") '
+
+        result = self.query(query)
+        #logger.debug(f"Data grouped by measurement:\n{result.groupby('_measurement').size()}")
+        self.data = result
+        return result
+
     # Query data
     def query(self, query):
         while True:
@@ -129,40 +154,29 @@ class DATABASE(object):
 
     def generate_synthetic_data(self):
         data = []
-        fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower"]
-        # fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower", "RRC.ConnMean"]
-        other_fields = ["TestFilterField1", "TestFilterField2", "TestFilterField3", "TestFilterField4"]
-        measurements = ["o-ran-pm", "test-filter-measurement1", "test-filter-measurement2", "test-filter-measurement3", "test-filter-measurement4"]
+        fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower", "GranularityPeriod", "RRC.ConnMean", "RRU.PrbTotDl", "DRB.UEThpDl"]
+        measurements = ["o-ran-pm", "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1", "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1"]
 
         # Generate matching records (synchronized _time for each group of 4)
         for _ in range(50):  # 50 records, each with 4 rows sharing the same time
             common_time = datetime.now() - timedelta(minutes=random.randint(0, 60))
             iso_time = common_time.isoformat()
+            measurement = random.choice(measurements)
 
             for field in fields:
                 value = (
-                    f"S{random.randint(1,9)}/B{random.randint(1,9)}/C{random.randint(1,9)}"
-                    if field == "CellID"
-                    else round(random.uniform(1, 100), 2)
+                    f"S{random.randint(1,9)}/B{random.randint(1,9)}/C{random.randint(1,9)}" if field == "CellID"
+                    else (900 if field == "GranularityPeriod"
+                    else str(round(random.uniform(1, 100), 5)))
                 )
                 record = {
                     "_time": iso_time,
-                    "_measurement": "o-ran-pm",
+                    "_measurement": measurement,
                     "_field": field,
                     "_value": value
                 }
                 data.append(record)
 
-        # Generate non-matching records (randomized)
-        for _ in range(50):
-            record = {
-                "_time": (datetime.now() - timedelta(minutes=random.randint(0, 60))).isoformat(),
-                "_measurement": random.choice(measurements),
-                "_field": random.choice(other_fields),
-                "_value": str(round(random.uniform(0, 100), 2))
-            }
-            data.append(record)
-
         # Log data to be written
         print(pd.DataFrame(data).to_string())
 
@@ -202,6 +216,7 @@ class DATABASE(object):
         self.port = influx_config.get("port")
         self.ssl = influx_config.get("ssl")
         self.dbname = influx_config.get("database")
-        self.meas = influx_config.get("measurement")
         self.user = influx_config.get("user")
         self.password = influx_config.get("password")
+        self.time_range = influx_config.get("time_range")
+        self.measurements = influx_config.get("measurements")
index 97dfd1c..89dd281 100644 (file)
@@ -19,6 +19,7 @@ import argparse
 import time
 import pandas as pd
 import schedule
+from threading import Lock
 import logging
 from data import DATABASE
 from assist import ASSIST
@@ -67,13 +68,52 @@ class ESrapp():
 
         # Create policy type and policy instance
         #self.policy_manager.create_policy_type()
+        self.inference_lock = Lock()
+        self._running = False
+
 
     def entry(self):
-        schedule.every(10).seconds.do(self.inference)
+        if self._running:
+            logger.warning("ES rApp is already running")
+            return
 
-        while True:
-            schedule.run_pending()
+        self._running = True
+        self.job = schedule.every(10).seconds.do(self.safe_inference)
+        last_run = 0
+
+        try:
+            while self._running:
+                now = time.time()
+                if now - last_run >= 10:  # 10 second interval
+                    self.safe_inference()
+                    last_run = now
+                time.sleep(1)
+
+        except KeyboardInterrupt:
+            logger.info("ES rApp shutting down gracefully")
+        except Exception as e:
+            logger.error(f"Error in entry loop: {str(e)}", exc_info=True)
+        finally:
+            try:
+                schedule.cancel_job(self.job)
+            except:
+                pass
+            self._running = False
+            try:
+                if self.inference_lock.locked():
+                    self.inference_lock.release()
+            except:
+                pass
+
+    def safe_inference(self):
+        if not self.inference_lock.acquire(blocking=False):
+            logger.warning("Previous inference still running, skipping this iteration")
+            return
 
+        try:
+            self.inference()
+        finally:
+            self.inference_lock.release()
     # Send data to ML rApp
     def inference(self):
         data = self.db.read_data()
@@ -83,48 +123,59 @@ class ESrapp():
             return
 
         data_mapping = self.mapping(data)
-        groups = data_mapping.groupby("CellID")
+        # Group the data by CellID and _measurement. This means that even if cell ids are the same, but the measurement is different, they will be processed separately.
+        groups = data_mapping.groupby(["CellID", "_measurement"])
         for group_name, group_data in groups:
             json_data = self.generate_json_data(group_data)
             logger.info(f"Send data to ML rApp {group_name}: {json_data}")
             status_code, response_text = self.assist.send_request_to_server(json_data, randomize=self.random_predictions)
             if not self.check_and_perform_action(response_text):
-                cell_id_number = group_data['cellidnumber'].iloc[0]
+                cell_id_name = group_data['CellID'].iloc[0]
+                du_name = self.extract_managed_element(group_data['_measurement'].iloc[0])
+                full_cell_id = cell_id_name + "-" + du_name
                 logger.info(f"Turn on the cell {group_name}")
-                # Create policy instance with the cell_id_number before performing action
-                #self.policy_manager.create_policy_instance(cell_id_number)
                 # Wait for 3 seconds before performing the action
                 time.sleep(3)
 
-                if cell_id_number not in self.cell_power_status:
-                    logger.debug(f"Cell {cell_id_number} not in local cache. Adding it...")
-                    self.cell_power_status[cell_id_number] = "off"
+                if full_cell_id not in self.cell_power_status:
+                    logger.debug(f"Cell {full_cell_id} not in local cache. Adding it...")
+                    self.cell_power_status[full_cell_id] = "off"
                 # Check if the cell is already powered on
-                if self.cell_power_status[cell_id_number] == "on":
-                    logger.debug(f"Cell {cell_id_number} is already powered on.")
+                if self.cell_power_status[full_cell_id] == "on":
+                    logger.debug(f"Cell {full_cell_id} is already powered on.")
                     # continue
                 else:
-                    self.ncmp_client.power_on_cell(cell_id_number)
-                    self.cell_power_status[cell_id_number] = "on"
+                    self.ncmp_client.power_on_cell(full_cell_id)
+                    self.cell_power_status[full_cell_id] = "on"
             else:
-                cell_id_number = group_data['cellidnumber'].iloc[0]
+                du_name = self.extract_managed_element(group_data['_measurement'].iloc[0])
+                cell_id_name = group_data['CellID'].iloc[0]
+                full_cell_id = cell_id_name + "-" + du_name
                 logger.info(f"Turn off the cell {group_name}")
-                # Create policy instance with the cell_id_number before performing action
-                #self.policy_manager.create_policy_instance(cell_id_number)
                 # Wait for 3 seconds before performing the action
                 time.sleep(3)
 
-                if cell_id_number not in self.cell_power_status:
-                    logger.debug(f"Cell {cell_id_number} not in local cache. Adding it...")
-                    self.cell_power_status[cell_id_number] = "on"
+                if full_cell_id not in self.cell_power_status:
+                    logger.debug(f"Cell {full_cell_id} not in local cache. Adding it...")
+                    self.cell_power_status[full_cell_id] = "on"
 
-                if self.cell_power_status[cell_id_number] == "off":
-                    logger.debug(f"Cell {cell_id_number} is already powered off.")
+                if self.cell_power_status[full_cell_id] == "off":
+                    logger.debug(f"Cell {full_cell_id} is already powered off.")
                     # continue
                 else:
-                    self.ncmp_client.power_off_cell(cell_id_number)
-                    self.cell_power_status[cell_id_number] = "off"
+                    self.ncmp_client.power_off_cell(full_cell_id)
+                    self.cell_power_status[full_cell_id] = "off"
+
+    def extract_managed_element(self, measurement):
+        if '=' not in measurement or ',' not in measurement:
+            return measurement
+
+        parts = measurement.split(',')
+        for part in parts:
+            if part.startswith('ManagedElement='):
+                return part.split('=')[1]
 
+        return measurement
     # Generate the input data for ML rApp
     def generate_json_data(self, data):
         # rrc_conn_mean_values = data["RRC.ConnMean"].tolist()