1 # ==================================================================================
2 # Copyright (c) 2020 AT&T Intellectual Property.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
17 qpdriver entrypoint module
20 #define TS_UE_LIST 30000
21 #define TS_QOE_PRED_REQ 30001
22 #define TS_QOE_PREDICTION 30002
23 30000 is the message type QPD receives; sends out type 30001, which should be routed to QP.
28 from ricxappframe.xapp_frame import RMRXapp, rmr
29 from qpdriver import data
30 from qpdriver.exceptions import UENotFound
33 # pylint: disable=invalid-name
39 Function that runs when xapp initialization is complete
41 self.def_hand_called = 0
42 self.traffic_steering_requests = 0
45 def default_handler(self, summary, sbuf):
47 Function that processes messages for which no handler is defined
49 self.def_hand_called += 1
50 self.logger.warning("QP Driver received an unexpected message of type: {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
54 def steering_req_handler(self, summary, sbuf):
56 This is the main handler for this xapp, which handles traffic steering requests.
57 Traffic steering requests predictions on a set of UEs.
58 This app fetches a set of data, merges it together in a deterministic way,
59 then sends a new message out to the QP predictor Xapp.
61 The incoming message that this function handles looks like:
62 {“UEPredictionSet” : [“UEId1”,”UEId2”,”UEId3”]}
64 self.traffic_steering_requests += 1
67 req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes
68 ue_list = req["UEPredictionSet"]
69 except (json.decoder.JSONDecodeError, KeyError):
70 self.logger.debug("Received a TS Request but it was malformed!")
72 # we don't use rts here; free this
75 # iterate over the UEs and send a request for each, if it is a valid UE, to QP
78 to_qpp = data.form_qp_pred_req(self, ueid)
79 payload = json.dumps(to_qpp).encode()
80 success = self.rmr_send(payload, 30001)
82 self.logger.debug("QP Driver was unable to send to QP!")
84 self.logger.debug("Received a TS Request for a UE that does not exist!")
87 def start(thread=False):
89 This is a convenience function that allows this xapp to run in Docker
90 for "real" (no thread, real SDL), but also easily modified for unit testing
91 (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
94 fake_sdl = getenv("USE_FAKE_SDL", None)
95 rmr_xapp = RMRXapp(default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
96 rmr_xapp.register_callback(steering_req_handler, 30000)
102 can only be called if thread=True when started
103 TODO: could we register a signal handler for Docker SIGTERM that calls this?
110 hacky for now, will evolve
112 return {"DefCalled": rmr_xapp.def_hand_called,
113 "SteeringRequests": rmr_xapp.traffic_steering_requests}