# ==================================================================================
# Copyright (c) 2020 AT&T Intellectual Property.
+# Copyright (c) 2020 HCL Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# limitations under the License.
# ==================================================================================
"""
-mock qp module
+qp module main -- using Time series ML predictor
RMR Messages:
- #define TS_QOE_PRED_REQ 30001
+ #define TS_UE_LIST 30000
#define TS_QOE_PREDICTION 30002
-30001 is the message type QP receives from the driver;
+30000 is the message type QP receives from the TS;
sends out type 30002 which should be routed to TS.
"""
-
-import json
+import insert
import os
+import json
from mdclogpy import Logger
from ricxappframe.xapp_frame import RMRXapp, rmr
+from prediction import forecast
+from qptrain import train
+from database import DATABASE, DUMMY
+import warnings
+warnings.filterwarnings("ignore")
# pylint: disable=invalid-name
qp_xapp = None
def qp_predict_handler(self, summary, sbuf):
"""
- Function that processes messages for type 30001
- """
- logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
- logger.debug("adding somethign")
- logger.debug("message is " + summary[rmr.RMR_MS_PAYLOAD].decode())
- pred_req_msg = json.loads(summary[rmr.RMR_MS_PAYLOAD].decode())
- all_cells = {}
- ind = 0
- for ncell in pred_req_msg["CellMeasurements"]:
- if (ind == 0):
- all_cells[ncell["CellID"]] = [50000, 20000]
- else:
- all_cells[ncell["CellID"]] = [20000, 10000]
- ind += 1
-
- pred_msg = {}
- pred_msg[pred_req_msg["PredictionUE"]] = all_cells
+ Function that processes messages for type 30000
+ """
+ logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
+ pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
self.predict_requests += 1
# we don't use rts here; free this
self.rmr_free(sbuf)
- # send a mock message based on input
- success = self.rmr_send(json.dumps(pred_msg).encode(), 30002)
+ success = self.rmr_send(pred_msg.encode(), 30002)
+ logger.debug("Sending message to ts : {}".format(pred_msg)) # For debug purpose
if success:
logger.debug("predict handler: sent message successfully")
else:
logger.warning("predict handler: failed to send message")
+def nbcells(ue):
+ """
+ Extract neighbor cell id for a given UE
+ """
+ db.read_data(meas='liveUE', limit=1, ueid=ue)
+ df = db.data
+
+ nbc = df.filter(regex='nbCell').values[0]
+ return nbc
+
+
+def predict(payload):
+ """
+ Function that forecast the time series
+ """
+ tp = {}
+ payload = json.loads(payload)
+ ueid = payload['UEPredictionSet'][0]
+
+ nbc = nbcells(ueid)
+ for cid in nbc:
+ mcid = cid.replace('/', '')
+ db.read_data(meas='liveCell', cellid=cid, limit=11)
+ if len(db.data) != 0:
+ inp = db.data
+
+ if not os.path.isfile('qp/' + mcid):
+ train(db, cid)
+
+ df_f = forecast(inp, mcid, 1)
+ if df_f is not None:
+ tp[cid] = df_f.values.tolist()[0]
+ df_f['cellid'] = cid
+ db.write_prediction(df_f)
+ else:
+ tp[cid] = [None, None]
+ return json.dumps({ueid: tp})
+
+
def start(thread=False):
"""
This is a convenience function that allows this xapp to run in Docker
"""
logger.debug("QP xApp starting")
global qp_xapp
+ global db
+ if not thread:
+ insert.populatedb() # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB
+ db = DATABASE('UEData')
+ else:
+ db = DUMMY()
fake_sdl = os.environ.get("USE_FAKE_SDL", None)
qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
- qp_xapp.register_callback(qp_predict_handler, 30001)
+ qp_xapp.register_callback(qp_predict_handler, 30000)
qp_xapp.run(thread)