Release 1.0.1 55/11455/2 h-release
authorDeepanshu Karnwal <deepanshu.k@hcl.com>
Thu, 29 Jun 2023 16:40:46 +0000 (22:10 +0530)
committerDeepanshu Karnwal <deepanshu.k@hcl.com>
Thu, 29 Jun 2023 17:34:18 +0000 (17:34 +0000)
Change-Id: Ie8fc46d2fd8635922bde841809c9c85ac75fccdf
Signed-off-by: Deepanshu Karnwal <deepanshu.k@hcl.com>
README.txt
docs/release-notes.rst
local.rt
setup.py
src/ad_config.ini
src/ad_model.py
src/database.py
src/insert.py
src/main.py
tox.ini
xapp-descriptor/config.json

index cffd078..624ded5 100644 (file)
@@ -45,7 +45,9 @@ AD xApp performs following:
    a) Read live data from influxDB every 0.5 second
    b) Detect anomalous records on given input
    c) Investigate degradation type for anomalous users
    a) Read live data from influxDB every 0.5 second
    b) Detect anomalous records on given input
    c) Investigate degradation type for anomalous users
-* send the ue-id, DU-ID, Degradation type and timestamp for the anomalous records to the Traffic Steering (via rmr with the message type as 30003)
+* Listens to RMR port for A1 policy (message type 20011) in a format given below. Which consists throughput threshold parameter (default: 70%) for an degradataion event to qualify for a handover
+   {'operation': 'CREATE', 'payload': '{\"thp_threshold\":74}', 'policy_instance_id': 'demo-1', 'policy_type_id': '9997'}"}
+* Send the ue-id, DU-ID, Degradation type and timestamp for the qualified anomalous records to the Traffic Steering (via rmr with the message type as 30003)
 * Get the acknowledgement message from the traffic steering 
 * store xApp result in "AD" measurement of influxDB
 
 * Get the acknowledgement message from the traffic steering 
 * store xApp result in "AD" measurement of influxDB
 
index 0db4174..2dde1d2 100644 (file)
@@ -22,6 +22,9 @@ All notable changes to this project will be documented in this file.
 The format is based on `Keep a Changelog <http://keepachangelog.com/>`__
 and this project adheres to `Semantic Versioning <http://semver.org/>`__.
 
 The format is based on `Keep a Changelog <http://keepachangelog.com/>`__
 and this project adheres to `Semantic Versioning <http://semver.org/>`__.
 
+[1.0.1] - 2023-06-28
+--------------------
+* Release version 1.0.1 (`RICAPP-215 <https://jira.o-ran-sc.org/browse/RICAPP-215>`_)
 
 [1.0.0] - 2022-12-09
 --------------------
 
 [1.0.0] - 2022-12-09
 --------------------
index 40ab9b9..e084807 100644 (file)
--- a/local.rt
+++ b/local.rt
@@ -1,4 +1,5 @@
 newrt|start
 rte|30003|service-ricxapp-trafficxapp-rmr:4560
 newrt|start
 rte|30003|service-ricxapp-trafficxapp-rmr:4560
+rte|20011|service-ricplt-a1mediator-rmr.ricplt:4560
 newrt|end
 
 newrt|end
 
