X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=src%2Fmain.py;h=45542c846c6d74205a5fe43685ec338ac8047b08;hb=refs%2Fchanges%2F55%2F11455%2F2;hp=f2a5bafa0e5a4b402789711149d80ab820609668;hpb=5da0637daa11fc09c2980efad7f116ef2a537be8;p=ric-app%2Fad.git diff --git a/src/main.py b/src/main.py index f2a5baf..45542c8 100644 --- a/src/main.py +++ b/src/main.py @@ -19,7 +19,7 @@ import os import time import pandas as pd import schedule -from ricxappframe.xapp_frame import Xapp +from ricxappframe.xapp_frame import Xapp, rmr from ricxappframe.xapp_sdl import SDLWrapper from mdclogpy import Logger from ad_model import modelling, CAUSE @@ -28,6 +28,7 @@ from database import DATABASE, DUMMY db = None cp = None +threshold = None sdl = SDLWrapper(use_fake_sdl=True) logger = Logger(name=__name__) @@ -48,8 +49,11 @@ def entry(self): def load_model(): global md global cp + global threshold md = modelling() cp = CAUSE() + threshold = 70 + logger.info("throughput threshold parameter is set as {}% (default)".format(threshold)) def train_model(): @@ -96,7 +100,7 @@ def predict_anomaly(self, df): df.loc[:, 'Degradation'] = '' val = None if 1 in df.Anomaly.unique(): - df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db) + df.loc[:, ['Anomaly', 'Degradation']] = cp.cause(df, db, threshold) df_a = df.loc[df['Anomaly'] == 1].copy() if len(df_a) > 0: df_a['time'] = df_a.index @@ -116,9 +120,13 @@ def msg_to_ts(self, val): success = self.rmr_send(val, 30003) if success: logger.info(" Message to TS: message sent Successfully") - # rmr receive to get the acknowledgement message from the traffic steering. - for (summary, sbuf) in self.rmr_get_messages(): - logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary)) + # rmr receive to get the acknowledgement message from the traffic steering. + + for summary, sbuf in self.rmr_get_messages(): + if sbuf.contents.mtype == 30004: + logger.info("Received acknowldgement from TS (TS_ANOMALY_ACK): {}".format(summary)) + if sbuf.contents.mtype == 20010: + a1_request_handler(self, summary, sbuf) self.rmr_free(sbuf) @@ -134,8 +142,51 @@ def connectdb(thread=False): success = db.connect() +def a1_request_handler(self, summary, sbuf): + logger.info("A1 policy received") + try: + req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes + logger.debug("A1PolicyHandler.resp_handler:: Handler processing request") + except (json.decoder.JSONDecodeError, KeyError): + logger.error("A1PolicyManager.resp_handler:: Handler failed to parse request") + return + + if verifyPolicy(req): + logger.info("A1PolicyHandler.resp_handler:: Handler processed request: {}".format(req)) + else: + logger.error("A1PolicyHandler.resp_handler:: Request verification failed: {}".format(req)) + logger.debug("A1PolicyHandler.resp_handler:: Request verification success: {}".format(req)) + change_threshold(self, req) + resp = buildPolicyResp(self, req) + self.rmr_send(json.dumps(resp).encode(), 20011) + logger.info("A1PolicyHandler.resp_handler:: Response sent: {}".format(resp)) + self.rmr_free(sbuf) + + +def change_threshold(self, req: dict): + if req["operation"] == "CREATE": + payload = req["payload"] + threshold = json.loads(payload)[db.a1_param] + logger.info("throughput threshold parameter updated to: {}% ".format(threshold)) + + +def verifyPolicy(req: dict): + for i in ["policy_type_id", "operation", "policy_instance_id"]: + if i not in req: + return False + return True + + +def buildPolicyResp(self, req: dict): + req["handler_id"] = "ad" + del req["operation"] + del req["payload"] + req["status"] = "OK" + return req + + def start(thread=False): # Initiates xapp api and runs the entry() using xapp.run() xapp = Xapp(entrypoint=entry, rmr_port=4560, use_fake_sdl=False) - xapp.logger.debug("AD xApp starting") + logger.debug("AD xApp starting") xapp.run()