-"""
-qpdriver entrypoint module
-"""
# ==================================================================================
# Copyright (c) 2020 AT&T Intellectual Property.
#
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
-import json
-from os import getenv
-from ricxappframe.xapp_frame import RMRXapp, rmr
-from qpdriver import data
-from qpdriver.exceptions import UENotFound
-
"""
+qpdriver entrypoint module
+
RMR Messages
#define TS_UE_LIST 30000
#define TS_QOE_PRED_REQ 30001
30000 is the message type QPD receives; sends out type 30001, which should be routed to QP.
"""
+import json
+from os import getenv
+from ricxappframe.xapp_frame import RMRXapp, rmr
+from qpdriver import data
+from qpdriver.exceptions import UENotFound
+
+# pylint: disable=invalid-name
rmr_xapp = None
def post_init(self):
+ """
+ Function that runs when xapp initialization is complete
+ """
self.def_hand_called = 0
self.traffic_steering_requests = 0
def default_handler(self, summary, sbuf):
+ """
+ Function that processes messages for which no handler is defined
+ """
self.def_hand_called += 1
- self.logger.info("QP Driver received an unexpected message of type: {}, dropping.".format(summary[rmr.RMR_MS_MSG_TYPE]))
+ self.logger.warning("QP Driver received an unexpected message of type: {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
self.rmr_free(sbuf)
try:
to_qpp = data.form_qp_pred_req(self, ueid)
payload = json.dumps(to_qpp).encode()
- ok = self.rmr_send(payload, 30001)
- if not ok:
+ success = self.rmr_send(payload, 30001)
+ if not success:
self.logger.debug("QP Driver was unable to send to QP!")
except UENotFound:
self.logger.debug("Received a TS Request for a UE that does not exist!")
"""
global rmr_xapp
fake_sdl = getenv("USE_FAKE_SDL", None)
- rmr_xapp = RMRXapp(default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
+ rmr_xapp = RMRXapp(default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
rmr_xapp.register_callback(steering_req_handler, 30000)
rmr_xapp.run(thread)
def get_stats():
- # hacky for now, will evolve
- return {"DefCalled": rmr_xapp.def_hand_called, "SteeringRequests": rmr_xapp.traffic_steering_requests}
+ """
+ hacky for now, will evolve
+ """
+ return {"DefCalled": rmr_xapp.def_hand_called,
+ "SteeringRequests": rmr_xapp.traffic_steering_requests}