[RICAPP-181] Update Prediction message (from QP to TS) with serving cell prediciton
[ric-app/qp.git] / qp / main.py
1 # ==================================================================================
2 #       Copyright (c) 2020 AT&T Intellectual Property.
3 #       Copyright (c) 2020 HCL Technologies Limited.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 qp module main -- using Time series ML predictor
19
20 RMR Messages:
21  #define TS_UE_LIST 30000
22  #define TS_QOE_PREDICTION 30002
23 30000 is the message type QP receives from the TS;
24 sends out type 30002 which should be routed to TS.
25
26 """
27 import insert
28 import os
29 import json
30 from mdclogpy import Logger
31 from ricxappframe.xapp_frame import RMRXapp, rmr
32 from prediction import forecast
33 from qptrain import train
34 from database import DATABASE, DUMMY
35 import warnings
36 warnings.filterwarnings("ignore")
37
38 # pylint: disable=invalid-name
39 qp_xapp = None
40 logger = Logger(name=__name__)
41
42
43 def post_init(self):
44     """
45     Function that runs when xapp initialization is complete
46     """
47     self.predict_requests = 0
48     logger.debug("QP xApp started")
49
50
51 def qp_default_handler(self, summary, sbuf):
52     """
53     Function that processes messages for which no handler is defined
54     """
55     logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
56     # we don't use rts here; free this
57     self.rmr_free(sbuf)
58
59
60 def qp_predict_handler(self, summary, sbuf):
61     """
62     Function that processes messages for type 30000
63     """
64     logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
65     pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
66     self.predict_requests += 1
67     # we don't use rts here; free this
68     self.rmr_free(sbuf)
69     success = self.rmr_send(pred_msg.encode(), 30002)
70     logger.debug("Sending message to ts : {}".format(pred_msg))  # For debug purpose
71     if success:
72         logger.debug("predict handler: sent message successfully")
73     else:
74         logger.warning("predict handler: failed to send message")
75
76
77 def cells(ue):
78     """
79         Extract neighbor cell id for a given UE
80     """
81     db.read_data(meas='liveUE', limit=1, ueid=ue)
82     df = db.data
83
84     nbc = df.filter(regex='nbCell').values[0].tolist()
85     srvc = df.filter(regex='nrCell').values[0].tolist()
86     return srvc+nbc
87
88
89 def predict(payload):
90     """
91      Function that forecast the time series
92     """
93     tp = {}
94     payload = json.loads(payload)
95     ueid = payload['UEPredictionSet'][0]
96
97     cell_list = cells(ueid)
98     for cid in cell_list:
99         mcid = cid.replace('/', '')
100         db.read_data(meas='liveCell', cellid=cid, limit=11)
101         if len(db.data) != 0:
102             inp = db.data
103
104             if not os.path.isfile('qp/' + mcid):
105                 train(db, cid)
106
107             df_f = forecast(inp, mcid, 1)
108             if df_f is not None:
109                 tp[cid] = df_f.values.tolist()[0]
110                 df_f['cellid'] = cid
111                 db.write_prediction(df_f)
112             else:
113                 tp[cid] = [None, None]
114     return json.dumps({ueid: tp})
115
116
117 def start(thread=False):
118     """
119     This is a convenience function that allows this xapp to run in Docker
120     for "real" (no thread, real SDL), but also easily modified for unit testing
121     (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
122     """
123     logger.debug("QP xApp starting")
124     global qp_xapp
125     global db
126     if not thread:
127         insert.populatedb()   # temporory method to popuate db, it will be removed when data will be coming through KPIMON to influxDB
128         db = DATABASE('UEData')
129     else:
130         db = DUMMY()
131     fake_sdl = os.environ.get("USE_FAKE_SDL", None)
132     qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
133     qp_xapp.register_callback(qp_predict_handler, 30000)
134     qp_xapp.run(thread)
135
136
137 def stop():
138     """
139     can only be called if thread=True when started
140     TODO: could we register a signal handler for Docker SIGTERM that calls this?
141     """
142     global qp_xapp
143     qp_xapp.stop()
144
145
146 def get_stats():
147     """
148     hacky for now, will evolve
149     """
150     global qp_xapp
151     return {"PredictRequests": qp_xapp.predict_requests}