1 # ==================================================================================
2 # Copyright (c) 2020 AT&T Intellectual Property.
3 # Copyright (c) 2020 HCL Technologies Limited.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 qp module main -- using Time series ML predictor
21 #define TS_UE_LIST 30000
22 #define TS_QOE_PREDICTION 30002
23 30000 is the message type QP receives from TS;
24 QP sends out type 30002, which should be routed to TS.
30 from mdclogpy import Logger
31 from ricxappframe.xapp_frame import RMRXapp, rmr
32 from prediction import forecast
33 from qptrain import train
34 from database import DATABASE, DUMMY
36 warnings.filterwarnings("ignore")
38 # pylint: disable=invalid-name
40 logger = Logger(name=__name__)
45 Function that runs when xapp initialization is complete
47 self.predict_requests = 0
48 logger.debug("QP xApp started")
51 def qp_default_handler(self, summary, sbuf):
# Fallback message callback: start() passes this function to RMRXapp as the
# default handler, so it receives messages that have no registered callback.
#   summary -- indexed with rmr.RMR_MS_MSG_TYPE to read the message type
#   sbuf    -- not read by any line visible in this extract
# The only visible effect is a debug log of the received message type.
53 Function that processes messages for which no handler is defined
55 logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
56 # we don't use rts here; free this
# NOTE(review): the statement that releases `sbuf` is not visible in this
# extract -- confirm it directly follows the comment above.
60 def qp_predict_handler(self, summary, sbuf):
# Callback registered in start() for message type 30000.
#   summary -- indexed with rmr.RMR_MS_PAYLOAD to read the request payload
#   sbuf    -- not read by any line visible in this extract
# Visible flow: log the payload, pass it to predict(), count the request in
# self.predict_requests, then send the encoded result as message type 30002.
62 Function that processes messages for type 30000
64 logger.debug("predict handler received payload {}".format(summary[rmr.RMR_MS_PAYLOAD]))
65 pred_msg = predict(summary[rmr.RMR_MS_PAYLOAD])
# Running total of prediction requests handled by this xApp instance.
66 self.predict_requests += 1
67 # we don't use rts here; free this
# NOTE(review): the statement that releases `sbuf` is not visible in this
# extract -- confirm it directly follows the comment above.
# pred_msg is encoded before sending; the return value of rmr_send is kept in
# `success` to report the outcome.
69 success = self.rmr_send(pred_msg.encode(), 30002)
70 logger.debug("Sending message to ts : {}".format(pred_msg)) # For debug purpose
# NOTE(review): the two log calls below presumably belong to the success and
# failure branches of a check on `success`; the branching lines are not
# visible in this extract -- confirm.
72 logger.debug("predict handler: sent message successfully")
74 logger.warning("predict handler: failed to send message")
79 Extract neighbor cell id for a given UE
81 db.read_data(meas='liveUE', limit=1, ueid=ue)
84 nbc = df.filter(regex='nbCell').values[0]
90 Function that forecasts the time series
93 payload = json.loads(payload)
94 ueid = payload['UEPredictionSet'][0]
98 mcid = cid.replace('/', '')
99 db.read_data(meas='liveCell', cellid=cid, limit=11)
100 if len(db.data) != 0:
103 if not os.path.isfile('qp/' + mcid):
106 df_f = forecast(inp, mcid, 1)
108 tp[cid] = df_f.values.tolist()[0]
110 db.write_prediction(df_f)
112 tp[cid] = [None, None]
113 return json.dumps({ueid: tp})
116 def start(thread=False):
# Builds and configures the QP xApp: seeds the database, opens the 'UEData'
# database handle, creates the RMRXapp on RMR port 4560 and registers the
# predict handler for message type 30000.
#   thread -- defaults to False; its use is not visible in this extract.
#             NOTE(review): presumably forwarded to the xApp's run call -- confirm.
118 This is a convenience function that allows this xapp to run in Docker
119 for "real" (no thread, real SDL), but also easily modified for unit testing
120 (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
122 logger.debug("QP xApp starting")
126 insert.populatedb() # temporary method to populate db; it will be removed once data comes through KPIMON to influxDB
# NOTE(review): other code in this file calls db.read_data(...) on a name `db`;
# whether this assignment binds that module-level name depends on a `global`
# declaration that is not visible in this extract -- confirm.
127 db = DATABASE('UEData')
# USE_FAKE_SDL is read as a string: unset gives None (False), while ANY
# non-empty value -- including "0" or "false" -- makes bool(fake_sdl) True.
130 fake_sdl = os.environ.get("USE_FAKE_SDL", None)
131 qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
# Route incoming message type 30000 to the predict handler; every other type
# goes to qp_default_handler, which was passed to the constructor above.
132 qp_xapp.register_callback(qp_predict_handler, 30000)
138 can only be called if thread=True when started
139 TODO: could we register a signal handler for Docker SIGTERM that calls this?
147 hacky for now, will evolve
150 return {"PredictRequests": qp_xapp.predict_requests}