sends out type 30002 which should be routed to TS.
"""
-import insert
import os
import json
from mdclogpy import Logger
from prediction import forecast
from qptrain import train
from database import DATABASE, DUMMY
+from exceptions import DataNotMatchError
import warnings
+# import schedule
warnings.filterwarnings("ignore")
# pylint: disable=invalid-name
qp_xapp = None
+db = None
logger = Logger(name=__name__)
"""
Extract neighbor cell id for a given UE
"""
- db.read_data(meas='liveUE', limit=1, ueid=ue)
+ db.read_data(ueid=ue)
df = db.data
-
- nbc = df.filter(regex='nbCell').values[0].tolist()
- srvc = df.filter(regex='nrCell').values[0].tolist()
- return srvc+nbc
+ cells = []
+ if df is not None:
+ nbc = df.filter(regex=db.nbcells).values[0].tolist()
+ srvc = df.filter(regex=db.servcell).values[0].tolist()
+ cells = srvc+nbc
+ return cells
def predict(payload):
"""
Function that forecast the time series
"""
- tp = {}
+ output = {}
payload = json.loads(payload)
- ueid = payload['UEPredictionSet'][0]
-
- cell_list = cells(ueid)
- for cid in cell_list:
- mcid = cid.replace('/', '')
- db.read_data(meas='liveCell', cellid=cid, limit=11)
- if len(db.data) != 0:
- inp = db.data
-
- if not os.path.isfile('qp/' + mcid):
- train(db, cid)
-
- df_f = forecast(inp, mcid, 1)
- if df_f is not None:
- tp[cid] = df_f.values.tolist()[0]
- df_f['cellid'] = cid
- db.write_prediction(df_f)
- else:
- tp[cid] = [None, None]
- return json.dumps({ueid: tp})
+ ue_list = payload['UEPredictionSet']
+ for ueid in ue_list:
+ tp = {}
+ cell_list = cells(ueid)
+ for cid in cell_list:
+ train_model(cid)
+ mcid = cid.replace('/', '')
+ db.read_data(cellid=cid, limit=101)
+ if db.data is not None and len(db.data) != 0:
+ try:
+ inp = db.data[db.thptparam]
+ except DataNotMatchError:
+ logger.debug("UL/DL parameters do not exist in provided data")
+ df_f = forecast(inp, mcid, 1)
+ if df_f is not None:
+ tp[cid] = df_f.values.tolist()[0]
+ df_f[db.cid] = cid
+ db.write_prediction(df_f)
+ else:
+ tp[cid] = [None, None]
+ output[ueid] = tp
+ return json.dumps(output)
+
+
+def train_model(cid):
+ if not os.path.isfile('src/'+cid):
+ train(db, cid)
def start(thread=False):
"""
logger.debug("QP xApp starting")
global qp_xapp
- global db
- if not thread:
- insert.populatedb() # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB
- db = DATABASE('UEData')
- else:
- db = DUMMY()
+ connectdb(thread)
fake_sdl = os.environ.get("USE_FAKE_SDL", None)
qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
qp_xapp.register_callback(qp_predict_handler, 30000)
qp_xapp.run(thread)
+def connectdb(thread=False):
+ # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
+ global db
+ if thread:
+ db = DUMMY()
+ else:
+ db = DATABASE()
+ success = False
+ while not success and not thread:
+ success = db.connect()
+
+
def stop():
"""
can only be called if thread=True when started