From 4cfece2770950e9d9b5a728b7f41f44851e2f1c1 Mon Sep 17 00:00:00 2001 From: Deepanshu Karnwal Date: Thu, 29 Jun 2023 22:10:46 +0530 Subject: [PATCH] Release 1.0.1 Change-Id: Ie8fc46d2fd8635922bde841809c9c85ac75fccdf Signed-off-by: Deepanshu Karnwal --- README.txt | 4 ++- docs/release-notes.rst | 3 +++ local.rt | 1 + setup.py | 2 +- src/ad_config.ini | 1 + src/ad_model.py | 10 +++---- src/database.py | 1 + src/insert.py | 6 ++--- src/main.py | 63 ++++++++++++++++++++++++++++++++++++++++----- tox.ini | 3 +++ xapp-descriptor/config.json | 2 +- 11 files changed, 79 insertions(+), 17 deletions(-) diff --git a/README.txt b/README.txt index cffd078..624ded5 100644 --- a/README.txt +++ b/README.txt @@ -45,7 +45,9 @@ AD xApp performs following: a) Read live data from influxDB every 0.5 second b) Detect anomalous records on given input c) Investigate degradation type for anomalous users -* send the ue-id, DU-ID, Degradation type and timestamp for the anomalous records to the Traffic Steering (via rmr with the message type as 30003) +* Listens to RMR port for A1 policy (message type 20011) in a format given below. Which consists throughput threshold parameter (default: 70%) for an degradataion event to qualify for a handover + {'operation': 'CREATE', 'payload': '{\"thp_threshold\":74}', 'policy_instance_id': 'demo-1', 'policy_type_id': '9997'}"} +* Send the ue-id, DU-ID, Degradation type and timestamp for the qualified anomalous records to the Traffic Steering (via rmr with the message type as 30003) * Get the acknowledgement message from the traffic steering * store xApp result in "AD" measurement of influxDB diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 0db4174..2dde1d2 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -22,6 +22,9 @@ All notable changes to this project will be documented in this file. The format is based on `Keep a Changelog `__ and this project adheres to `Semantic Versioning `__. +[1.0.1] - 2023-06-28 +-------------------- +* Release version 1.0.1 (`RICAPP-215 `_) [1.0.0] - 2022-12-09 -------------------- diff --git a/local.rt b/local.rt index 40ab9b9..e084807 100644 --- a/local.rt +++ b/local.rt @@ -1,4 +1,5 @@ newrt|start rte|30003|service-ricxapp-trafficxapp-rmr:4560 +rte|20011|service-ricplt-a1mediator-rmr.ricplt:4560 newrt|end diff --git a/setup.py b/setup.py index 19625a3..671b008 100644 --- a/setup.py +++ b/setup.py @@ -18,7 +18,7 @@ from setuptools import setup, find_packages setup( name="ad", - version="1.0.0", + version="1.0.1", packages=find_packages(exclude=["tests.*", "tests"]), description="Anomaly Detection xApp that integrates with Traffic Steering", url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-app/ad", diff --git a/src/ad_config.ini b/src/ad_config.ini index 7ce400e..cc7a8c4 100644 --- a/src/ad_config.ini +++ b/src/ad_config.ini @@ -32,3 +32,4 @@ rssinr = RF.serving.RSSINR prb_usage = RRU.PrbUsedDl ue = ue-id anomaly = Viavi.UE.anomalies +a1_param = thp_threshold diff --git a/src/ad_model.py b/src/ad_model.py index 6e14fa5..4bac6e0 100644 --- a/src/ad_model.py +++ b/src/ad_model.py @@ -91,7 +91,7 @@ class CAUSE(object): def __init__(self): self.normal = None - def cause(self, df, db): + def cause(self, df, db, threshold): """ Filter normal data for a particular ue-id to compare with a given sample Compare with normal data to find and return degradaton type """ @@ -99,11 +99,11 @@ class CAUSE(object): sample.index = range(len(sample)) for i in range(len(sample)): if sample.iloc[i]['Anomaly'] == 1: - query = """select * from "{}" where {} = \'{}\' and timenow()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue]) + query = """select * from {} where "{}" = \'{}\' and timenow()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue]) normal = db.query(query) if normal: normal = normal[db.meas][[db.thpt, db.rsrp, db.rsrq]] - deg = self.find(sample.loc[i, :], normal.max(), db) + deg = self.find(sample.loc[i, :], normal.max(), db, threshold) if deg: sample.loc[i, 'Degradation'] = deg if 'Throughput' in deg and ('RSRP' in deg or 'RSRQ' in deg): @@ -112,10 +112,10 @@ class CAUSE(object): sample.loc[i, 'Anomaly'] = 0 return sample[['Anomaly', 'Degradation']].values.tolist() - def find(self, row, l, db): + def find(self, row, l, db, threshold): """ store if a particular parameter is below threshold and return """ deg = [] - if row[db.thpt] < l[db.thpt]*0.5: + if row[db.thpt] < l[db.thpt]*(100 - threshold)*0.01: deg.append('Throughput') if row[db.rsrp] < l[db.rsrp]-15: deg.append('RSRP') diff --git a/src/database.py b/src/database.py index 8609a2e..121f3fa 100644 --- a/src/database.py +++ b/src/database.py @@ -139,6 +139,7 @@ class DATABASE(object): self.prb = cfg.get(section, "prb_usage") self.ue = cfg.get(section, "ue") self.anomaly = cfg.get(section, "anomaly") + self.a1_param = cfg.get(section, "a1_param") class DUMMY(DATABASE): diff --git a/src/insert.py b/src/insert.py index 815d42f..d485b7b 100644 --- a/src/insert.py +++ b/src/insert.py @@ -31,8 +31,8 @@ class INSERTDATA(DATABASE): super().__init__() self.config() self.connect() - self.dropdb('RIC-Test') - self.createdb('RIC-Test') +# self.dropdb('RIC-Test') +# self.createdb('RIC-Test') def config(self): cfg = ConfigParser() @@ -67,7 +67,7 @@ class INSERTDATA(DATABASE): d = df[df['measTimeStampRf'] == timestamp] d.index = pd.date_range(start=datetime.datetime.now(), freq='1ms', periods=len(d)) self.client.write_points(d, self.meas) - time.sleep(0.4) + time.sleep(0.7) def populatedb(): diff --git a/src/main.py b/src/main.py index f2a5baf..45542c8 100644 --- a/src/main.py +++ b/src/main.py @@ -19,7 +19,7 @@ import os import time import pandas as pd import schedule -from ricxappframe.xapp_frame import Xapp +from ricxappframe.xapp_frame import Xapp, rmr from ricxappframe.xapp_sdl import SDLWrapper from mdclogpy import Logger from ad_model import modelling, CAUSE @@ -28,6 +28,7 @@ from database import DATABASE, DUMMY db = None cp = None +threshold = None sdl = SDLWrapper(use_fake_sdl=True) logger = Logger(name=__name__) @@ -48,8 +49,11 @@ def entry(self): def load_model(): global md global cp + global threshold md = modelling() cp = CAUSE() + threshold = 70 + logger.info("throughput threshold parameter is set as {}% (default)".format(threshold)) def train_model(): @@ -96,7 +100,7 @@ def predict_anomaly(self, df): df.loc[:, 'Degradation'] = '' val = None if 1 in df.Anomaly.unique(): - df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db) + df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold) df_a = df.loc[df['Anomaly'] == 1].copy() if len(df_a) > 0: df_a['time'] = df_a.index @@ -116,9 +120,13 @@ def msg_to_ts(self, val): success = self.rmr_send(val, 30003) if success: logger.info(" Message to TS: message sent Successfully") - # rmr receive to get the acknowledgement message from the traffic steering. - for (summary, sbuf) in self.rmr_get_messages(): - logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary)) + # rmr receive to get the acknowledgement message from the traffic steering. + + for summary, sbuf in self.rmr_get_messages(): + if sbuf.contents.mtype == 30004: + logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary)) + if sbuf.contents.mtype == 20010: + a1_request_handler(self, summary, sbuf) self.rmr_free(sbuf) @@ -134,8 +142,51 @@ def connectdb(thread=False): success = db.connect() +def a1_request_handler(self, summary, sbuf): + logger.info("A1 policy received") + try: + req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes + logger.debug("A1PolicyHandler.resp_handler:: Handler processing request") + except (json.decoder.JSONDecodeError, KeyError): + logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request") + return + + if verifyPolicy(req): + logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req)) + else: + logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req)) + logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req)) + change_threshold(self, req) + resp = buildPolicyResp(self, req) + self.rmr_send(json.dumps(resp).encode(), 20011) + logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp)) + self.rmr_free(sbuf) + + +def change_threshold(self, req: dict): + if req["operation"] == "CREATE": + payload = req["payload"] + threshold = json.loads(payload)[db.a1_param] + logger.info("throughput threshold parameter updated to: {}% ".format(threshold)) + + +def verifyPolicy(req: dict): + for i in ["policy_type_id", "operation", "policy_instance_id"]: + if i not in req: + return False + return True + + +def buildPolicyResp(self, req: dict): + req["handler_id"] = "ad" + del req["operation"] + del req["payload"] + req["status"] = "OK" + return req + + def start(thread=False): # Initiates xapp api and runs the entry() using xapp.run() xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False) - xapp.logger.debug("AD xApp starting") + logger.debug("AD xApp starting") xapp.run() diff --git a/tox.ini b/tox.ini index 09d34a7..94563ee 100644 --- a/tox.ini +++ b/tox.ini @@ -62,6 +62,8 @@ deps = sphinxcontrib-httpdomain recommonmark lfdocs-conf + urllib3~=1.26.15 +allowlist_externals = echo commands = sphinx-build -W -b html -n -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/html echo "Generated docs available in {toxinidir}/docs/_build/html" @@ -74,4 +76,5 @@ deps = sphinx sphinxcontrib-httpdomain recommonmark lfdocs-conf + urllib3~=1.26.15 commands = sphinx-build -W -b linkcheck -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/linkcheck diff --git a/xapp-descriptor/config.json b/xapp-descriptor/config.json index 86f6ea6..13f333e 100644 --- a/xapp-descriptor/config.json +++ b/xapp-descriptor/config.json @@ -1,6 +1,6 @@ { "xapp_name": "ad", - "version": "1.0.0", + "version": "1.0.1", "containers": [ { "name": "ad", -- 2.16.6