Release 0.0.5
[ric-app/qp.git] / src / main.py
similarity index 70%
rename from qp/main.py
rename to src/main.py
index 1c88a48..3ccf58a 100644 (file)
@@ -24,7 +24,6 @@ RMR Messages:
 sends out type 30002 which should be routed to TS.
 
 """
-import insert
 import os
 import json
 from mdclogpy import Logger
@@ -32,11 +31,14 @@ from ricxappframe.xapp_frame import RMRXapp, rmr
 from prediction import forecast
 from qptrain import train
 from database import DATABASE, DUMMY
+from exceptions import DataNotMatchError
 import warnings
+# import schedule
 warnings.filterwarnings("ignore")
 
 # pylint: disable=invalid-name
 qp_xapp = None
+db = None
 logger = Logger(name=__name__)
 
 
@@ -78,40 +80,49 @@ def cells(ue):
     """
         Extract neighbor cell id for a given UE
     """
-    db.read_data(meas='liveUE', limit=1, ueid=ue)
+    db.read_data(ueid=ue)
     df = db.data
-
-    nbc = df.filter(regex='nbCell').values[0].tolist()
-    srvc = df.filter(regex='nrCell').values[0].tolist()
-    return srvc+nbc
+    cells = []
+    if df is not None:
+        nbc = df.filter(regex=db.nbcells).values[0].tolist()
+        srvc = df.filter(regex=db.servcell).values[0].tolist()
+        cells = srvc+nbc
+    return cells
 
 
 def predict(payload):
     """
      Function that forecast the time series
     """
-    tp = {}
+    output = {}
     payload = json.loads(payload)
-    ueid = payload['UEPredictionSet'][0]
-
-    cell_list = cells(ueid)
-    for cid in cell_list:
-        mcid = cid.replace('/', '')
-        db.read_data(meas='liveCell', cellid=cid, limit=11)
-        if len(db.data) != 0:
-            inp = db.data
-
-            if not os.path.isfile('qp/' + mcid):
-                train(db, cid)
-
-            df_f = forecast(inp, mcid, 1)
-            if df_f is not None:
-                tp[cid] = df_f.values.tolist()[0]
-                df_f['cellid'] = cid
-                db.write_prediction(df_f)
-            else:
-                tp[cid] = [None, None]
-    return json.dumps({ueid: tp})
+    ue_list = payload['UEPredictionSet']
+    for ueid in ue_list:
+        tp = {}
+        cell_list = cells(ueid)
+        for cid in cell_list:
+            train_model(cid)
+            mcid = cid.replace('/', '')
+            db.read_data(cellid=cid, limit=101)
+            if db.data is not None and len(db.data) != 0:
+                try:
+                    inp = db.data[db.thptparam]
+                except DataNotMatchError:
+                    logger.debug("UL/DL parameters do not exist in provided data")
+                df_f = forecast(inp, mcid, 1)
+                if df_f is not None:
+                    tp[cid] = df_f.values.tolist()[0]
+                    df_f[db.cid] = cid
+                    db.write_prediction(df_f)
+                else:
+                    tp[cid] = [None, None]
+        output[ueid] = tp
+    return json.dumps(output)
+
+
+def train_model(cid):
+    if not os.path.isfile('src/'+cid):
+        train(db, cid)
 
 
 def start(thread=False):
@@ -122,18 +133,25 @@ def start(thread=False):
     """
     logger.debug("QP xApp starting")
     global qp_xapp
-    global db
-    if not thread:
-        insert.populatedb()   # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB
-        db = DATABASE('UEData')
-    else:
-        db = DUMMY()
+    connectdb(thread)
     fake_sdl = os.environ.get("USE_FAKE_SDL", None)
     qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
     qp_xapp.register_callback(qp_predict_handler, 30000)
     qp_xapp.run(thread)
 
 
+def connectdb(thread=False):
+    # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
+    global db
+    if thread:
+        db = DUMMY()
+    else:
+        db = DATABASE()
+    success = False
+    while not success and not thread:
+        success = db.connect()
+
+
 def stop():
     """
     can only be called if thread=True when started