X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=qp%2Fmain.py;h=002b063f7d9a851e246da08c22d7f6d30c1473b0;hb=a1cb1abac5f5979f507b6760f0862c38c3ba2347;hp=188e686241c7e6a27759fb5a2e2213cf4894a966;hpb=cd32fb9b8f63431086809c542b6dab26c8ea09b1;p=ric-app%2Fqp.git diff --git a/qp/main.py b/qp/main.py index 188e686..002b063 100644 --- a/qp/main.py +++ b/qp/main.py @@ -1,5 +1,6 @@ # ================================================================================== # Copyright (c) 2020 AT&T Intellectual Property. +# Copyright (c) 2020 HCL Technologies Limited. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,20 +15,25 @@ # limitations under the License. # ================================================================================== """ -mock qp module +qp module main -- using Time series ML predictor RMR Messages: - #define TS_QOE_PRED_REQ 30001 + #define TS_UE_LIST 30000 #define TS_QOE_PREDICTION 30002 -30001 is the message type QP receives from the driver; +30000 is the message type QP receives from the TS; sends out type 30002 which should be routed to TS. """ - -import json +import insert import os +import json from mdclogpy import Logger from ricxappframe.xapp_frame import RMRXapp, rmr +from prediction import forecast +from qptrain import train +from database import DATABASE, DUMMY +import warnings +warnings.filterwarnings("ignore") # pylint: disable=invalid-name qp_xapp = None @@ -53,34 +59,60 @@ def qp_default_handler(self, summary, sbuf): def qp_predict_handler(self, summary, sbuf): """ - Function that processes messages for type 30001 - """ - logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE])) - logger.debug("adding somethign") - logger.debug("message is " + summary[rmr.RMR_MS_PAYLOAD].decode()) - pred_req_msg = json.loads(summary[rmr.RMR_MS_PAYLOAD].decode()) - all_cells = {} - ind = 0 - for ncell in pred_req_msg["CellMeasurements"]: - if (ind == 0): - all_cells[ncell["CellID"]] = [50000, 20000] - else: - all_cells[ncell["CellID"]] = [20000, 10000] - ind += 1 - - pred_msg = {} - pred_msg[pred_req_msg["PredictionUE"]] = all_cells + Function that processes messages for type 30000 + """ + logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD])) + pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD]) self.predict_requests += 1 # we don't use rts here; free this self.rmr_free(sbuf) - # send a mock message based on input - success = self.rmr_send(json.dumps(pred_msg).encode(), 30002) + success = self.rmr_send(pred_msg.encode(), 30002) + logger.debug("Sending message to ts : {}".format(pred_msg)) # For debug purpose if success: logger.debug("predict handler: sent message successfully") else: logger.warning("predict handler: failed to send message") +def nbcells(ue): + """ + Extract neighbor cell id for a given UE + """ + db.read_data(meas='liveUE', limit=1, ueid=ue) + df = db.data + + nbc = df.filter(regex='nbCell').values[0] + return nbc + + +def predict(payload): + """ + Function that forecast the time series + """ + tp = {} + payload = json.loads(payload) + ueid = payload['UEPredictionSet'][0] + + nbc = nbcells(ueid) + for cid in nbc: + mcid = cid.replace('/', '') + db.read_data(meas='liveCell', cellid=cid, limit=11) + if len(db.data) != 0: + inp = db.data + + if not os.path.isfile('qp/' + mcid): + train(db, cid) + + df_f = forecast(inp, mcid, 1) + if df_f is not None: + tp[cid] = df_f.values.tolist()[0] + df_f['cellid'] = cid + db.write_prediction(df_f) + else: + tp[cid] = [None, None] + return json.dumps({ueid: tp}) + + def start(thread=False): """ This is a convenience function that allows this xapp to run in Docker @@ -89,9 +121,15 @@ def start(thread=False): """ logger.debug("QP xApp starting") global qp_xapp + global db + if not thread: + insert.populatedb() # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB + db = DATABASE('UEData') + else: + db = DUMMY() fake_sdl = os.environ.get("USE_FAKE_SDL", None) qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl)) - qp_xapp.register_callback(qp_predict_handler, 30001) + qp_xapp.register_callback(qp_predict_handler, 30000) qp_xapp.run(thread)