import json
from os import getenv
from ricxappframe.xapp_frame import RMRXapp, rmr
+from ricxappframe.alarm import alarm
from qpdriver import data
from qpdriver.exceptions import UENotFound
"""
self.def_hand_called = 0
self.traffic_steering_requests = 0
+ self.alarm_mgr = alarm.AlarmManager(self._mrc, "ric-xapp", "qp-driver")
+ self.alarm_sdl = None
def default_handler(self, summary, sbuf):
Function that processes messages for which no handler is defined
"""
self.def_hand_called += 1
- self.logger.warning("QP Driver received an unexpected message of type: {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+ self.logger.warning("default_handler unexpected message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
self.rmr_free(sbuf)
"""
This is the main handler for this xapp, which handles traffic steering requests.
Traffic steering requests predictions on a set of UEs.
- This app fetches a set of data, merges it together in a deterministic way,
- then sends a new message out to the QP predictor Xapp.
+ This app fetches a set of data from SDL, merges it together in a deterministic way,
+ then sends a new message to the QP predictor Xapp.
The incoming message that this function handles looks like:
- {“UEPredictionSet” : [“UEId1”,”UEId2”,”UEId3”]}
+ {"UEPredictionSet" : ["UEId1","UEId2","UEId3"]}
"""
self.traffic_steering_requests += 1
+ # we don't use rts here; free the buffer
+ self.rmr_free(sbuf)
+
ue_list = []
try:
req = json.loads(summary[rmr.RMR_MS_PAYLOAD]) # input should be a json encoded as bytes
ue_list = req["UEPredictionSet"]
+ self.logger.debug("steering_req_handler processing request for UE list {}".format(ue_list))
except (json.decoder.JSONDecodeError, KeyError):
- self.logger.debug("Received a TS Request but it was malformed!")
-
- # we don't use rts here; free this
- self.rmr_free(sbuf)
+ self.logger.warning("steering_req_handler failed to parse request: {}".format(summary[rmr.RMR_MS_PAYLOAD]))
+ return
+
+ if self._sdl.healthcheck():
+ # healthy, so clear the alarm if it was raised
+ if self.alarm_sdl:
+ self.logger.debug("steering_req_handler clearing alarm")
+ self.alarm_mgr.clear_alarm(self.alarm_sdl)
+ self.alarm_sdl = None
+ else:
+ # not healthy, so (re-)raise the alarm
+ self.logger.debug("steering_req_handler connection to SDL is not healthy, raising alarm")
+ if self.alarm_sdl:
+ self.alarm_mgr.reraise_alarm(self.alarm_sdl)
+ else:
+ self.alarm_sdl = self.alarm_mgr.create_alarm(1, alarm.AlarmSeverity.CRITICAL, "SDL failure")
+ self.alarm_mgr.raise_alarm(self.alarm_sdl)
+ self.logger.warning("steering_req_handler dropping request!")
+ return
# iterate over the UEs and send a request for each, if it is a valid UE, to QP
for ueid in ue_list:
payload = json.dumps(to_qpp).encode()
success = self.rmr_send(payload, 30001)
if not success:
- self.logger.debug("QP Driver was unable to send to QP!")
+ self.logger.warning("steering_req_handler failed to send to QP!")
except UENotFound:
- self.logger.debug("Received a TS Request for a UE that does not exist!")
+ self.logger.warning("steering_req_handler received a TS Request for a UE that does not exist!")
def start(thread=False):
setup(
name="qpdriver",
- version="1.0.9",
+ version="1.1.0",
packages=find_packages(exclude=["tests.*", "tests"]),
- author="Tommy Carpenter",
- description="QP Driver Xapp for traffic steering",
+ author="O-RAN-SC Community",
+ description="QP Driver Xapp for traffic steering use case",
url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-app/qp-driver",
- install_requires=["ricxappframe>=1.1.1,<2.0.0"],
- entry_points={"console_scripts": ["start.py=qpdriver.main:start"]}, # adds a magical entrypoint for Docker
+ install_requires=["ricxappframe>=1.2.0,<2.0.0"],
+ entry_points={"console_scripts": ["start-qpd.py=qpdriver.main:start"]}, # adds a magical entrypoint for Docker
license="Apache 2.0",
data_files=[("", ["LICENSE.txt"])],
)
def test_init_xapp(monkeypatch, ue_metrics, cell_metrics_1, cell_metrics_2, cell_metrics_3, ue_metrics_with_bad_cell):
# monkeypatch post_init to set the data we want in SDL
# the metrics arguments are JSON (dict) objects
+
+ _original_post_init = main.post_init
+
def fake_post_init(self):
- self.def_hand_called = 0
- self.traffic_steering_requests = 0
+ _original_post_init(self)
self.sdl_set(data.UE_NS, "12345", json.dumps(ue_metrics).encode(), usemsgpack=False)
self.sdl_set(data.UE_NS, "8675309", json.dumps(ue_metrics_with_bad_cell).encode(), usemsgpack=False)
self.sdl_set(data.CELL_NS, "310-680-200-555001", json.dumps(cell_metrics_1).encode(), usemsgpack=False)
assert expected_result == {"12345": qpd_to_qp, "8675309": qpd_to_qp_bad_cell}
assert main.get_stats() == {"DefCalled": 1, "SteeringRequests": 4}
+ # break SDL and send traffic again
+ def sdl_healthcheck_fails(self):
+ return False
+ monkeypatch.setattr("ricxappframe.xapp_sdl.SDLWrapper.healthcheck", sdl_healthcheck_fails)
+ mock_ts_xapp.run()
+
+ # restore SDL and send traffic once more
+ def sdl_healthcheck_passes(self):
+ return True
+ monkeypatch.setattr("ricxappframe.xapp_sdl.SDLWrapper.healthcheck", sdl_healthcheck_passes)
+ mock_ts_xapp.run()
+
def teardown_module():
"""