From 46f13fb6dba57eb23f6c815b80530529d05a21de Mon Sep 17 00:00:00 2001 From: "saul.gill" Date: Tue, 29 Jul 2025 14:41:14 +0100 Subject: [PATCH] Updates for version 0.2.5 of es-rapp Makes changes to allow ranpm use case flow Update version of docker and helm Alter generated data to match simulator data structure Use threading to prevent overlapping execution Expose more config options Issue-ID: NONRTRIC-1083 Change-Id: Ib05d0d92763d5ec038282823755f5ec38facbd2e Signed-off-by: saul.gill --- .gitignore | 2 + .../Deployment/HELM/energy-saving-chart/Chart.yaml | 4 +- .../resources/config/config.json | 3 +- .../HELM/energy-saving-chart/values.yaml | 13 ++- .../Deployment/HELM/energy-saving-rapp-0.2.5.tgz | Bin 0 -> 5057 bytes .../rapp-energy-saving/Definitions/asd.yaml | 2 +- .../Files/Acm/instances/es-instance.json | 2 +- sample-rapp-generator/es-demo-rapp/src/assist.py | 14 ++- sample-rapp-generator/es-demo-rapp/src/config.json | 19 ++-- sample-rapp-generator/es-demo-rapp/src/data.py | 55 +++++++----- sample-rapp-generator/es-demo-rapp/src/main.py | 99 ++++++++++++++++----- 11 files changed, 151 insertions(+), 62 deletions(-) create mode 100644 sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz diff --git a/.gitignore b/.gitignore index ac4fcd6..b64e4e5 100755 --- a/.gitignore +++ b/.gitignore @@ -37,3 +37,5 @@ build/ openapi/sme/ *.csar + +**/__pycache__/ diff --git a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/Chart.yaml b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/Chart.yaml index f7b9238..2fd7fe3 100644 --- a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/Chart.yaml +++ b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/Chart.yaml @@ -32,10 +32,10 @@ type: application # This is the chart version. This version number should be incremented each time you make changes # to the chart and its templates, including the app version. # Versions are expected to follow Semantic Versioning (https://semver.org/) -version: 0.1.0 +version: 0.2.5 # This is the version number of the application being deployed. This version number should be # incremented each time you make changes to the application. Versions are not expected to # follow Semantic Versioning. They should reflect the version the application is using. # It is recommended to use it with quotes. -appVersion: "1.16.0" +appVersion: "0.2.5" diff --git a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/resources/config/config.json b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/resources/config/config.json index 2fbd20f..e473291 100644 --- a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/resources/config/config.json +++ b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/resources/config/config.json @@ -28,7 +28,8 @@ "password": "{{ .Values.influxdb.password }}", "example_data_file": "example_data.json", "database": "{{ .Values.influxdb.bucket }}", - "measurement": "CellReports", + "time_range": "{{ .Values.influxdb.timeRange }}", + "measurements": {{ .Values.influxdb.measurements | toJson }}, "ssl": false, "address": "http://localhost:8086" } diff --git a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/values.yaml b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/values.yaml index 1cc7102..890579a 100644 --- a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/values.yaml +++ b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-chart/values.yaml @@ -25,7 +25,7 @@ image: repository: "nexus3.onap.org:10001/estdemoimages/es-rapp" pullPolicy: Always # Overrides the image tag whose default is the chart appVersion. - tag: "0.1.1" + tag: "0.2.5" imagePullSecrets: [] nameOverride: "energy-saving-rapp" @@ -102,13 +102,18 @@ environment: smeDiscoveryEndpoint: "http://sme-discovery.default.svc.cluster.local:8080/service-apis/v1/allServiceAPIs" influxdb: - token: "WcrfLfqC63uCxfDa15C1sb6WtG5fhzrS" + token: "hIVMgY7vn3M772PSk3yrI2IjeWzybPw0" user: "admin" password: "mySuP3rS3cr3tT0keN" - bucket: "pm-bucket" + bucket: "pm-logg-bucket" org: "est" apiName: "influxdb2-http" resourceName: "root" + timeRange: "-6h" + measurements: + - "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1" + - "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1" + - "o-ran-pm" ncmp: @@ -127,5 +132,5 @@ teiv: appStartup: command: ["/bin/sh", "-c"] - args: [ "python main.py --generate_db_data=True --use_sme_db=True --random_predictions=False" ] + args: [ "python main.py --generate_db_data=False --use_sme_db=True --random_predictions=False" ] diff --git a/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz b/sample-rapp-generator/es-demo-rapp/rapp-energy-saving/Artifacts/Deployment/HELM/energy-saving-rapp-0.2.5.tgz new file mode 100644 index 0000000000000000000000000000000000000000..c4b3413df9cbeb3c4197012ae3179be564c6fe0b GIT binary patch literal 5057 zcmV;y6F%%8iwG0|00000|0w_~VMtOiV@ORlOnEsqVl!4SWK%V1T2nbTPgYhoO;>Dc zVQyr3R8em|NM&qo0PH<$bKADE{mfsn!{khpb4^;3U%9;FJLB4Jk8WexV>#($Ivo#0 zLK12cU;$9Jj-&s62Ma_O1Yag`sJMI0$!%q9VR(t=bwf`NozEb-uOC}`Z z-?g6IR-L)ONkJ+3h`At)1~BP3C{2r>Ex+R*I1v^hC#lqrPT*fSP9Rho0z<~(=WKvE z#S#mLq67!9s85cZrZ@nUrZEYTs>hjBoA}l;_)jhk<9~wU42!R50M?BEqr+CaHvSLV z2c7Nse~I!6;J8%YcTdlI{qx)Y&B^u6@zaX)3gDEbQ%*)>32#o{LZ{U^fXfuqE*)|N zKeCKQ+5*6N##xGeIEiD>@B%m%m``xzJ2je~Q&e5=NrB49{j4EQ0$sRTkHWJwwmL_-YsM2>Yk zwieCOCtC*_NJJHY@`IlFlmytjk!+G}zs}zrXj9CfH|uv>O{xv1|Lv>-a5$ z^nu1$C_(;_5so7mOmnL{h%v)v!P zdwsb45l(ua;OB1dYzHurW6S|Rq+D^pI1nXFs|5Pi%!dRhFKhM0OlwoLqdEKhFUtBg}zZF zEZ}~OwI(94aDWUTh%w9JNQpk=7)cyCVE+q1@(k;WPAFx$ib)12(gRKFYPC8C0L@_o zc${hcn=uB(L~a)9L`bI0kFR07Lv4-(mP)}bEn3bz!#jXXMMS_z$pPunEY#3Y73b0_ z)W!iY0i5ePPRgQ_8g=SfABPzya(c?B#1FEddMJM4EK39Ev|24C;byQ9rD#B6A_*2o z?f?;IY5uxzXHr6?%fHq26kJsDu-nai%-M1EGhWVY$+@Cu3oIYfk2?yy+$?9gYRd zc!=jp{tJdoWGIfOz;Vcu1k*^XvSikzPYbhAU~3(h3}TEX*8d`*ohTI{VVNN-95Xds zatDzK%za~HQ7n|lQ=>#+p-82n`O8)8kx}KzFr5(2sFIK{PCO=3##Xs8ru0VuXmnJ)#v(N@y81+)6V9Ssb%_LPxsxm{0|RqTe&2-zTa8ibE^};?o_? zKgS817+)A>Jf6aUBQ2Yu#2mh#i~4)tR+^A`ABAbA$3VCW!eL9-Zy^Eux&Q zacG#NK}BHdi5DfrOXF-r=sL0GkF^r+-tLXk)KeydQ73+Yh%s)&q2F=SeySj+;1Oj=YL+J zJU;IJb6M)UodnuW0&OROo-qmZ&)wO~aoRrogiXv}S_FX)*Y8#5@QhNMl?$9ZBM~@{ zj~KwM z(@lNK<#P$W%d_*_tIO+~2EPkN z6_Lm8qpy_3SG9^?Tfh0#5;1f0i58)1A~cM%2-ng8Kc6yj!H3!GZH>V0RgFAVx%eU? ziNuMfPe>#);iD*09fI{U`(x&60>UxN4a93pM)0~2*b>>lXDIps#mY$WRq_V_6#xX# zmD*-WbcX@K5Z|lJPDOmhAS_o2-77msi6)4|N?rZZY=ky}A);`PWM~Ye;Zs?`(s=m( z$$c#*A$a#2_u%*Q9_*iAf9#$%bfT_uleRZ)6g)2vufTnLbRQq>pnOAUgdZU9^SC6? zd_+R)tsZl}9(ZPWxw8gMn6ncof@>YDIbJ$2iwUv~M&{M!*=_IS;=Crbltr5=ehe;3 zQ@=VTrVx*p3v@2bokd(ig;YX2uD7XVwRE5*SRxg}>$w9E%WwH@J-c8hh|P*X$=RsPD5rcQ ziWL>!knwxOqZ0Y5jo@`vC}m^+t)tm(d1_gi|J{ypoMJ9~nLcMLz`Fde)2ZiwM|e;ux z(;WggA&RM_h*iyA0<+33;hw~?N;)z%$o0~oBHmKFM!fSFJr{8)^R90!$R)S*{U%aZ zaku^dwdNo=-n?7qG#0WjmDD3%LucjT6NyVXp>hcBKZN%W;nwOhjc%@F&Hq+Zuhge% zwAhWJsy}tVS|u@1F&f}_ZQp9xdB@fnD$CmZpSyw2z;PO@c-Z|f7u;L->BC7c-55H> zIyw!MA#{x6#24dTO?$8+xwOprruk)1!mfzEw@vsZ4YEw16gzU%yT!b!*s-fJh6R+* z@JGfZt_fkWmV6OwFbyiHhg|?|6k@)dyP+8Cyn2m%XyR1f9=C))n^15C7&%!K1V+9K~;af+=|2bu4 z{ueSjB%=hS&)W>XCjaZSJ4f~W@36hy|NSCmoBwU|zis~ar{;f^BUWwaj?gH8Q*Bx= zQ0nB9&q1*TcaylpD{Qj^HZ(H3s&pjKj2##8-)EqNMwm+2SMJs{|M&2x7_+a z#cTj>v8T{F1N+DST`=mt^-qLhMAvDn?7(dHtOih|9^Pc z*{k~hPOG)IJ^%S4W$XX9{(tNLU(ElbG!?r=5O7x5*C#~*)hz{I!T&2626hJzF~^GX zw7s;)zr6QIpCq#z`Ad5Y$_LLDHU$CXlQV|W1j%sxzC`*;;uj~e=1d%#>T}0st1n43 zuUn7FoY?9qr((}b<$O=EaYbQE_6a3%rR8^XvDy!fX~Syq0JLpX-YPm3dmX-pW)vl>nMU~Ywn z(s>|X9|kH-JACKb~KIQf}hvvfI0PQaep&i<+CAusBQbg&Io>9hRu4 zSLW=W3gSy?T8$b*xOrcDC9;93q8U3sDY4K~rJ3jX*$Ur+*G1RRwGBwi`?$d>siIG6 zY)F}ceHvLB<4H6vSm^TEv^iMi#M*N#^Fn+1LAR1)%a6W&)pklJ?B-DAV|UMK&8U2X zEnm>zFrQEf8uk6jb1LOxjqnAFZ~(2NgM-?8y^*lquA=o0PnQj*yrc^Q?nU#zmHGcT zMPK6@m^JzT-a)5+|I^`qr@hVpU!rXD|84%i&Hw+@{J$QDG-uS6S%Nvp?xbTU3;xA8 z`L~L`<0}@OSP0Dj8Z!r)mK%LY{~8ZXeNFQo>iNRl$0lBSf%3}b0;SNpEZ!1~MJiuF zuncJqW0A1rBrFT5xieQ^Wst`-p1q0W+NLu$B*?Ryc@$yeoHQ$*ml|(4zn3W6_-`BkZR5X}jQ{E*!^{wJkVEN^>E$4sDA$Br^Cei%&rlPT1k>@cn#qyr< zJJqju@60aicI8xVQ*--YwYdS7sMKa11wTkk_3){}8ljjtzH7wToQBMG`m1CcP3v9_ z{jVVxR`Hh1KJG89Q`S~P*NmCx4Hwh6t@d<&(O`M?W}(f+h1De{#$4fG@AoT&U~hp~ z2AkhHSev-DSfv{!@<~obYtw>7!{XumT0w<0|J|5kRDSR*#%p<~5SQPbs9MtZRVr=iAKOR`0;5V&~eE-@v#+TV6jwUFKzewPQ}kP5JtUWgTJ40EKtB zzMNAKzSP!jMCsxJVCgE(YFTD1szqPc*)V-hvt)p|!OtDx0($wBLL*hZ4QQ~S#k%L3 z=IZip$e#ZSnd8d*FApRB-2AVzx7Tp~dprOCB4wNZZS%iv{`d0x|Lu{-zsBsnEJBE- zSN@cC33>MB?6rG;%rvTTn5C^mOa89qSnwys3oEu?{|Pg3HNL*$gxueB zY4vYW8vOsNBcET?#ybChbZ}7j|NHH|t^a?C^5_7#{fl!qfJZ$lU43_bz9Ib<+a2j{ z^#sG(4eYa-yJPAbGoi8dH!$=M`$8oPZUB4jgQElch=xgeOX!5%VSY=Z6^)wR`SVsZ zCQ8YzPLQg2>YZ=DDNskrm3=8*`8xw8 range(start: {time_range}) ' + + measurements = getattr(self, 'measurements', ['o-ran-pm']) + if isinstance(measurements, str): + measurements = [measurements] + + measurement_filters = [f'r["_measurement"] == "{m}"' for m in measurements] + query += f' |> filter(fn: (r) => {" or ".join(measurement_filters)})' + + query += ' |> filter(fn: (r) => r["_field"] == "CellID" or r["_field"] == "DRB.UEThpUl" or r["_field"] == "RRU.PrbUsedUl" or r["_field"] == "PEE.AvgPower") ' + # Keep _measurement in the rowKey to preserve it + query += ' |> pivot(rowKey: ["_time", "_measurement"], columnKey: ["_field"], valueColumn: "_value") ' + + result = self.query(query) + #logger.debug(f"Data grouped by measurement:\n{result.groupby('_measurement').size()}") + self.data = result + return result + # Query data def query(self, query): while True: @@ -129,40 +154,29 @@ class DATABASE(object): def generate_synthetic_data(self): data = [] - fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower"] - # fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower", "RRC.ConnMean"] - other_fields = ["TestFilterField1", "TestFilterField2", "TestFilterField3", "TestFilterField4"] - measurements = ["o-ran-pm", "test-filter-measurement1", "test-filter-measurement2", "test-filter-measurement3", "test-filter-measurement4"] + fields = ["CellID", "DRB.UEThpUl", "RRU.PrbUsedUl", "PEE.AvgPower", "GranularityPeriod", "RRC.ConnMean", "RRU.PrbTotDl", "DRB.UEThpDl"] + measurements = ["o-ran-pm", "ManagedElement=o-du-pynts-1122,ManagedElement=o-du-pynts-1122,GNBDUFunction=1,NRCellDU=1", "ManagedElement=o-du-pynts-1123,ManagedElement=o-du-pynts-1123,GNBDUFunction=1,NRCellDU=1"] # Generate matching records (synchronized _time for each group of 4) for _ in range(50): # 50 records, each with 4 rows sharing the same time common_time = datetime.now() - timedelta(minutes=random.randint(0, 60)) iso_time = common_time.isoformat() + measurement = random.choice(measurements) for field in fields: value = ( - f"S{random.randint(1,9)}/B{random.randint(1,9)}/C{random.randint(1,9)}" - if field == "CellID" - else round(random.uniform(1, 100), 2) + f"S{random.randint(1,9)}/B{random.randint(1,9)}/C{random.randint(1,9)}" if field == "CellID" + else (900 if field == "GranularityPeriod" + else str(round(random.uniform(1, 100), 5))) ) record = { "_time": iso_time, - "_measurement": "o-ran-pm", + "_measurement": measurement, "_field": field, "_value": value } data.append(record) - # Generate non-matching records (randomized) - for _ in range(50): - record = { - "_time": (datetime.now() - timedelta(minutes=random.randint(0, 60))).isoformat(), - "_measurement": random.choice(measurements), - "_field": random.choice(other_fields), - "_value": str(round(random.uniform(0, 100), 2)) - } - data.append(record) - # Log data to be written print(pd.DataFrame(data).to_string()) @@ -202,6 +216,7 @@ class DATABASE(object): self.port = influx_config.get("port") self.ssl = influx_config.get("ssl") self.dbname = influx_config.get("database") - self.meas = influx_config.get("measurement") self.user = influx_config.get("user") self.password = influx_config.get("password") + self.time_range = influx_config.get("time_range") + self.measurements = influx_config.get("measurements") diff --git a/sample-rapp-generator/es-demo-rapp/src/main.py b/sample-rapp-generator/es-demo-rapp/src/main.py index 97dfd1c..89dd281 100644 --- a/sample-rapp-generator/es-demo-rapp/src/main.py +++ b/sample-rapp-generator/es-demo-rapp/src/main.py @@ -19,6 +19,7 @@ import argparse import time import pandas as pd import schedule +from threading import Lock import logging from data import DATABASE from assist import ASSIST @@ -67,13 +68,52 @@ class ESrapp(): # Create policy type and policy instance #self.policy_manager.create_policy_type() + self.inference_lock = Lock() + self._running = False + def entry(self): - schedule.every(10).seconds.do(self.inference) + if self._running: + logger.warning("ES rApp is already running") + return - while True: - schedule.run_pending() + self._running = True + self.job = schedule.every(10).seconds.do(self.safe_inference) + last_run = 0 + + try: + while self._running: + now = time.time() + if now - last_run >= 10: # 10 second interval + self.safe_inference() + last_run = now + time.sleep(1) + + except KeyboardInterrupt: + logger.info("ES rApp shutting down gracefully") + except Exception as e: + logger.error(f"Error in entry loop: {str(e)}", exc_info=True) + finally: + try: + schedule.cancel_job(self.job) + except: + pass + self._running = False + try: + if self.inference_lock.locked(): + self.inference_lock.release() + except: + pass + + def safe_inference(self): + if not self.inference_lock.acquire(blocking=False): + logger.warning("Previous inference still running, skipping this iteration") + return + try: + self.inference() + finally: + self.inference_lock.release() # Send data to ML rApp def inference(self): data = self.db.read_data() @@ -83,48 +123,59 @@ class ESrapp(): return data_mapping = self.mapping(data) - groups = data_mapping.groupby("CellID") + # Group the data by CellID and _measurement. This means that even if cell ids are the same, but the measurement is different, they will be processed separately. + groups = data_mapping.groupby(["CellID", "_measurement"]) for group_name, group_data in groups: json_data = self.generate_json_data(group_data) logger.info(f"Send data to ML rApp {group_name}: {json_data}") status_code, response_text = self.assist.send_request_to_server(json_data, randomize=self.random_predictions) if not self.check_and_perform_action(response_text): - cell_id_number = group_data['cellidnumber'].iloc[0] + cell_id_name = group_data['CellID'].iloc[0] + du_name = self.extract_managed_element(group_data['_measurement'].iloc[0]) + full_cell_id = cell_id_name + "-" + du_name logger.info(f"Turn on the cell {group_name}") - # Create policy instance with the cell_id_number before performing action - #self.policy_manager.create_policy_instance(cell_id_number) # Wait for 3 seconds before performing the action time.sleep(3) - if cell_id_number not in self.cell_power_status: - logger.debug(f"Cell {cell_id_number} not in local cache. Adding it...") - self.cell_power_status[cell_id_number] = "off" + if full_cell_id not in self.cell_power_status: + logger.debug(f"Cell {full_cell_id} not in local cache. Adding it...") + self.cell_power_status[full_cell_id] = "off" # Check if the cell is already powered on - if self.cell_power_status[cell_id_number] == "on": - logger.debug(f"Cell {cell_id_number} is already powered on.") + if self.cell_power_status[full_cell_id] == "on": + logger.debug(f"Cell {full_cell_id} is already powered on.") # continue else: - self.ncmp_client.power_on_cell(cell_id_number) - self.cell_power_status[cell_id_number] = "on" + self.ncmp_client.power_on_cell(full_cell_id) + self.cell_power_status[full_cell_id] = "on" else: - cell_id_number = group_data['cellidnumber'].iloc[0] + du_name = self.extract_managed_element(group_data['_measurement'].iloc[0]) + cell_id_name = group_data['CellID'].iloc[0] + full_cell_id = cell_id_name + "-" + du_name logger.info(f"Turn off the cell {group_name}") - # Create policy instance with the cell_id_number before performing action - #self.policy_manager.create_policy_instance(cell_id_number) # Wait for 3 seconds before performing the action time.sleep(3) - if cell_id_number not in self.cell_power_status: - logger.debug(f"Cell {cell_id_number} not in local cache. Adding it...") - self.cell_power_status[cell_id_number] = "on" + if full_cell_id not in self.cell_power_status: + logger.debug(f"Cell {full_cell_id} not in local cache. Adding it...") + self.cell_power_status[full_cell_id] = "on" - if self.cell_power_status[cell_id_number] == "off": - logger.debug(f"Cell {cell_id_number} is already powered off.") + if self.cell_power_status[full_cell_id] == "off": + logger.debug(f"Cell {full_cell_id} is already powered off.") # continue else: - self.ncmp_client.power_off_cell(cell_id_number) - self.cell_power_status[cell_id_number] = "off" + self.ncmp_client.power_off_cell(full_cell_id) + self.cell_power_status[full_cell_id] = "off" + + def extract_managed_element(self, measurement): + if '=' not in measurement or ',' not in measurement: + return measurement + + parts = measurement.split(',') + for part in parts: + if part.startswith('ManagedElement='): + return part.split('=')[1] + return measurement # Generate the input data for ML rApp def generate_json_data(self, data): # rrc_conn_mean_values = data["RRC.ConnMean"].tolist() -- 2.16.6