import time
import pandas as pd
import schedule
-from ricxappframe.xapp_frame import Xapp
+from ricxappframe.xapp_frame import Xapp, rmr
from ricxappframe.xapp_sdl import SDLWrapper
from mdclogpy import Logger
from ad_model import modelling, CAUSE
db = None
cp = None
+threshold = None
sdl = SDLWrapper(use_fake_sdl=True)
logger = Logger(name=__name__)
def load_model():
global md
global cp
+ global threshold
md = modelling()
cp = CAUSE()
+ threshold = 70
+ logger.info("throughput threshold parameter is set as {}% (default)".format(threshold))
def train_model():
df.loc[:, 'Degradation'] = ''
val = None
if 1 in df.Anomaly.unique():
- df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
+ df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold)
df_a = df.loc[df['Anomaly'] == 1].copy()
if len(df_a) > 0:
df_a['time'] = df_a.index
success = self.rmr_send(val, 30003)
if success:
logger.info(" Message to TS: message sent Successfully")
- # rmr receive to get the acknowledgement message from the traffic steering.
- for (summary, sbuf) in self.rmr_get_messages():
- logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+ # rmr receive to get the acknowledgement message from the traffic steering.
+
+ for summary, sbuf in self.rmr_get_messages():
+ if sbuf.contents.mtype == 30004:
+ logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+ if sbuf.contents.mtype == 20010:
+ a1_request_handler(self, summary, sbuf)
self.rmr_free(sbuf)
success = db.connect()
+def a1_request_handler(self, summary, sbuf):
+ logger.info("A1 policy received")
+ try:
+ req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes
+ logger.debug("A1PolicyHandler.resp_handler:: Handler processing request")
+ except (json.decoder.JSONDecodeError, KeyError):
+ logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request")
+ return
+
+ if verifyPolicy(req):
+ logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req))
+ else:
+ logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req))
+ logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req))
+ change_threshold(self, req)
+ resp = buildPolicyResp(self, req)
+ self.rmr_send(json.dumps(resp).encode(), 20011)
+ logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp))
+ self.rmr_free(sbuf)
+
+
+def change_threshold(self, req: dict):
+ if req["operation"] == "CREATE":
+ payload = req["payload"]
+ threshold = json.loads(payload)[db.a1_param]
+ logger.info("throughput threshold parameter updated to: {}% ".format(threshold))
+
+
+def verifyPolicy(req: dict):
+ for i in ["policy_type_id", "operation", "policy_instance_id"]:
+ if i not in req:
+ return False
+ return True
+
+
+def buildPolicyResp(self, req: dict):
+ req["handler_id"] = "ad"
+ del req["operation"]
+ del req["payload"]
+ req["status"] = "OK"
+ return req
+
+
def start(thread=False):
# Initiates xapp api and runs the entry() using xapp.run()
xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
- xapp.logger.debug("AD xApp starting")
+ logger.debug("AD xApp starting")
xapp.run()