1 # ==================================================================================
2 # Copyright (c) 2020 AT&T Intellectual Property.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
"""
qpdriver entrypoint module

RMR message types:
    #define TS_UE_LIST 30000
    #define TS_QOE_PRED_REQ 30001
    #define TS_QOE_PREDICTION 30002

QPD receives message type 30000 and sends out message type 30001, which
should be routed to QP.
"""
import json
from os import getenv

from ricxappframe.xapp_frame import RMRXapp, rmr
# from ricxappframe.alarm import alarm
from qpdriver import data
from qpdriver.exceptions import UENotFound
34 # pylint: disable=invalid-name
40 Function that runs when xapp initialization is complete
42 self.def_hand_called = 0
43 self.traffic_steering_requests = 0
44 # self.alarm_mgr = alarm.AlarmManager(self._mrc, "ric-xapp", "qp-driver")
45 # self.alarm_sdl = None
def handle_config_change(self, config):
    """
    Function that runs at start and on every configuration file change.

    The received configuration is only logged at debug level; nothing in it
    is applied here.

    self: object whose ``logger`` receives the debug message
    config: the configuration value handed to this callback
    """
    self.logger.debug("handle_config_change: config: {}".format(config))
def default_handler(self, summary, sbuf):
    """
    Function that processes messages for which no handler is defined

    Increments the ``def_hand_called`` counter, logs the unexpected message
    type as a warning, and frees the message buffer.

    self: object carrying ``def_hand_called``, ``logger`` and ``rmr_free``
    summary: mapping indexed with ``rmr.RMR_MS_MSG_TYPE`` to get the message type
    sbuf: the message buffer the message arrived in
    """
    self.def_hand_called += 1
    self.logger.warning("default_handler unexpected message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
    # no reply is sent for an unexpected message, so release the buffer here
    self.rmr_free(sbuf)
def steering_req_handler(self, summary, sbuf):
    """
    This is the main handler for this xapp, which handles traffic steering requests.
    Traffic steering requests predictions on a set of UEs.
    This app fetches a set of data from SDL, merges it together in a deterministic way,
    then sends a new message to the QP predictor Xapp.

    The incoming message that this function handles looks like:
        {"UEPredictionSet" : ["UEId1","UEId2","UEId3"]}

    A payload that cannot be parsed as JSON, is not a JSON object, has no
    "UEPredictionSet" key, or whose "UEPredictionSet" value is not a list is
    logged as a warning and dropped. For every UE id in the list one message
    of type 30001 (TS_QOE_PRED_REQ) is sent; a UE id for which UENotFound is
    raised is logged and skipped, and a failed send is logged.

    self: object carrying ``traffic_steering_requests``, ``logger``,
        ``rmr_free`` and ``rmr_send``
    summary: mapping indexed with ``rmr.RMR_MS_PAYLOAD`` to get the request payload
    sbuf: the message buffer the request arrived in
    """
    self.traffic_steering_requests += 1
    # we don't use rts here; free the buffer
    self.rmr_free(sbuf)

    raw_request = summary[rmr.RMR_MS_PAYLOAD]
    try:
        req = json.loads(raw_request)  # input should be a json encoded as bytes
        ue_list = req["UEPredictionSet"]
    except (ValueError, KeyError, TypeError):
        # ValueError covers json.decoder.JSONDecodeError and undecodable bytes;
        # TypeError covers valid JSON that is not an object (list, number, ...)
        self.logger.warning("steering_req_handler failed to parse request: {}".format(raw_request))
        return

    if not isinstance(ue_list, list):
        # iterating e.g. a string would issue one bogus request per character
        self.logger.warning("steering_req_handler failed to parse request: {}".format(raw_request))
        return

    self.logger.debug("steering_req_handler processing request for UE list {}".format(ue_list))

    # NOTE: the SDL health check and alarm handling below are disabled.
    # if self._sdl.healthcheck():
    #     # healthy, so clear the alarm if it was raised
    #     if self.alarm_sdl:
    #         self.logger.debug("steering_req_handler clearing alarm")
    #         self.alarm_mgr.clear_alarm(self.alarm_sdl)
    #         self.alarm_sdl = None
    # else:
    #     # not healthy, so (re-)raise the alarm
    #     self.logger.debug("steering_req_handler connection to SDL is not healthy, raising alarm")
    #     if self.alarm_sdl:
    #         self.alarm_mgr.reraise_alarm(self.alarm_sdl)
    #     else:
    #         self.alarm_sdl = self.alarm_mgr.create_alarm(1, alarm.AlarmSeverity.CRITICAL, "SDL failure")
    #         self.alarm_mgr.raise_alarm(self.alarm_sdl)
    #     self.logger.warning("steering_req_handler dropping request!")
    #     return

    # iterate over the UEs and send a request for each, if it is a valid UE, to QP
    for ueid in ue_list:
        try:
            to_qpp = data.form_qp_pred_req(self, ueid)
            payload = json.dumps(to_qpp).encode()
            success = self.rmr_send(payload, 30001)
            if not success:
                self.logger.warning("steering_req_handler failed to send to QP!")
        except UENotFound:
            self.logger.warning("steering_req_handler received a TS Request for a UE that does not exist!")
116 def start(thread=False):
118 This is a convenience function that allows this xapp to run in Docker
119 for "real" (no thread, real SDL), but also easily modified for unit testing
120 (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
123 fake_sdl = getenv("USE_FAKE_SDL", None)
124 rmr_xapp = RMRXapp(default_handler,
125 config_handler=handle_config_change,
128 use_fake_sdl=bool(fake_sdl))
129 rmr_xapp.register_callback(steering_req_handler, 30000)
135 can only be called if thread=True when started
136 TODO: could we register a signal handler for Docker SIGTERM that calls this?
143 hacky for now, will evolve
145 return {"DefCalled": rmr_xapp.def_hand_called,
146 "SteeringRequests": rmr_xapp.traffic_steering_requests}