index 19625a3..671b008 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,7 @@ from setuptools import setup, find_packages
 
 setup(
     name="ad",
 
 setup(
     name="ad",
-    version="1.0.0",
+    version="1.0.1",
     packages=find_packages(exclude=["tests.*", "tests"]),
     description="Anomaly Detection xApp that integrates with Traffic Steering",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-app/ad",
     packages=find_packages(exclude=["tests.*", "tests"]),
     description="Anomaly Detection xApp that integrates with Traffic Steering",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-app/ad",
index 7ce400e..cc7a8c4 100644 (file)
@@ -32,3 +32,4 @@ rssinr  = RF.serving.RSSINR
 prb_usage = RRU.PrbUsedDl
 ue = ue-id
 anomaly = Viavi.UE.anomalies
 prb_usage = RRU.PrbUsedDl
 ue = ue-id
 anomaly = Viavi.UE.anomalies
+a1_param = thp_threshold
index 6e14fa5..4bac6e0 100644 (file)
@@ -91,7 +91,7 @@ class CAUSE(object):
     def __init__(self):
         self.normal = None
 
     def __init__(self):
         self.normal = None
 
-    def cause(self, df, db):
+    def cause(self, df, db, threshold):
         """ Filter normal data for a particular ue-id to compare with a given sample
             Compare with normal data to find and return degradaton type
         """
         """ Filter normal data for a particular ue-id to compare with a given sample
             Compare with normal data to find and return degradaton type
         """
@@ -99,11 +99,11 @@ class CAUSE(object):
         sample.index = range(len(sample))
         for i in range(len(sample)):
             if sample.iloc[i]['Anomaly'] == 1:
         sample.index = range(len(sample))
         for i in range(len(sample)):
             if sample.iloc[i]['Anomaly'] == 1:
-                query = """select * from "{}" where {} = \'{}\' and time<now() and time>now()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue])
+                query = """select * from {} where "{}" = \'{}\' and time<now() and time>now()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue])
                 normal = db.query(query)
                 if normal:
                     normal = normal[db.meas][[db.thpt, db.rsrp, db.rsrq]]
                 normal = db.query(query)
                 if normal:
                     normal = normal[db.meas][[db.thpt, db.rsrp, db.rsrq]]
-                    deg = self.find(sample.loc[i, :], normal.max(), db)
+                    deg = self.find(sample.loc[i, :], normal.max(), db, threshold)
                     if deg:
                         sample.loc[i, 'Degradation'] = deg
                         if 'Throughput' in deg and ('RSRP' in deg or 'RSRQ' in deg):
                     if deg:
                         sample.loc[i, 'Degradation'] = deg
                         if 'Throughput' in deg and ('RSRP' in deg or 'RSRQ' in deg):
@@ -112,10 +112,10 @@ class CAUSE(object):
                         sample.loc[i, 'Anomaly'] = 0
         return sample[['Anomaly', 'Degradation']].values.tolist()
 
                         sample.loc[i, 'Anomaly'] = 0
         return sample[['Anomaly', 'Degradation']].values.tolist()
 
-    def find(self, row, l, db):
+    def find(self, row, l, db, threshold):
         """ store if a particular parameter is below threshold and return """
         deg = []
         """ store if a particular parameter is below threshold and return """
         deg = []
-        if row[db.thpt] < l[db.thpt]*0.5:
+        if row[db.thpt] < l[db.thpt]*(100 - threshold)*0.01:
             deg.append('Throughput')
         if row[db.rsrp] < l[db.rsrp]-15:
             deg.append('RSRP')
             deg.append('Throughput')
         if row[db.rsrp] < l[db.rsrp]-15:
             deg.append('RSRP')
index 8609a2e..121f3fa 100644 (file)
@@ -139,6 +139,7 @@ class DATABASE(object):
                 self.prb = cfg.get(section, "prb_usage")
                 self.ue = cfg.get(section, "ue")
                 self.anomaly = cfg.get(section, "anomaly")
                 self.prb = cfg.get(section, "prb_usage")
                 self.ue = cfg.get(section, "ue")
                 self.anomaly = cfg.get(section, "anomaly")
+                self.a1_param = cfg.get(section, "a1_param")
 
 
 class DUMMY(DATABASE):
 
 
 class DUMMY(DATABASE):
index 815d42f..d485b7b 100644 (file)
@@ -31,8 +31,8 @@ class INSERTDATA(DATABASE):
         super().__init__()
         self.config()
         self.connect()
         super().__init__()
         self.config()
         self.connect()
-        self.dropdb('RIC-Test')
-        self.createdb('RIC-Test')
+#        self.dropdb('RIC-Test')
+#        self.createdb('RIC-Test')
 
     def config(self):
         cfg = ConfigParser()
 
     def config(self):
         cfg = ConfigParser()
@@ -67,7 +67,7 @@ class INSERTDATA(DATABASE):
             d = df[df['measTimeStampRf'] == timestamp]
             d.index = pd.date_range(start=datetime.datetime.now(), freq='1ms', periods=len(d))
             self.client.write_points(d, self.meas)
             d = df[df['measTimeStampRf'] == timestamp]
             d.index = pd.date_range(start=datetime.datetime.now(), freq='1ms', periods=len(d))
             self.client.write_points(d, self.meas)
-            time.sleep(0.4)
+            time.sleep(0.7)
 
 
 def populatedb():
 
 
 def populatedb():
index f2a5baf..45542c8 100644 (file)
@@ -19,7 +19,7 @@ import os
 import time
 import pandas as pd
 import schedule
 import time
 import pandas as pd
 import schedule
-from ricxappframe.xapp_frame import Xapp
+from ricxappframe.xapp_frame import Xapp, rmr
 from ricxappframe.xapp_sdl import SDLWrapper
 from mdclogpy import Logger
 from ad_model import modelling, CAUSE
 from ricxappframe.xapp_sdl import SDLWrapper
 from mdclogpy import Logger
 from ad_model import modelling, CAUSE
@@ -28,6 +28,7 @@ from database import DATABASE, DUMMY
 
 db = None
 cp = None
 
 db = None
 cp = None
+threshold = None
 sdl = SDLWrapper(use_fake_sdl=True)
 
 logger = Logger(name=__name__)
 sdl = SDLWrapper(use_fake_sdl=True)
 
 logger = Logger(name=__name__)
@@ -48,8 +49,11 @@ def entry(self):
 def load_model():
     global md
     global cp
 def load_model():
     global md
     global cp
+    global threshold
     md = modelling()
     cp = CAUSE()
     md = modelling()
     cp = CAUSE()
+    threshold = 70
+    logger.info("throughput threshold parameter is set as {}% (default)".format(threshold))
 
 
 def train_model():
 
 
 def train_model():
@@ -96,7 +100,7 @@ def predict_anomaly(self, df):
     df.loc[:, 'Degradation'] = ''
     val = None
     if 1 in df.Anomaly.unique():
     df.loc[:, 'Degradation'] = ''
     val = None
     if 1 in df.Anomaly.unique():
-        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
+        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold)
         df_a = df.loc[df['Anomaly'] == 1].copy()
         if len(df_a) > 0:
             df_a['time'] = df_a.index
         df_a = df.loc[df['Anomaly'] == 1].copy()
         if len(df_a) > 0:
             df_a['time'] = df_a.index
@@ -116,9 +120,13 @@ def msg_to_ts(self, val):
     success = self.rmr_send(val, 30003)
     if success:
         logger.info(" Message to TS: message sent Successfully")
     success = self.rmr_send(val, 30003)
     if success:
         logger.info(" Message to TS: message sent Successfully")
-    # rmr receive to get the acknowledgement message from the traffic steering.
-    for (summary, sbuf) in self.rmr_get_messages():
-        logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+        # rmr receive to get the acknowledgement message from the traffic steering.
+
+    for summary, sbuf in self.rmr_get_messages():
+        if sbuf.contents.mtype == 30004:
+            logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+        if sbuf.contents.mtype == 20010:
+            a1_request_handler(self, summary, sbuf)
         self.rmr_free(sbuf)
 
 
         self.rmr_free(sbuf)
 
 
@@ -134,8 +142,51 @@ def connectdb(thread=False):
         success = db.connect()
 
 
         success = db.connect()
 
 
+def a1_request_handler(self, summary, sbuf):
+    logger.info("A1 policy received")
+    try:
+        req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
+        logger.debug("A1PolicyHandler.resp_handler:: Handler processing request")
+    except (json.decoder.JSONDecodeError, KeyError):
+        logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request")
+        return
+
+    if verifyPolicy(req):
+        logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req))
+    else:
+        logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req))
+    logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req))
+    change_threshold(self, req)
+    resp = buildPolicyResp(self, req)
+    self.rmr_send(json.dumps(resp).encode(), 20011)
+    logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp))
+    self.rmr_free(sbuf)
+
+
+def change_threshold(self, req: dict):
+    if req["operation"] == "CREATE":
+        payload = req["payload"]
+        threshold = json.loads(payload)[db.a1_param]
+        logger.info("throughput threshold parameter updated to: {}% ".format(threshold))
+
+
+def verifyPolicy(req: dict):
+    for i in ["policy_type_id", "operation", "policy_instance_id"]:
+        if i not in req:
+            return False
+    return True
+
+
+def buildPolicyResp(self, req: dict):
+    req["handler_id"] = "ad"
+    del req["operation"]
+    del req["payload"]
+    req["status"] = "OK"
+    return req
+
+
 def start(thread=False):
     # Initiates xapp api and runs the entry() using xapp.run()
     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
 def start(thread=False):
     # Initiates xapp api and runs the entry() using xapp.run()
     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
