[RICAPP-181] Update Prediction message (from QP to TS) with serving cell prediciton
[ric-app/qp.git] / src / main.py
1 # ==================================================================================
2 #       Copyright (c) 2020 AT&T Intellectual Property.
3 #       Copyright (c) 2020 HCL Technologies Limited.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 qp module main -- using Time series ML predictor
19
20 RMR Messages:
21  #define TS_UE_LIST 30000
22  #define TS_QOE_PREDICTION 30002
23 30000 is the message type QP receives from the TS;
24 sends out type 30002 which should be routed to TS.
25
26 """
27 import os
28 import json
29 from mdclogpy import Logger
30 from ricxappframe.xapp_frame import RMRXapp, rmr
31 from prediction import forecast
32 from qptrain import train
33 from database import DATABASE, DUMMY
34 from exceptions import DataNotMatchError
35 import warnings
36 # import schedule
37 warnings.filterwarnings("ignore")
38
39 # pylint: disable=invalid-name
40 qp_xapp = None
41 db = None
42 logger = Logger(name=__name__)
43
44
45 def post_init(self):
46     """
47     Function that runs when xapp initialization is complete
48     """
49     self.predict_requests = 0
50     logger.debug("QP xApp started")
51
52
53 def qp_default_handler(self, summary, sbuf):
54     """
55     Function that processes messages for which no handler is defined
56     """
57     logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
58     # we don't use rts here; free this
59     self.rmr_free(sbuf)
60
61
62 def qp_predict_handler(self, summary, sbuf):
63     """
64     Function that processes messages for type 30000
65     """
66     logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
67     pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
68     self.predict_requests += 1
69     # we don't use rts here; free this
70     self.rmr_free(sbuf)
71     success = self.rmr_send(pred_msg.encode(), 30002)
72     logger.debug("Sending message to ts : {}".format(pred_msg))  # For debug purpose
73     if success:
74         logger.debug("predict handler: sent message successfully")
75     else:
76         logger.warning("predict handler: failed to send message")
77
78
79 def cells(ue):
80     """
81         Extract neighbor cell id for a given UE
82     """
83     db.read_data(ueid=ue)
84     df = db.data
85     cells = []
86     if df is not None:
87         nbc = df.filter(regex=db.nbcells).values[0].tolist()
88         srvc = df.filter(regex=db.servcell).values[0].tolist()
89         cells = srvc+nbc
90     return cells
91
92
93 def predict(payload):
94     """
95      Function that forecast the time series
96     """
97     output = {}
98     payload = json.loads(payload)
99     ue_list = payload['UEPredictionSet']
100     for ueid in ue_list:
101         tp = {}
102         cell_list = cells(ueid)
103         for cid in cell_list:
104             train_model(cid)
105             mcid = cid.replace('/', '')
106             db.read_data(cellid=cid, limit=101)
107             if db.data is not None and len(db.data) != 0:
108                 try:
109                     inp = db.data[db.thptparam]
110                 except DataNotMatchError:
111                     logger.debug("UL/DL parameters do not exist in provided data")
112                 df_f = forecast(inp, mcid, 1)
113                 if df_f is not None:
114                     tp[cid] = df_f.values.tolist()[0]
115                     df_f[db.cid] = cid
116                     db.write_prediction(df_f)
117                 else:
118                     tp[cid] = [None, None]
119         output[ueid] = tp
120     return json.dumps(output)
121
122
123 def train_model(cid):
124     if not os.path.isfile('src/'+cid):
125         train(db, cid)
126
127
128 def start(thread=False):
129     """
130     This is a convenience function that allows this xapp to run in Docker
131     for "real" (no thread, real SDL), but also easily modified for unit testing
132     (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
133     """
134     logger.debug("QP xApp starting")
135     global qp_xapp
136     connectdb(thread)
137     fake_sdl = os.environ.get("USE_FAKE_SDL", None)
138     qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
139     qp_xapp.register_callback(qp_predict_handler, 30000)
140     qp_xapp.run(thread)
141
142
143 def connectdb(thread=False):
144     # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
145     global db
146     if thread:
147         db = DUMMY()
148     else:
149         db = DATABASE()
150     success = False
151     while not success and not thread:
152         success = db.connect()
153
154
155 def stop():
156     """
157     can only be called if thread=True when started
158     TODO: could we register a signal handler for Docker SIGTERM that calls this?
159     """
160     global qp_xapp
161     qp_xapp.stop()
162
163
164 def get_stats():
165     """
166     hacky for now, will evolve
167     """
168     global qp_xapp
169     return {"PredictRequests": qp_xapp.predict_requests}