Release 1.0.1
[ric-app/ad.git] / src / main.py
index f2a5baf..45542c8 100644 (file)
@@ -19,7 +19,7 @@ import os
 import time
 import pandas as pd
 import schedule
-from ricxappframe.xapp_frame import Xapp
+from ricxappframe.xapp_frame import Xapp, rmr
 from ricxappframe.xapp_sdl import SDLWrapper
 from mdclogpy import Logger
 from ad_model import modelling, CAUSE
@@ -28,6 +28,7 @@ from database import DATABASE, DUMMY
 
 db = None
 cp = None
+threshold = None
 sdl = SDLWrapper(use_fake_sdl=True)
 
 logger = Logger(name=__name__)
@@ -48,8 +49,11 @@ def entry(self):
 def load_model():
     global md
     global cp
+    global threshold
     md = modelling()
     cp = CAUSE()
+    threshold = 70
+    logger.info("throughput threshold parameter is set as {}% (default)".format(threshold))
 
 
 def train_model():
@@ -96,7 +100,7 @@ def predict_anomaly(self, df):
     df.loc[:, 'Degradation'] = ''
     val = None
     if 1 in df.Anomaly.unique():
-        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db)
+        df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold)
         df_a = df.loc[df['Anomaly'] == 1].copy()
         if len(df_a) > 0:
             df_a['time'] = df_a.index
@@ -116,9 +120,13 @@ def msg_to_ts(self, val):
     success = self.rmr_send(val, 30003)
     if success:
         logger.info(" Message to TS: message sent Successfully")
-    # rmr receive to get the acknowledgement message from the traffic steering.
-    for (summary, sbuf) in self.rmr_get_messages():
-        logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+        # rmr receive to get the acknowledgement message from the traffic steering.
+
+    for summary, sbuf in self.rmr_get_messages():
+        if sbuf.contents.mtype == 30004:
+            logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary))
+        if sbuf.contents.mtype == 20010:
+            a1_request_handler(self, summary, sbuf)
         self.rmr_free(sbuf)
 
 
@@ -134,8 +142,51 @@ def connectdb(thread=False):
         success = db.connect()
 
 
+def a1_request_handler(self, summary, sbuf):
+    logger.info("A1 policy received")
+    try:
+        req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
+        logger.debug("A1PolicyHandler.resp_handler:: Handler processing request")
+    except (json.decoder.JSONDecodeError, KeyError):
+        logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request")
+        return
+
+    if verifyPolicy(req):
+        logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req))
+    else:
+        logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req))
+    logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req))
+    change_threshold(self, req)
+    resp = buildPolicyResp(self, req)
+    self.rmr_send(json.dumps(resp).encode(), 20011)
+    logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp))
+    self.rmr_free(sbuf)
+
+
+def change_threshold(self, req: dict):
+    if req["operation"] == "CREATE":
+        payload = req["payload"]
+        threshold = json.loads(payload)[db.a1_param]
+        logger.info("throughput threshold parameter updated to: {}% ".format(threshold))
+
+
+def verifyPolicy(req: dict):
+    for i in ["policy_type_id", "operation", "policy_instance_id"]:
+        if i not in req:
+            return False
+    return True
+
+
+def buildPolicyResp(self, req: dict):
+    req["handler_id"] = "ad"
+    del req["operation"]
+    del req["payload"]
+    req["status"] = "OK"
+    return req
+
+
 def start(thread=False):
     # Initiates xapp api and runs the entry() using xapp.run()
     xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False)
-    xapp.logger.debug("AD xApp starting")
+    logger.debug("AD xApp starting")
     xapp.run()