Release 1.0.1 55/11455/2 h-release
authorDeepanshu Karnwal <deepanshu.k@hcl.com>
Thu, 29 Jun 2023 16:40:46 +0000 (22:10 +0530)
committerDeepanshu Karnwal <deepanshu.k@hcl.com>
Thu, 29 Jun 2023 17:34:18 +0000 (17:34 +0000)
Change-Id: Ie8fc46d2fd8635922bde841809c9c85ac75fccdf
Signed-off-by: Deepanshu Karnwal <deepanshu.k@hcl.com>
README.txt
docs/release-notes.rst
local.rt
setup.py
src/ad_config.ini
src/ad_model.py
src/database.py
src/insert.py
src/main.py
tox.ini
xapp-descriptor/config.json

index cffd078..624ded5 100644 (file)
@@ -45,7 +45,9 @@ AD xApp performs following:
    a) Read live data from influxDB every 0.5 second
    b) Detect anomalous records on given input
    c) Investigate degradation type for anomalous users
-* send the ue-id, DU-ID, Degradation type and timestamp for the anomalous records to the Traffic Steering (via rmr with the message type as 30003)
+* Listens to RMR port for A1 policy (message type 20011) in a format given below. Which consists throughput threshold parameter (default: 70%) for an degradataion event to qualify for a handover
+   {'operation': 'CREATE', 'payload': '{\"thp_threshold\":74}', 'policy_instance_id': 'demo-1', 'policy_type_id': '9997'}"}
+* Send the ue-id, DU-ID, Degradation type and timestamp for the qualified anomalous records to the Traffic Steering (via rmr with the message type as 30003)
 * Get the acknowledgement message from the traffic steering 
 * store xApp result in "AD" measurement of influxDB
 
index 0db4174..2dde1d2 100644 (file)
@@ -22,6 +22,9 @@ All notable changes to this project will be documented in this file.
 The format is based on `Keep a Changelog <http://keepachangelog.com/>`__
 and this project adheres to `Semantic Versioning <http://semver.org/>`__.
 
+[1.0.1] - 2023-06-28
+--------------------
+* Release version 1.0.1 (`RICAPP-215 <https://jira.o-ran-sc.org/browse/RICAPP-215>`_)
 
 [1.0.0] - 2022-12-09
 --------------------
index 40ab9b9..e084807 100644 (file)
--- a/local.rt
+++ b/local.rt
@@ -1,4 +1,5 @@
 newrt|start
 rte|30003|service-ricxapp-trafficxapp-rmr:4560
+rte|20011|service-ricplt-a1mediator-rmr.ricplt:4560
 newrt|end
 
index 19625a3..671b008 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,7 @@ from setuptools import setup, find_packages
 
 setup(
     name="ad",
-    version="1.0.0",
+    version="1.0.1",
     packages=find_packages(exclude=["tests.*", "tests"]),
     description="Anomaly Detection xApp that integrates with Traffic Steering",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-app/ad",
index 7ce400e..cc7a8c4 100644 (file)
@@ -32,3 +32,4 @@ rssinr  = RF.serving.RSSINR
 prb_usage = RRU.PrbUsedDl
 ue = ue-id
 anomaly = Viavi.UE.anomalies
+a1_param = thp_threshold
index 6e14fa5..4bac6e0 100644 (file)
@@ -91,7 +91,7 @@ class CAUSE(object):
     def __init__(self):
         self.normal = None
 
-    def cause(self, df, db):
+    def cause(self, df, db, threshold):
         """ Filter normal data for a particular ue-id to compare with a given sample
             Compare with normal data to find and return degradaton type
         """
@@ -99,11 +99,11 @@ class CAUSE(object):
         sample.index = range(len(sample))
         for i in range(len(sample)):
             if sample.iloc[i]['Anomaly'] == 1:
-                query = """select * from "{}" where {} = \'{}\' and time<now() and time>now()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue])
+                query = """select * from {} where "{}" = \'{}\' and time<now() and time>now()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue])
                 normal = db.query(query)
                 if normal:
                     normal = normal[db.meas][[db.thpt, db.rsrp, db.rsrq]]
-                    deg = self.find(sample.loc[i, :], normal.max(), db)
+                    deg = self.find(sample.loc[i, :], normal.max(), db, threshold)
                     if deg:
                         sample.loc[i, 'Degradation'] = deg
                         if 'Throughput' in deg and ('RSRP' in deg or 'RSRQ' in deg):
@@ -112,10 +112,10 @@ class CAUSE(object):
                         sample.loc[i, 'Anomaly'] = 0
         return sample[['Anomaly', 'Degradation']].values.tolist()
 
-    def find(self, row, l, db):
+    def find(self, row, l, db, threshold):
         """ store if a particular parameter is below threshold and return """
         deg = []
-        if row[db.thpt] < l[db.thpt]*0.5:
+        if row[db.thpt] < l[db.thpt]*(100 - threshold)*0.01:
             deg.append('Throughput')
         if row[db.rsrp] < l[db.rsrp]-15:
             deg.append('RSRP')
index 8609a2e..121f3fa 100644 (file)
@@ -139,6 +139,7 @@ class DATABASE(object):
                 self.prb = cfg.get(section, "prb_usage")
                 self.ue = cfg.get(section, "ue")
                 self.anomaly = cfg.get(section, "anomaly")
+                self.a1_param = cfg.get(section, "a1_param")
 
 
 class DUMMY(DATABASE):
index 815d42f..d485b7b 100644 (file)
@@ -31,8 +31,8 @@ class INSERTDATA(DATABASE):
         super().__init__()
         self.config()
         self.connect()
-        self.dropdb('RIC-Test')
-        self.createdb('RIC-Test')
+#        self.dropdb('RIC-Test')
+#        self.createdb('RIC-Test')
 
     def config(self):
         cfg = ConfigParser()
@@ -67,7 +67,7 @@ class INSERTDATA(DATABASE):
             d = df[df['measTimeStampRf'] == timestamp]
             d.index = pd.date_range(start=datetime.datetime.now(), freq='1ms', periods=len(d))
             self.client.write_points(d, self.meas)
-            time.sleep(0.4)
+            time.sleep(0.7)
 
 
 def populatedb():
index f2a5baf..45542c8 100644 (file)
@@ -19,7 +19,7 @@ import os
 import time
 import pandas as pd
 import schedule
-from ricxappframe.xapp_frame import Xapp
+from ricxappframe.xapp_frame import Xapp, rmr
 from ricxappframe.xapp_sdl import SDLWrapper
 from mdclogpy import Logger
 from ad_model import modelling, CAUSE
@@ -28,6 +28,7 @@ from database import DATABASE, DUMMY
 
 db = None
 cp = None
+threshold = None
 sdl = SDLWrapper(use_fake_sdl=True)
 
 logger = Logger(name=__name__)
@@ -48,8 +49,11 @@ def entry(self):
 def load_model():
     global md
     global cp
+    global threshold
     md = modelling()
     cp = CAUSE()
+    threshold = 70
+    logger.info("throughput threshold parameter is set as {}% (default)".format(threshold))
 
 
 def train_model():
@@ -96,7 +100,7 @@ def predict_anomaly(self, df):
     df.loc[:, 'Degradation'] = ''
     val = None
     if 1 in df.Anomaly.unique():
-        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
+        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold)
         df_a = df.loc[df['Anomaly'] == 1].copy()
         if len(df_a) > 0:
             df_a['time'] = df_a.index
