1 # ==================================================================================
2 # Copyright (c) 2020 AT&T Intellectual Property.
3 # Copyright (c) 2020 HCL Technologies Limited.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 qp module main -- using Time series ML predictor
21 #define TS_UE_LIST 30000
22 #define TS_QOE_PREDICTION 30002
23 30000 is the message type QP receives from the TS;
24 sends out type 30002 which should be routed to TS.
29 from mdclogpy import Logger
30 from ricxappframe.xapp_frame import RMRXapp, rmr
31 from prediction import forecast
32 from qptrain import train
33 from database import DATABASE, DUMMY
34 from exceptions import DataNotMatchError
37 warnings.filterwarnings("ignore")
39 # pylint: disable=invalid-name
42 logger = Logger(name=__name__)
47 Function that runs when xapp initialization is complete
49 self.predict_requests = 0
50 logger.debug("QP xApp started")
53 def qp_default_handler(self, summary, sbuf):
55 Function that processes messages for which no handler is defined
57 logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
58 # we don't use rts here; free this
62 def qp_predict_handler(self, summary, sbuf):
64 Function that processes messages for type 30000
66 logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
67 pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
68 self.predict_requests += 1
69 # we don't use rts here; free this
71 success = self.rmr_send(pred_msg.encode(), 30002)
72 logger.debug("Sending message to ts : {}".format(pred_msg)) # For debug purpose
74 logger.debug("predict handler: sent message successfully")
76 logger.warning("predict handler: failed to send message")
81 Extract neighbor cell id for a given UE
87 nbc = df.filter(regex=db.nbcells).values[0].tolist()
88 srvc = df.filter(regex=db.servcell).values[0].tolist()
95 Function that forecast the time series
98 payload = json.loads(payload)
99 ue_list = payload['UEPredictionSet']
102 cell_list = cells(ueid)
103 for cid in cell_list:
105 mcid = cid.replace('/', '')
106 db.read_data(cellid=cid, limit=101)
107 if db.data is not None and len(db.data) != 0:
109 inp = db.data[db.thptparam]
110 except DataNotMatchError:
111 logger.debug("UL/DL parameters do not exist in provided data")
112 df_f = forecast(inp, mcid, 1)
114 tp[cid] = df_f.values.tolist()[0]
116 db.write_prediction(df_f)
118 tp[cid] = [None, None]
120 return json.dumps(output)
123 def train_model(cid):
124 if not os.path.isfile('src/'+cid):
128 def start(thread=False):
130 This is a convenience function that allows this xapp to run in Docker
131 for "real" (no thread, real SDL), but also easily modified for unit testing
132 (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
134 logger.debug("QP xApp starting")
137 fake_sdl = os.environ.get("USE_FAKE_SDL", None)
138 qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
139 qp_xapp.register_callback(qp_predict_handler, 30000)
143 def connectdb(thread=False):
144 # Create a connection to InfluxDB if thread=True, otherwise it will create a dummy data instance
151 while not success and not thread:
152 success = db.connect()
157 can only be called if thread=True when started
158 TODO: could we register a signal handler for Docker SIGTERM that calls this?
166 hacky for now, will evolve
169 return {"PredictRequests": qp_xapp.predict_requests}