[RICAPP-181] Update Prediction message (from QP to TS) with serving cell prediciton
[ric-app/qp.git] / qp / main.py
index 2074c19..1c88a48 100644 (file)
@@ -1,5 +1,6 @@
 # ==================================================================================
 #       Copyright (c) 2020 AT&T Intellectual Property.
+#       Copyright (c) 2020 HCL Technologies Limited.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
 #   limitations under the License.
 # ==================================================================================
 """
-mock qp module
+qp module main -- using Time series ML predictor
 
 RMR Messages:
- #define TS_QOE_PRED_REQ 30001
+ #define TS_UE_LIST 30000
  #define TS_QOE_PREDICTION 30002
-30001 is the message type QP receives from the driver;
+30000 is the message type QP receives from the TS;
 sends out type 30002 which should be routed to TS.
 
 """
-
-
+import insert
 import os
+import json
 from mdclogpy import Logger
 from ricxappframe.xapp_frame import RMRXapp, rmr
+from prediction import forecast
+from qptrain import train
+from database import DATABASE, DUMMY
+import warnings
+warnings.filterwarnings("ignore")
 
-
+# pylint: disable=invalid-name
 qp_xapp = None
 logger = Logger(name=__name__)
 
 
 def post_init(self):
+    """
+    Function that runs when xapp initialization is complete
+    """
     self.predict_requests = 0
     logger.debug("QP xApp started")
 
 
 def qp_default_handler(self, summary, sbuf):
+    """
+    Function that processes messages for which no handler is defined
+    """
     logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
     # we don't use rts here; free this
     self.rmr_free(sbuf)
 
 
 def qp_predict_handler(self, summary, sbuf):
-    logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+    """
+    Function that processes messages for type 30000
+    """
+    logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
+    pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
     self.predict_requests += 1
     # we don't use rts here; free this
     self.rmr_free(sbuf)
-    # send a mock message
-    mock_msg = '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }'
-    ok = self.rmr_send(mock_msg.encode(), 30002)
-    if ok:
+    success = self.rmr_send(pred_msg.encode(), 30002)
+    logger.debug("Sending message to ts : {}".format(pred_msg))  # For debug purpose
+    if success:
         logger.debug("predict handler: sent message successfully")
     else:
-        logger.warn("predict handler: failed to send message")
+        logger.warning("predict handler: failed to send message")
+
+
+def cells(ue):
+    """
+        Extract neighbor cell id for a given UE
+    """
+    db.read_data(meas='liveUE', limit=1, ueid=ue)
+    df = db.data
+
+    nbc = df.filter(regex='nbCell').values[0].tolist()
+    srvc = df.filter(regex='nrCell').values[0].tolist()
+    return srvc+nbc
+
+
+def predict(payload):
+    """
+     Function that forecast the time series
+    """
+    tp = {}
+    payload = json.loads(payload)
+    ueid = payload['UEPredictionSet'][0]
+
+    cell_list = cells(ueid)
+    for cid in cell_list:
+        mcid = cid.replace('/', '')
+        db.read_data(meas='liveCell', cellid=cid, limit=11)
+        if len(db.data) != 0:
+            inp = db.data
+
+            if not os.path.isfile('qp/' + mcid):
+                train(db, cid)
+
+            df_f = forecast(inp, mcid, 1)
+            if df_f is not None:
+                tp[cid] = df_f.values.tolist()[0]
+                df_f['cellid'] = cid
+                db.write_prediction(df_f)
+            else:
+                tp[cid] = [None, None]
+    return json.dumps({ueid: tp})
 
 
 def start(thread=False):
@@ -67,9 +122,15 @@ def start(thread=False):
     """
     logger.debug("QP xApp starting")
     global qp_xapp
+    global db
+    if not thread:
+        insert.populatedb()   # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB
+        db = DATABASE('UEData')
+    else:
+        db = DUMMY()
     fake_sdl = os.environ.get("USE_FAKE_SDL", None)
-    qp_xapp = RMRXapp(qp_default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
-    qp_xapp.register_callback(qp_predict_handler, 30001)
+    qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
+    qp_xapp.register_callback(qp_predict_handler, 30000)
     qp_xapp.run(thread)
 
 
@@ -78,9 +139,13 @@ def stop():
     can only be called if thread=True when started
     TODO: could we register a signal handler for Docker SIGTERM that calls this?
     """
+    global qp_xapp
     qp_xapp.stop()
 
 
 def get_stats():
-    # hacky for now, will evolve
+    """
+    hacky for now, will evolve
+    """
+    global qp_xapp
     return {"PredictRequests": qp_xapp.predict_requests}