a) Read live data from influxDB every 0.5 second
b) Detect anomalous records on given input
c) Investigate degradation type for anomalous users
-* send the ue-id, DU-ID, Degradation type and timestamp for the anomalous records to the Traffic Steering (via rmr with the message type as 30003)
+* Listens on the RMR port for an A1 policy (message type 20011) in the format given below, which consists of a throughput threshold parameter (default: 70%) for a degradation event to qualify for a handover
+ {'operation': 'CREATE', 'payload': '{\"thp_threshold\":74}', 'policy_instance_id': 'demo-1', 'policy_type_id': '9997'}
+* Send the ue-id, DU-ID, Degradation type and timestamp for the qualified anomalous records to the Traffic Steering (via rmr with the message type as 30003)
* Get the acknowledgement message from the traffic steering
* store xApp result in "AD" measurement of influxDB
The format is based on `Keep a Changelog <http://keepachangelog.com/>`__
and this project adheres to `Semantic Versioning <http://semver.org/>`__.
+[1.0.1] - 2023-06-28
+--------------------
+* Release version 1.0.1 (`RICAPP-215 <https://jira.o-ran-sc.org/browse/RICAPP-215>`_)
[1.0.0] - 2022-12-09
--------------------
newrt|start
rte|30003|service-ricxapp-trafficxapp-rmr:4560
+rte|20011|service-ricplt-a1mediator-rmr.ricplt:4560
newrt|end
setup(
name="ad",
- version="1.0.0",
+ version="1.0.1",
packages=find_packages(exclude=["tests.*", "tests"]),
description="Anomaly Detection xApp that integrates with Traffic Steering",
url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-app/ad",
prb_usage = RRU.PrbUsedDl
ue = ue-id
anomaly = Viavi.UE.anomalies
+a1_param = thp_threshold
def __init__(self):
self.normal = None
- def cause(self, df, db):
+ def cause(self, df, db, threshold):
""" Filter normal data for a particular ue-id to compare with a given sample
Compare with normal data to find and return degradaton type
"""
sample.index = range(len(sample))
for i in range(len(sample)):
if sample.iloc[i]['Anomaly'] == 1:
- query = """select * from "{}" where {} = \'{}\' and time<now() and time>now()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue])
+ query = """select * from {} where "{}" = \'{}\' and time<now() and time>now()-20s""".format(db.meas, db.ue, sample.iloc[i][db.ue])
normal = db.query(query)
if normal:
normal = normal[db.meas][[db.thpt, db.rsrp, db.rsrq]]
- deg = self.find(sample.loc[i, :], normal.max(), db)
+ deg = self.find(sample.loc[i, :], normal.max(), db, threshold)
if deg:
sample.loc[i, 'Degradation'] = deg
if 'Throughput' in deg and ('RSRP' in deg or 'RSRQ' in deg):
sample.loc[i, 'Anomaly'] = 0
return sample[['Anomaly', 'Degradation']].values.tolist()
- def find(self, row, l, db):
+ def find(self, row, l, db, threshold):
""" store if a particular parameter is below threshold and return """
deg = []
- if row[db.thpt] < l[db.thpt]*0.5:
+ if row[db.thpt] < l[db.thpt]*(100 - threshold)*0.01:
deg.append('Throughput')
if row[db.rsrp] < l[db.rsrp]-15:
deg.append('RSRP')
self.prb = cfg.get(section, "prb_usage")
self.ue = cfg.get(section, "ue")
self.anomaly = cfg.get(section, "anomaly")
+ self.a1_param = cfg.get(section, "a1_param")
class DUMMY(DATABASE):
super().__init__()
self.config()
self.connect()
- self.dropdb('RIC-Test')
- self.createdb('RIC-Test')
+# self.dropdb('RIC-Test')
+# self.createdb('RIC-Test')
def config(self):
cfg = ConfigParser()
d = df[df['measTimeStampRf'] == timestamp]
d.index = pd.date_range(start=datetime.datetime.now(), freq='1ms', periods=len(d))
self.client.write_points(d, self.meas)
- time.sleep(0.4)
+ time.sleep(0.7)
def populatedb():
import time
import pandas as pd
import schedule
-from ricxappframe.xapp_frame import Xapp
+from ricxappframe.xapp_frame import Xapp, rmr
from ricxappframe.xapp_sdl import SDLWrapper
from mdclogpy import Logger
from ad_model import modelling, CAUSE
db = None
cp = None
+threshold = None
sdl = SDLWrapper(use_fake_sdl=True)
logger = Logger(name=__name__)
def load_model():
global md
global cp
+ global threshold
md = modelling()
cp = CAUSE()
+ threshold = 70
+ logger.info("throughput threshold parameter is set as {}% (default)".format(threshold))
def train_model():
df.loc[:, 'Degradation'] = ''
val = None
if 1 in df.Anomaly.unique():
- df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
+ df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold)
df_a = df.loc[df['Anomaly'] == 1].copy()
if len(df_a) > 0:
df_a['time'] = df_a.index
success = self.rmr_send(val, 30003)
if success:
logger.info(" Message to TS: message sent Successfully")
- # rmr receive to get the acknowledgement message from the traffic steering.
- for (summary, sbuf) in self.rmr_get_messages():
- logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+ # rmr receive to get the acknowledgement message from the traffic steering.
+
+ for summary, sbuf in self.rmr_get_messages():
+ if sbuf.contents.mtype == 30004:
+ logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+ if sbuf.contents.mtype == 20010:
+ a1_request_handler(self, summary, sbuf)
self.rmr_free(sbuf)
success = db.connect()
+def a1_request_handler(self, summary, sbuf):
+ logger.info("A1 policy received")
+ try:
+ req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes
+ logger.debug("A1PolicyHandler.resp_handler:: Handler processing request")
+ except (json.decoder.JSONDecodeError, KeyError):
+ logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request")
+ return
+
+ if verifyPolicy(req):
+ logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req))
+ else:
+ logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req))
+ logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req))
+ change_threshold(self, req)
+ resp = buildPolicyResp(self, req)
+ self.rmr_send(json.dumps(resp).encode(), 20011)
+ logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp))
+ self.rmr_free(sbuf)
+
+
+def change_threshold(self, req: dict):
+ if req["operation"] == "CREATE":
+ payload = req["payload"]
+ threshold = json.loads(payload)[db.a1_param]
+ logger.info("throughput threshold parameter updated to: {}% ".format(threshold))
+
+
+def verifyPolicy(req: dict):
+ for i in ["policy_type_id", "operation", "policy_instance_id"]:
+ if i not in req:
+ return False
+ return True
+
+
+def buildPolicyResp(self, req: dict):
+ req["handler_id"] = "ad"
+ del req["operation"]
+ del req["payload"]
+ req["status"] = "OK"
+ return req
+
+
def start(thread=False):
# Initiates xapp api and runs the entry() using xapp.run()
xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
- xapp.logger.debug("AD xApp starting")
+ logger.debug("AD xApp starting")
xapp.run()
sphinxcontrib-httpdomain
recommonmark
lfdocs-conf
+ urllib3~=1.26.15
+allowlist_externals = echo
commands =
sphinx-build -W -b html -n -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/html
echo "Generated docs available in {toxinidir}/docs/_build/html"
sphinxcontrib-httpdomain
recommonmark
lfdocs-conf
+ urllib3~=1.26.15
commands = sphinx-build -W -b linkcheck -d {envtmpdir}/doctrees ./docs/ {toxinidir}/docs/_build/linkcheck
{
"xapp_name": "ad",
- "version": "1.0.0",
+ "version": "1.0.1",
"containers": [
{
"name": "ad",