-    xapp.logger.debug("AD xApp starting")
+    logger.debug("AD xApp starting")
     xapp.run()
     xapp.run()
diff --git a/tox.ini b/tox.ini
index 09d34a7..94563ee 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -62,6 +62,8 @@ deps =
     sphinxcontrib-httpdomain
     recommonmark
     lfdocs-conf
     sphinxcontrib-httpdomain
     recommonmark
     lfdocs-conf
+    urllib3~=1.26.15
+allowlist_externals = echo
 commands =
     sphinx-build -W -b html -n -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/html
     echo "Generated docs available in {toxinidir}/docs/_build/html"
 commands =
     sphinx-build -W -b html -n -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/html
     echo "Generated docs available in {toxinidir}/docs/_build/html"
@@ -74,4 +76,5 @@ deps = sphinx
        sphinxcontrib-httpdomain
        recommonmark
        lfdocs-conf
        sphinxcontrib-httpdomain
        recommonmark
        lfdocs-conf
+       urllib3~=1.26.15
 commands = sphinx-build -W -b linkcheck -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/linkcheck
 commands = sphinx-build -W -b linkcheck -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/linkcheck
index 86f6ea6..13f333e 100644 (file)
@@ -1,6 +1,6 @@
 {
         "xapp_name": "ad",
 {
         "xapp_name": "ad",
-        "version": "1.0.0",
+        "version": "1.0.1",
         "containers": [
             {
                 "name": "ad",
         "containers": [
             {
                 "name": "ad",