# ==================================================================================
# Copyright (c) 2020 AT&T Intellectual Property.
+# Copyright (c) 2020 HCL Technologies Limited.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# limitations under the License.
# ==================================================================================
"""
-mock qp module
+qp module main -- using Time series ML predictor
RMR Messages:
- #define TS_QOE_PRED_REQ 30001
+ #define TS_UE_LIST 30000
#define TS_QOE_PREDICTION 30002
-30001 is the message type QP receives from the driver;
+30000 is the message type QP receives from the TS;
sends out type 30002 which should be routed to TS.
"""
-
-
+import insert
import os
+import json
from mdclogpy import Logger
from ricxappframe.xapp_frame import RMRXapp, rmr
+from prediction import forecast
+from qptrain import train
+from database import DATABASE, DUMMY
+import warnings
+warnings.filterwarnings("ignore")
-
+# pylint: disable=invalid-name
qp_xapp = None
logger = Logger(name=__name__)
def post_init(self):
+ """
+ Function that runs when xapp initialization is complete
+ """
self.predict_requests = 0
logger.debug("QP xApp started")
def qp_default_handler(self, summary, sbuf):
+ """
+ Function that processes messages for which no handler is defined
+ """
logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
# we don't use rts here; free this
self.rmr_free(sbuf)
def qp_predict_handler(self, summary, sbuf):
- logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+ """
+ Function that processes messages for type 30000
+ """
+ logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
+ pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
self.predict_requests += 1
# we don't use rts here; free this
self.rmr_free(sbuf)
- # send a mock message
- mock_msg = '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }'
- ok = self.rmr_send(mock_msg.encode(), 30002)
- if ok:
+ success = self.rmr_send(pred_msg.encode(), 30002)
+ logger.debug("Sending message to ts : {}".format(pred_msg)) # For debug purpose
+ if success:
logger.debug("predict handler: sent message successfully")
else:
- logger.warn("predict handler: failed to send message")
+ logger.warning("predict handler: failed to send message")
+
+
+def cells(ue):
+ """
+ Extract neighbor cell id for a given UE
+ """
+ db.read_data(meas='liveUE', limit=1, ueid=ue)
+ df = db.data
+
+ nbc = df.filter(regex='nbCell').values[0].tolist()
+ srvc = df.filter(regex='nrCell').values[0].tolist()
+ return srvc+nbc
+
+
+def predict(payload):
+ """
+ Function that forecast the time series
+ """
+ tp = {}
+ payload = json.loads(payload)
+ ueid = payload['UEPredictionSet'][0]
+
+ cell_list = cells(ueid)
+ for cid in cell_list:
+ mcid = cid.replace('/', '')
+ db.read_data(meas='liveCell', cellid=cid, limit=11)
+ if len(db.data) != 0:
+ inp = db.data
+
+ if not os.path.isfile('qp/' + mcid):
+ train(db, cid)
+
+ df_f = forecast(inp, mcid, 1)
+ if df_f is not None:
+ tp[cid] = df_f.values.tolist()[0]
+ df_f['cellid'] = cid
+ db.write_prediction(df_f)
+ else:
+ tp[cid] = [None, None]
+ return json.dumps({ueid: tp})
def start(thread=False):
"""
logger.debug("QP xApp starting")
global qp_xapp
+ global db
+ if not thread:
+ insert.populatedb() # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB
+ db = DATABASE('UEData')
+ else:
+ db = DUMMY()
fake_sdl = os.environ.get("USE_FAKE_SDL", None)
- qp_xapp = RMRXapp(qp_default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
- qp_xapp.register_callback(qp_predict_handler, 30001)
+ qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
+ qp_xapp.register_callback(qp_predict_handler, 30000)
qp_xapp.run(thread)
can only be called if thread=True when started
TODO: could we register a signal handler for Docker SIGTERM that calls this?
"""
+ global qp_xapp
qp_xapp.stop()
def get_stats():
- # hacky for now, will evolve
+ """
+ hacky for now, will evolve
+ """
+ global qp_xapp
return {"PredictRequests": qp_xapp.predict_requests}