@@ -116,9 +120,13 @@ def msg_to_ts(self, val):
     success = self.rmr_send(val, 30003)
     if success:
         logger.info(" Message to TS: message sent Successfully")
-    # rmr receive to get the acknowledgement message from the traffic steering.
-    for (summary, sbuf) in self.rmr_get_messages():
-        logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+        # rmr receive to get the acknowledgement message from the traffic steering.
+
+    for summary, sbuf in self.rmr_get_messages():
+        if sbuf.contents.mtype == 30004:
+            logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+        if sbuf.contents.mtype == 20010:
+            a1_request_handler(self, summary, sbuf)
         self.rmr_free(sbuf)
 
 
@@ -134,8 +142,51 @@ def connectdb(thread=False):
         success = db.connect()
 
 
+def a1_request_handler(self, summary, sbuf):
+    logger.info("A1 policy received")
+    try:
+        req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
+        logger.debug("A1PolicyHandler.resp_handler:: Handler processing request")
+    except (json.decoder.JSONDecodeError, KeyError):
+        logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request")
+        return
+
+    if verifyPolicy(req):
+        logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req))
+    else:
+        logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req))
+    logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req))
+    change_threshold(self, req)
+    resp = buildPolicyResp(self, req)
+    self.rmr_send(json.dumps(resp).encode(), 20011)
+    logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp))
+    self.rmr_free(sbuf)
+
+
+def change_threshold(self, req: dict):
+    if req["operation"] == "CREATE":
+        payload = req["payload"]
+        threshold = json.loads(payload)[db.a1_param]
+        logger.info("throughput threshold parameter updated to: {}% ".format(threshold))
+
+
+def verifyPolicy(req: dict):
+    for i in ["policy_type_id", "operation", "policy_instance_id"]:
+        if i not in req:
+            return False
+    return True
+
+
+def buildPolicyResp(self, req: dict):
+    req["handler_id"] = "ad"
+    del req["operation"]
+    del req["payload"]
+    req["status"] = "OK"
+    return req
+
+
 def start(thread=False):
     # Initiates xapp api and runs the entry() using xapp.run()
     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
-    xapp.logger.debug("AD xApp starting")
+    logger.debug("AD xApp starting")
     xapp.run()
diff --git a/tox.ini b/tox.ini
index 09d34a7..94563ee 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -62,6 +62,8 @@ deps =
     sphinxcontrib-httpdomain
     recommonmark
     lfdocs-conf
+    urllib3~=1.26.15
+allowlist_externals = echo
 commands =
     sphinx-build -W -b html -n -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/html
     echo "Generated docs available in {toxinidir}/docs/_build/html"
@@ -74,4 +76,5 @@ deps = sphinx
        sphinxcontrib-httpdomain
        recommonmark
        lfdocs-conf
+       urllib3~=1.26.15
 commands = sphinx-build -W -b linkcheck -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/linkcheck
index 86f6ea6..13f333e 100644 (file)
@@ -1,6 +1,6 @@
 {
         "xapp_name": "ad",
-        "version": "1.0.0",
+        "version": "1.0.1",
         "containers": [
             {
                 "name": "ad",