2 qpdriver entrypoint module
4 # ==================================================================================
5 # Copyright (c) 2020 AT&T Intellectual Property.
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ==================================================================================
21 from ricxappframe.xapp_frame import RMRXapp, rmr
22 from qpdriver import data
23 from qpdriver.exceptions import UENotFound
27 #define TS_UE_LIST 30000
28 #define TS_QOE_PRED_REQ 30001
29 #define TS_QOE_PREDICTION 30002
QPD receives message type 30000 (TS_UE_LIST) and sends out message type 30001 (TS_QOE_PRED_REQ), which should be routed to QP.
38 self.def_hand_called = 0
39 self.traffic_steering_requests = 0
def default_handler(self, summary, sbuf):
    """
    Fallback callback for messages whose type has no registered handler.

    The message is counted, its type is logged, and it is then dropped.

    Parameters
    ----------
    self:
        the xapp instance this callback is attached to; provides
        ``def_hand_called``, ``logger`` and ``rmr_free``
    summary: dict
        message summary; only ``summary[rmr.RMR_MS_MSG_TYPE]`` is read here
    sbuf:
        the raw message buffer that accompanied ``summary``
    """
    self.def_hand_called += 1
    self.logger.info("QP Driver received an unexpected message of type: {}, dropping.".format(summary[rmr.RMR_MS_MSG_TYPE]))
    # The message is dropped, so nothing else will release the buffer;
    # per the xapp framework's handler contract the callback must free it.
    self.rmr_free(sbuf)
def steering_req_handler(self, summary, sbuf):
    """
    This is the main handler for this xapp, which handles traffic steering requests.
    Traffic steering requests predictions on a set of UEs.
    This app fetches a set of data, merges it together in a deterministic way,
    then sends a new message out to the QP predictor Xapp.

    The incoming message that this function handles looks like:
        {"UEPredictionSet" : ["UEId1","UEId2","UEId3"]}

    Parameters
    ----------
    self:
        the xapp instance this callback is attached to; provides
        ``traffic_steering_requests``, ``logger``, ``rmr_free`` and ``rmr_send``
    summary: dict
        message summary; the request body is read from
        ``summary[rmr.RMR_MS_PAYLOAD]``
    sbuf:
        the raw message buffer that accompanied ``summary``; it is freed here
        because no reply is sent on it

    A malformed request is logged and otherwise ignored (no UEs are processed).
    A UE for which ``data.form_qp_pred_req`` raises ``UENotFound`` is logged
    and skipped; the remaining UEs are still processed.
    """
    self.traffic_steering_requests += 1

    # Default to "no UEs" so a malformed request falls straight through the loop below.
    ue_list = []
    try:
        req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
        ue_list = req["UEPredictionSet"]
    except (json.decoder.JSONDecodeError, KeyError, TypeError):
        # TypeError covers valid JSON that is not an object (e.g. a bare list or number),
        # where the key lookup itself fails.
        self.logger.debug("Received a TS Request but it was malformed!")

    # we don't use rts here; free this
    self.rmr_free(sbuf)

    # iterate over the UEs and send a request for each, if it is a valid UE, to QP
    for ueid in ue_list:
        try:
            to_qpp = data.form_qp_pred_req(self, ueid)
            payload = json.dumps(to_qpp).encode()
            ok = self.rmr_send(payload, 30001)
            if not ok:
                self.logger.debug("QP Driver was unable to send to QP!")
        except UENotFound:
            self.logger.debug("Received a TS Request for a UE that does not exist!")
def start(thread=False):
    """
    This is a convenience function that allows this xapp to run in Docker
    for "real" (no thread, real SDL), but also easily modified for unit testing
    (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.

    Parameters
    ----------
    thread: bool (optional)
        forwarded unchanged to ``RMRXapp.run``; the default (False) is the
        Dockerized, non-threaded mode

    The fake SDL is selected when the ``USE_FAKE_SDL`` environment variable is
    set to any non-empty value.
    """
    # Bind the module-level name: other functions in this module read `rmr_xapp`
    # after start-up, so a function-local binding would be invisible to them.
    global rmr_xapp
    fake_sdl = getenv("USE_FAKE_SDL", None)
    rmr_xapp = RMRXapp(default_handler, post_init=post_init, use_fake_sdl=bool(fake_sdl))
    rmr_xapp.register_callback(steering_req_handler, 30000)
    rmr_xapp.run(thread)
96 can only be called if thread=True when started
97 TODO: could we register a signal handler for Docker SIGTERM that calls this?
103 # hacky for now, will evolve
104 return {"DefCalled": rmr_xapp.def_hand_called, "SteeringRequests": rmr_xapp.traffic_steering_requests}