000d3a5ad720800ca84c497478f8a9fb33fc2afe
[ric-app/qp-driver.git] / qpdriver / main.py
1 """
2 qpdriver entrypoint module
3 """
4 # ==================================================================================
5 #       Copyright (c) 2020 AT&T Intellectual Property.
6 #
7 #   Licensed under the Apache License, Version 2.0 (the "License");
8 #   you may not use this file except in compliance with the License.
9 #   You may obtain a copy of the License at
10 #
11 #          http://www.apache.org/licenses/LICENSE-2.0
12 #
13 #   Unless required by applicable law or agreed to in writing, software
14 #   distributed under the License is distributed on an "AS IS" BASIS,
15 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 #   See the License for the specific language governing permissions and
17 #   limitations under the License.
18 # ==================================================================================
19 import json
20 from os import getenv
21 from ricxappframe.xapp_frame import RMRXapp, rmr
22 from qpdriver import data
23 from qpdriver.exceptions import UENotFound
24
25 """
26 RMR Messages
27  #define TS_UE_LIST 30000
28  #define TS_QOE_PRED_REQ 30001
29 30000 is the message QPD receives, sends out 30001 to QP
30 """
31
32
33 rmr_xapp = None
34
35
36 def post_init(self):
37     self.def_hand_called = 0
38     self.traffic_steering_requests = 0
39
40
41 def default_handler(self, summary, sbuf):
42     self.def_hand_called += 1
43     self.logger.info("QP Driver received an unexpected message of type: {}, dropping.".format(summary[rmr.RMR_MS_MSG_TYPE]))
44     self.rmr_free(sbuf)
45
46
47 def steering_req_handler(self, summary, sbuf):
48     """
49     This is the main handler for this xapp, which handles the traffic steering requests.
50     Traffic steering requests predictions on a set of UEs.
51     QP Driver (this) fetches a set of data, merges it together in a deterministic way, then sends a new message out to the QP predictor Xapp.
52
53     The incoming message that this function handles looks like:
54         {“UEPredictionSet” : [“UEId1”,”UEId2”,”UEId3”]}
55     """
56     self.traffic_steering_requests += 1
57     ue_list = []
58     try:
59         req = json.loads(summary[rmr.RMR_MS_PAYLOAD])  # input should be a json encoded as bytes
60         ue_list = req["UEPredictionSet"]
61     except (json.decoder.JSONDecodeError, KeyError):
62         self.logger.debug("Received a TS Request but it was malformed!")
63
64     # we don't use rts here; free this
65     self.rmr_free(sbuf)
66
67     # iterate over the ues and send a request each, if it is a valid UE, to QPP
68     for ueid in ue_list:
69         try:
70             to_qpp = data.form_qp_pred_req(self, ueid)
71             payload = json.dumps(to_qpp).encode()
72             ok = self.rmr_send(payload, 30001)
73             if not ok:
74                 self.logger.debug("QP Driver was unable to send to QP Predictor!")
75         except UENotFound:
76             self.logger.debug("Received a TS Request for a UE that does not exist!")
77
78
79 def start(thread=False):
80     """
81     this is a convienence function that allows this xapp to run in Docker for "real" (no thread, real SDL)
82     but also easily modified for unit testing (e.g., use_fake_sdl)
83     the defaults for this function are for the Dockerized xapp.
84     """
85     global rmr_xapp
86     fake_sdl = getenv("USE_FAKE_SDL", None)
87     rmr_xapp = RMRXapp(default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
88     rmr_xapp.register_callback(steering_req_handler, 30000)
89     rmr_xapp.run(thread)
90
91
92 def stop():
93     """
94     can only be called if thread=True when started
95     TODO: could we register a signal handler for Docker SIGTERM that calls this?
96     """
97     rmr_xapp.stop()
98
99
100 def get_stats():
101     # hacky for now, will evolve
102     return {"DefCalled": rmr_xapp.def_hand_called, "SteeringRequests": rmr_xapp.traffic_steering_requests}