--- /dev/null
+# ==================================================================================
+# Copyright (c) 2020 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+"""
+mock qp module
+
+RMR Messages:
+ #define TS_QOE_PRED_REQ 30001
+ #define TS_QOE_PREDICTION 30002
+30001 is the message type QP receives from the driver;
+sends out type 30002 which should be routed to TS.
+
+"""
+
+
+import os
+from mdclogpy import Logger
+from ricxappframe.xapp_frame import RMRXapp, rmr
+
+
+qp_xapp = None
+logger = Logger(name=__name__)
+
+
+def post_init(self):
+ self.predict_requests = 0
+ logger.debug("QP xApp started")
+
+
+def qp_default_handler(self, summary, sbuf):
+ logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+ # we don't use rts here; free this
+ self.rmr_free(sbuf)
+
+
+def qp_predict_handler(self, summary, sbuf):
+ logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+ self.predict_requests += 1
+ # we don't use rts here; free this
+ self.rmr_free(sbuf)
+ # send a mock message
+ mock_msg = '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }'
+ ok = self.rmr_send(mock_msg.encode(), 30002)
+ if ok:
+ logger.debug("predict handler: sent message successfully")
+ else:
+ logger.warn("predict handler: failed to send message")
+
+
+def start(thread=False):
+ """
+ This is a convenience function that allows this xapp to run in Docker
+ for "real" (no thread, real SDL), but also easily modified for unit testing
+ (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
+ """
+ logger.debug("QP xApp starting")
+ global qp_xapp
+ fake_sdl = os.environ.get("USE_FAKE_SDL", None)
+ qp_xapp = RMRXapp(qp_default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
+ qp_xapp.register_callback(qp_predict_handler, 30001)
+ qp_xapp.run(thread)
+
+
+def stop():
+ """
+ can only be called if thread=True when started
+ TODO: could we register a signal handler for Docker SIGTERM that calls this?
+ """
+ qp_xapp.stop()
+
+
+def get_stats():
+ # hacky for now, will evolve
+ return {"PredictRequests": qp_xapp.predict_requests}