First version with ML prediction code
[ric-app/qp.git] / qp / main.py
index 188e686..002b063 100644 (file)
@@ -1,5 +1,6 @@
 # ==================================================================================
 #       Copyright (c) 2020 AT&T Intellectual Property.
+#       Copyright (c) 2020 HCL Technologies Limited.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
 #   limitations under the License.
 # ==================================================================================
 """
-mock qp module
+qp module main -- using Time series ML predictor
 
 RMR Messages:
- #define TS_QOE_PRED_REQ 30001
+ #define TS_UE_LIST 30000
  #define TS_QOE_PREDICTION 30002
-30001 is the message type QP receives from the driver;
+30000 is the message type QP receives from the TS;
 sends out type 30002 which should be routed to TS.
 
 """
-
-import json
+import insert
 import os
+import json
 from mdclogpy import Logger
 from ricxappframe.xapp_frame import RMRXapp, rmr
+from prediction import forecast
+from qptrain import train
+from database import DATABASE, DUMMY
+import warnings
+warnings.filterwarnings("ignore")
 
 # pylint: disable=invalid-name
 qp_xapp = None
@@ -53,34 +59,60 @@ def qp_default_handler(self, summary, sbuf):
 
 def qp_predict_handler(self, summary, sbuf):
     """
-    Function that processes messages for type 30001
-    """
-    logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
-    logger.debug("adding somethign")
-    logger.debug("message is " + summary[rmr.RMR_MS_PAYLOAD].decode())
-    pred_req_msg = json.loads(summary[rmr.RMR_MS_PAYLOAD].decode())
-    all_cells = {}
-    ind = 0
-    for ncell in pred_req_msg["CellMeasurements"]:
-        if (ind == 0):
-            all_cells[ncell["CellID"]] = [50000, 20000]
-        else:
-            all_cells[ncell["CellID"]] = [20000, 10000]
-        ind += 1
-
-    pred_msg = {}
-    pred_msg[pred_req_msg["PredictionUE"]] = all_cells
+    Function that processes messages for type 30000
+    """
+    logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
+    pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
     self.predict_requests += 1
     # we don't use rts here; free this
     self.rmr_free(sbuf)
-    # send a mock message based on input
-    success = self.rmr_send(json.dumps(pred_msg).encode(), 30002)
+    success = self.rmr_send(pred_msg.encode(), 30002)
+    logger.debug("Sending message to ts : {}".format(pred_msg))  # For debug purpose
     if success:
         logger.debug("predict handler: sent message successfully")
     else:
         logger.warning("predict handler: failed to send message")
 
 
+def nbcells(ue):
+    """
+        Extract neighbor cell id for a given UE
+    """
+    db.read_data(meas='liveUE', limit=1, ueid=ue)
+    df = db.data
+
+    nbc = df.filter(regex='nbCell').values[0]
+    return nbc
+
+
+def predict(payload):
+    """
+     Function that forecast the time series
+    """
+    tp = {}
+    payload = json.loads(payload)
+    ueid = payload['UEPredictionSet'][0]
+
+    nbc = nbcells(ueid)
+    for cid in nbc:
+        mcid = cid.replace('/', '')
+        db.read_data(meas='liveCell', cellid=cid, limit=11)
+        if len(db.data) != 0:
+            inp = db.data
+
+            if not os.path.isfile('qp/' + mcid):
+                train(db, cid)
+
+            df_f = forecast(inp, mcid, 1)
+            if df_f is not None:
+                tp[cid] = df_f.values.tolist()[0]
+                df_f['cellid'] = cid
+                db.write_prediction(df_f)
+            else:
+                tp[cid] = [None, None]
+    return json.dumps({ueid: tp})
+
+
 def start(thread=False):
     """
     This is a convenience function that allows this xapp to run in Docker
@@ -89,9 +121,15 @@ def start(thread=False):
     """
     logger.debug("QP xApp starting")
     global qp_xapp
+    global db
+    if not thread:
+        insert.populatedb()   # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB
+        db = DATABASE('UEData')
+    else:
+        db = DUMMY()
     fake_sdl = os.environ.get("USE_FAKE_SDL", None)
     qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
-    qp_xapp.register_callback(qp_predict_handler, 30001)
+    qp_xapp.register_callback(qp_predict_handler, 30000)
     qp_xapp.run(thread)