openapi/sme/
*.csar
+
+**/__pycache__/
# This is the chart version. This version number should be incremented each time you make changes
# to the chart and its templates, including the app version.
# Versions are expected to follow Semantic Versioning (https://semver.org/)
-version: 0.1.0
+version: 0.2.5
# This is the version number of the application being deployed. This version number should be
# incremented each time you make changes to the application. Versions are not expected to
# follow Semantic Versioning. They should reflect the version the application is using.
# It is recommended to use it with quotes.
-appVersion: "1.16.0"
+appVersion: "0.2.5"
"password": "{{ .Values.influxdb.password }}",
"example_data_file": "example_data.json",
"database": "{{ .Values.influxdb.bucket }}",
- "measurement": "CellReports",
+ "time_range": "{{ .Values.influxdb.timeRange }}",
+ "measurements": {{ .Values.influxdb.measurements | toJson }},
"ssl": false,
"address": "http://localhost:8086"
}
repository: "nexus3.onap.org:10001/estdemoimages/es-rapp"
pullPolicy: Always
# Overrides the image tag whose default is the chart appVersion.
- tag: "0.1.1"
+ tag: "0.2.5"
imagePullSecrets: []
nameOverride: "energy-saving-rapp"
smeDiscoveryEndpoint: "http://sme-discovery.default.svc.cluster.local:8080/service-apis/v1/allServiceAPIs"
influxdb:
- token: "WcrfLfqC63uCxfDa15C1sb6WtG5fhzrS"
+ token: "hIVMgY7vn3M772PSk3yrI2IjeWzybPw0"
user: "admin"
password: "mySuP3rS3cr3tT0keN"
- bucket: "pm-bucket"
+ bucket: "pm-logg-bucket"
org: "est"
apiName: "influxdb2-http"
resourceName: "root"
+ timeRange: "-6h"
+ measurements:
+ - "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1"
+ - "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1"
+ - "o-ran-pm"
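With these values, the "time_range"/"measurements" lines in the config template above render (a sketch, assuming standard Helm templating with toJson) to:

    "time_range": "-6h",
    "measurements": ["ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1", "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1", "o-ran-pm"],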
ncmp:
appStartup:
command: ["/bin/sh", "-c"]
- args: [ "python main.py --generate_db_data=True --use_sme_db=True --random_predictions=False" ]
+ args: [ "python main.py --generate_db_data=False --use_sme_db=True --random_predictions=False" ]
artifacts:
  energy-saving:
    type: tosca.artifacts.asd.deploymentItem
-    file: "Artifacts/Deployment/HELM/energy-saving-rapp-0.1.0.tgz"
+    file: "Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz"
    properties:
      artifact_type: "helm_chart"
      target_server: "chartmuseum"
"chart": {
"chartId": {
"name": "energy-saving-rapp",
- "version": "0.1.0"
+ "version": "0.2.5"
},
"namespace": "nonrtric",
"releaseName": "energy-saving-rapp",
def send_request_to_server(self, json_data, randomize=False):
if(not randomize):
+
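+            # Values read from InfluxDB may arrive as strings or numpy types;
+            # coerce every entry in "instances" to float so the model receives numeric tensors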
+ if isinstance(json_data, dict) and "instances" in json_data:
+ json_data["instances"] = [
+ [
+ [float(val) for val in sublist]
+ for sublist in instance
+ ]
+ for instance in json_data["instances"]
+ ]
+
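+            # Keep writing input.json for offline inspection; the request below now sends json_data directly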
with open('input.json', 'w') as f:
json.dump(json_data, f)
host = self.kserve_url.split('/')[2].split(':')[0]
headers = {'Host': host }
- with open('input.json', 'rb') as f:
- response = requests.post(url, headers=headers, data=f)
+ response = requests.post(url, headers=headers, json=json_data)
logger.info("Prediction result")
logger.info(response.text)
return response.status_code, response.text
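For reference, a minimal sketch of the request this method now sends, assuming the KServe v1 "instances" protocol (the URL, Host header, and tensor shape here are illustrative assumptions, not values from the deployment):

    import requests

    # Hypothetical predictor endpoint and a 1x2x4 "instances" tensor shaped
    # like the grouped PM rows produced upstream (illustrative values only)
    url = "http://kserve.example/v1/models/es-model:predict"
    payload = {"instances": [[[10.5, 42.0, 3.1, 55.2],
                              [11.0, 40.7, 2.9, 54.8]]]}
    resp = requests.post(url, headers={"Host": "kserve.example"}, json=payload)
    print(resp.status_code, resp.text)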
"odufunction_id": "GNBDUFunction-001"
},
"DB": {
- "host": "localhost",
- "port": 8086,
- "bucket": "pm-bucket",
+ "host": "10.101.3.89",
+ "port": 31812,
+ "bucket": "pm-logg-bucket",
"org": "est",
- "token": "WcrfLfqC63uCxfDa15C1sb6WtG5fhzrS",
+ "token": "hIVMgY7vn3M772PSk3yrI2IjeWzybPw0",
"user": "admin",
"password": "mySuP3rS3cr3tT0keN",
"example_data_file": "example_data.json",
- "database": "pm-bucket",
- "measurement": "CellReports",
+ "database": "pm-logg-bucket",
+ "time_range": "-30d",
+ "measurements": [
+ "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1",
+ "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1",
+ "o-ran-pm"
+ ],
"ssl": false,
- "address": "http://localhost:8086"
+ "address": "http://10.101.3.89:31812"
}
}
\ No newline at end of file
self.influx_invoker_id = None
self.influx_api_name = None
self.influx_resource_name = None
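+        # Query window and measurement list; populated from the config file in config()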
+ self.time_range = None
+ self.measurements = None
self.config()
# Set pandas options to display all rows and columns
time.sleep(120)
# Query information
- def read_data(self, train=False, valid=False, limit=False):
+ def read_data_old(self, train=False, valid=False, limit=False):
self.data = None
query = 'from(bucket:"{}")'.format(self.bucket)
self.data = result
return result
+ def read_data(self, train=False, valid=False, limit=False):
+ self.data = None
+ query = 'from(bucket:"{}")'.format(self.bucket)
+
+        # Fall back to defaults when the config omits these keys (the
+        # attributes are initialised to None, so a getattr default alone is not enough)
+        time_range = getattr(self, 'time_range', None) or '-10m'
+        query += f'|> range(start: {time_range}) '
+
+        measurements = getattr(self, 'measurements', None) or ['o-ran-pm']
+        if isinstance(measurements, str):
+            measurements = [measurements]
+
+ measurement_filters = [f'r["_measurement"] == "{m}"' for m in measurements]
+ query += f' |> filter(fn: (r) => {" or ".join(measurement_filters)})'
+
+ query += ' |> filter(fn: (r) => r["_field"] == "CellID" or r["_field"] == "DRB.UEThpUl" or r["_field"] == "RRU.PrbUsedUl" or r["_field"] == "PEE.AvgPower") '
+ # Keep _measurement in the rowKey to preserve it
+ query += ' |> pivot(rowKey: ["_time", "_measurement"], columnKey: ["_field"], valueColumn: "_value") '
+
+ result = self.query(query)
+ #logger.debug(f"Data grouped by measurement:\n{result.groupby('_measurement').size()}")
+ self.data = result
+ return result
+
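As a quick sanity check, the standalone snippet below (not part of the rApp) reproduces the query assembly above with assumed example values and prints the resulting Flux:

    # Assumed example values; the real ones come from the rApp config
    bucket = "pm-logg-bucket"
    time_range = "-6h"
    measurements = ["o-ran-pm"]

    query = f'from(bucket:"{bucket}")'
    query += f'|> range(start: {time_range}) '
    filters = " or ".join(f'r["_measurement"] == "{m}"' for m in measurements)
    query += f' |> filter(fn: (r) => {filters})'
    query += ' |> pivot(rowKey: ["_time", "_measurement"], columnKey: ["_field"], valueColumn: "_value")'
    print(query)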
# Query data
def query(self, query):
while True:
def generate_synthetic_data(self):
data = []
- fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower"]
- # fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower", "RRC.ConnMean"]
- other_fields = ["TestFilterField1", "TestFilterField2", "TestFilterField3", "TestFilterField4"]
- measurements = ["o-ran-pm", "test-filter-measurement1", "test-filter-measurement2", "test-filter-measurement3", "test-filter-measurement4"]
+ fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower", "GranularityPeriod", "RRC.ConnMean", "RRU.PrbTotDl", "DRB.UEThpDl"]
+ measurements = ["o-ran-pm", "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1", "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1"]
# Generate matching records (synchronized _time for each group of 4)
    for _ in range(50): # 50 timestamps, one row per field, all sharing the same time
common_time = datetime.now() - timedelta(minutes=random.randint(0, 60))
iso_time = common_time.isoformat()
+ measurement = random.choice(measurements)
for field in fields:
value = (
- f"S{random.randint(1,9)}/B{random.randint(1,9)}/C{random.randint(1,9)}"
- if field == "CellID"
- else round(random.uniform(1, 100), 2)
+ f"S{random.randint(1,9)}/B{random.randint(1,9)}/C{random.randint(1,9)}" if field == "CellID"
+ else (900 if field == "GranularityPeriod"
+ else str(round(random.uniform(1, 100), 5)))
)
record = {
"_time": iso_time,
- "_measurement": "o-ran-pm",
+ "_measurement": measurement,
"_field": field,
"_value": value
}
data.append(record)
- # Generate non-matching records (randomized)
- for _ in range(50):
- record = {
- "_time": (datetime.now() - timedelta(minutes=random.randint(0, 60))).isoformat(),
- "_measurement": random.choice(measurements),
- "_field": random.choice(other_fields),
- "_value": str(round(random.uniform(0, 100), 2))
- }
- data.append(record)
-
# Log data to be written
print(pd.DataFrame(data).to_string())
self.port = influx_config.get("port")
self.ssl = influx_config.get("ssl")
self.dbname = influx_config.get("database")
- self.meas = influx_config.get("measurement")
self.user = influx_config.get("user")
self.password = influx_config.get("password")
+ self.time_range = influx_config.get("time_range")
+ self.measurements = influx_config.get("measurements")
import time
import pandas as pd
import schedule
+from threading import Lock
import logging
from data import DATABASE
from assist import ASSIST
# Create policy type and policy instance
#self.policy_manager.create_policy_type()
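+        # Lock guarding inference() so overlapping scheduled runs never execute concurrently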
+ self.inference_lock = Lock()
+ self._running = False
+
def entry(self):
- schedule.every(10).seconds.do(self.inference)
+ if self._running:
+ logger.warning("ES rApp is already running")
+ return
- while True:
- schedule.run_pending()
+        self._running = True
+        self.job = schedule.every(10).seconds.do(self.safe_inference)
+
+        try:
+            while self._running:
+                schedule.run_pending()
+                time.sleep(1)
+
+        except KeyboardInterrupt:
+            logger.info("ES rApp shutting down gracefully")
+        except Exception as e:
+            logger.error(f"Error in entry loop: {str(e)}", exc_info=True)
+        finally:
+            # Always unregister the job and clear the running flag on exit
+            schedule.cancel_job(self.job)
+            self._running = False
+
+ def safe_inference(self):
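+        # Non-blocking acquire: if the previous inference is still running,
+        # skip this tick instead of queueing behind it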
+ if not self.inference_lock.acquire(blocking=False):
+ logger.warning("Previous inference still running, skipping this iteration")
+ return
+ try:
+ self.inference()
+ finally:
+ self.inference_lock.release()
# Send data to ML rApp
def inference(self):
data = self.db.read_data()
return
data_mapping = self.mapping(data)
- groups = data_mapping.groupby("CellID")
+        # Group by CellID and _measurement so cells that share a CellID but
+        # belong to different DUs (different measurements) are processed separately
+        groups = data_mapping.groupby(["CellID", "_measurement"])
for group_name, group_data in groups:
json_data = self.generate_json_data(group_data)
logger.info(f"Send data to ML rApp {group_name}: {json_data}")
status_code, response_text = self.assist.send_request_to_server(json_data, randomize=self.random_predictions)
if not self.check_and_perform_action(response_text):
- cell_id_number = group_data['cellidnumber'].iloc[0]
+ cell_id_name = group_data['CellID'].iloc[0]
+ du_name = self.extract_managed_element(group_data['_measurement'].iloc[0])
+ full_cell_id = cell_id_name + "-" + du_name
logger.info(f"Turn on the cell {group_name}")
- # Create policy instance with the cell_id_number before performing action
- #self.policy_manager.create_policy_instance(cell_id_number)
# Wait for 3 seconds before performing the action
time.sleep(3)
- if cell_id_number not in self.cell_power_status:
- logger.debug(f"Cell {cell_id_number} not in local cache. Adding it...")
- self.cell_power_status[cell_id_number] = "off"
+ if full_cell_id not in self.cell_power_status:
+ logger.debug(f"Cell {full_cell_id} not in local cache. Adding it...")
+ self.cell_power_status[full_cell_id] = "off"
# Check if the cell is already powered on
- if self.cell_power_status[cell_id_number] == "on":
- logger.debug(f"Cell {cell_id_number} is already powered on.")
+ if self.cell_power_status[full_cell_id] == "on":
+ logger.debug(f"Cell {full_cell_id} is already powered on.")
# continue
else:
- self.ncmp_client.power_on_cell(cell_id_number)
- self.cell_power_status[cell_id_number] = "on"
+ self.ncmp_client.power_on_cell(full_cell_id)
+ self.cell_power_status[full_cell_id] = "on"
else:
- cell_id_number = group_data['cellidnumber'].iloc[0]
+ du_name = self.extract_managed_element(group_data['_measurement'].iloc[0])
+ cell_id_name = group_data['CellID'].iloc[0]
+ full_cell_id = cell_id_name + "-" + du_name
logger.info(f"Turn off the cell {group_name}")
- # Create policy instance with the cell_id_number before performing action
- #self.policy_manager.create_policy_instance(cell_id_number)
# Wait for 3 seconds before performing the action
time.sleep(3)
- if cell_id_number not in self.cell_power_status:
- logger.debug(f"Cell {cell_id_number} not in local cache. Adding it...")
- self.cell_power_status[cell_id_number] = "on"
+ if full_cell_id not in self.cell_power_status:
+ logger.debug(f"Cell {full_cell_id} not in local cache. Adding it...")
+ self.cell_power_status[full_cell_id] = "on"
- if self.cell_power_status[cell_id_number] == "off":
- logger.debug(f"Cell {cell_id_number} is already powered off.")
+ if self.cell_power_status[full_cell_id] == "off":
+ logger.debug(f"Cell {full_cell_id} is already powered off.")
# continue
else:
- self.ncmp_client.power_off_cell(cell_id_number)
- self.cell_power_status[cell_id_number] = "off"
+ self.ncmp_client.power_off_cell(full_cell_id)
+ self.cell_power_status[full_cell_id] = "off"
+
+ def extract_managed_element(self, measurement):
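+        # Derive the DU name from an O1-style measurement key such as
+        # "ManagedElement=o-du-pynts-1122,..."; plain names like "o-ran-pm"
+        # contain no '=' or ',' and are returned unchanged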
+ if '=' not in measurement or ',' not in measurement:
+ return measurement
+
+ parts = measurement.split(',')
+ for part in parts:
+ if part.startswith('ManagedElement='):
+ return part.split('=')[1]
+ return measurement
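For illustration, a standalone check of this extraction against the measurement names configured earlier (the method body is copied into a free function so the snippet runs on its own):

    def extract_managed_element(measurement):
        # Free-function copy of the method above, for a self-contained check
        if '=' not in measurement or ',' not in measurement:
            return measurement
        for part in measurement.split(','):
            if part.startswith('ManagedElement='):
                return part.split('=')[1]
        return measurement

    m = ("ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,"
         "GNBDUFunction=1,NRCellDU=1")
    print(extract_managed_element(m))           # -> o-du-pynts-1122
    print(extract_managed_element("o-ran-pm"))  # -> o-ran-pm (unchanged)
    # A full cell id as built in inference(), e.g. "S1/B2/C3-o-du-pynts-1122"
    print("S1/B2/C3" + "-" + extract_managed_element(m))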
# Generate the input data for ML rApp
def generate_json_data(self, data):
# rrc_conn_mean_values = data["RRC.ConnMean"].tolist()