2 qpdriver entrypoint module
4 # ==================================================================================
5 # Copyright (c) 2020 AT&T Intellectual Property.
7 # Licensed under the Apache License, Version 2.0 (the "License");
8 # you may not use this file except in compliance with the License.
9 # You may obtain a copy of the License at
11 # http://www.apache.org/licenses/LICENSE-2.0
13 # Unless required by applicable law or agreed to in writing, software
14 # distributed under the License is distributed on an "AS IS" BASIS,
15 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 # See the License for the specific language governing permissions and
17 # limitations under the License.
18 # ==================================================================================
21 from ricxappframe.xapp_frame import RMRXapp, rmr
22 from qpdriver import data
23 from qpdriver.exceptions import UENotFound
27 #define TS_UE_LIST 30000
28 #define TS_QOE_PRED_REQ 30001
29 30000 is the message QPD receives, sends out 30001 to QP
37 self.def_hand_called = 0
38 self.traffic_steering_requests = 0
41 def default_handler(self, summary, sbuf):
42 self.def_hand_called += 1
43 self.logger.info("QP Driver received an unexpected message of type: {}, dropping.".format(summary[rmr.RMR_MS_MSG_TYPE]))
47 def steering_req_handler(self, summary, sbuf):
49 This is the main handler for this xapp, which handles the traffic steering requests.
50 Traffic steering requests predictions on a set of UEs.
51 QP Driver (this) fetches a set of data, merges it together in a deterministic way, then sends a new message out to the QP predictor Xapp.
53 The incoming message that this function handles looks like:
54 {“UEPredictionSet” : [“UEId1”,”UEId2”,”UEId3”]}
56 self.traffic_steering_requests += 1
59 req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes
60 ue_list = req["UEPredictionSet"]
61 except (json.decoder.JSONDecodeError, KeyError):
62 self.logger.debug("Received a TS Request but it was malformed!")
64 # we don't use rts here; free this
67 # iterate over the ues and send a request each, if it is a valid UE, to QPP
70 to_qpp = data.form_qp_pred_req(self, ueid)
71 payload = json.dumps(to_qpp).encode()
72 ok = self.rmr_send(payload, 30001)
74 self.logger.debug("QP Driver was unable to send to QP Predictor!")
76 self.logger.debug("Received a TS Request for a UE that does not exist!")
79 def start(thread=False):
81 this is a convienence function that allows this xapp to run in Docker for "real" (no thread, real SDL)
82 but also easily modified for unit testing (e.g., use_fake_sdl)
83 the defaults for this function are for the Dockerized xapp.
86 fake_sdl = getenv("USE_FAKE_SDL", None)
87 rmr_xapp = RMRXapp(default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
88 rmr_xapp.register_callback(steering_req_handler, 30000)
94 can only be called if thread=True when started
95 TODO: could we register a signal handler for Docker SIGTERM that calls this?
101 # hacky for now, will evolve
102 return {"DefCalled": rmr_xapp.def_hand_called, "SteeringRequests": rmr_xapp.traffic_steering_requests}