1 # ==================================================================================
2 # Copyright (c) 2020 AT&T Intellectual Property.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 # ==================================================================================
20 #define TS_QOE_PRED_REQ 30001
21 #define TS_QOE_PREDICTION 30002
# QP receives message type 30001 (TS_QOE_PRED_REQ) from the driver and
# sends out message type 30002 (TS_QOE_PREDICTION), which should be routed to TS.
import os

from mdclogpy import Logger
from ricxappframe.xapp_frame import RMRXapp, rmr
# pylint: disable=invalid-name
# Module-level logger shared by the message handlers and start().
logger = Logger(name=__name__)
def post_init(self):
    """
    Function that runs when xapp initialization is complete

    Parameters
    ----------
    self: object
        The object the framework hands to this callback (presumably the
        RMRXapp instance built in start() -- confirm against ricxappframe).
        A ``predict_requests`` counter is attached to it here.
    """
    # counter incremented by the predict handler for every request it sees
    self.predict_requests = 0
    logger.debug("QP xApp started")
def qp_default_handler(self, summary, sbuf):
    """
    Function that processes messages for which no handler is defined

    Logs the type of the received message and releases its buffer; the
    message content is otherwise ignored.

    Parameters
    ----------
    self: object
        The xapp object this handler is invoked on; must provide ``rmr_free``.
    summary: dict
        Message summary; only the ``rmr.RMR_MS_MSG_TYPE`` entry is read.
    sbuf: object
        The received RMR message buffer. It is freed before returning.
    """
    logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
    # we don't use rts here; free this
    self.rmr_free(sbuf)
def qp_predict_handler(self, summary, sbuf):
    """
    Function that processes messages for type 30001

    Counts the request, releases the received buffer, then sends a
    hard-coded mock JSON payload as message type 30002 and logs whether
    the send succeeded.

    Parameters
    ----------
    self: object
        The xapp object this handler is invoked on; must provide
        ``predict_requests``, ``rmr_free`` and ``rmr_send``.
    summary: dict
        Message summary; only the ``rmr.RMR_MS_MSG_TYPE`` entry is read.
    sbuf: object
        The received RMR message buffer. It is freed before the reply is sent.
    """
    logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
    self.predict_requests += 1
    # we don't use rts here; free this
    # (the answer goes out as a fresh message via rmr_send, so the received
    # buffer is no longer needed)
    self.rmr_free(sbuf)

    # placeholder payload; the incoming request content is not inspected
    mock_msg = (
        '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ], '
        ' "310-680-200-555002" : [ 800000 , 400000 ], '
        ' "310-680-200-555003" : [ 800000 , 400000 ] } }'
    )
    success = self.rmr_send(mock_msg.encode(), 30002)
    # report exactly one outcome, depending on what rmr_send returned
    if success:
        logger.debug("predict handler: sent message successfully")
    else:
        logger.warning("predict handler: failed to send message")
def start(thread=False):
    """
    This is a convenience function that allows this xapp to run in Docker
    for "real" (no thread, real SDL), but also easily modified for unit testing
    (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.

    Builds the RMRXapp with qp_default_handler as the default handler,
    registers qp_predict_handler for message type 30001, and runs it.

    Parameters
    ----------
    thread: bool (optional)
        Passed straight to ``RMRXapp.run``. NOTE(review): per the
        ricxappframe documentation, False makes run() loop forever in the
        calling thread and True returns control to the caller -- confirm
        for the framework version in use.
    """
    # published at module level so code outside this function can reach the
    # running instance
    global qp_xapp
    logger.debug("QP xApp starting")
    # any non-empty USE_FAKE_SDL value (even "0" or "false") selects the fake SDL,
    # because only the truthiness of the string is used
    fake_sdl = os.environ.get("USE_FAKE_SDL", None)
    qp_xapp = RMRXapp(qp_default_handler, rmr_port=4560, post_init=post_init, use_fake_sdl=bool(fake_sdl))
    qp_xapp.register_callback(qp_predict_handler, 30001)
    qp_xapp.run(thread)
# Can only be called if the xapp was started with thread=True.
# TODO: could we register a signal handler for Docker SIGTERM that calls this?
# Hacky for now; will evolve.
101 return {"PredictRequests": qp_xapp.predict_requests}