Mock QoE xApp that sends a constant prediction
[ric-app/qp.git] / qp / main.py
diff --git a/qp/main.py b/qp/main.py
new file mode 100644 (file)
index 0000000..2074c19
--- /dev/null
@@ -0,0 +1,86 @@
+# ==================================================================================
+#       Copyright (c) 2020 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+"""
+mock qp module
+
+RMR Messages:
+ #define TS_QOE_PRED_REQ 30001
+ #define TS_QOE_PREDICTION 30002
+30001 is the message type QP receives from the driver;
+sends out type 30002 which should be routed to TS.
+
+"""
+
+
+import os
+from mdclogpy import Logger
+from ricxappframe.xapp_frame import RMRXapp, rmr
+
+
+qp_xapp = None
+logger = Logger(name=__name__)
+
+
+def post_init(self):
+    self.predict_requests = 0
+    logger.debug("QP xApp started")
+
+
+def qp_default_handler(self, summary, sbuf):
+    logger.debug("default handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+    # we don't use rts here; free this
+    self.rmr_free(sbuf)
+
+
+def qp_predict_handler(self, summary, sbuf):
+    logger.debug("predict handler received message type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
+    self.predict_requests += 1
+    # we don't use rts here; free this
+    self.rmr_free(sbuf)
+    # send a mock message
+    mock_msg = '{ "12345" : { "310-680-200-555001" : [ 2000000 , 1200000 ] , "310-680-200-555002" : [ 800000 , 400000 ] , "310-680-200-555003" : [ 800000 , 400000 ] } }'
+    ok = self.rmr_send(mock_msg.encode(), 30002)
+    if ok:
+        logger.debug("predict handler: sent message successfully")
+    else:
+        logger.warn("predict handler: failed to send message")
+
+
+def start(thread=False):
+    """
+    This is a convenience function that allows this xapp to run in Docker
+    for "real" (no thread, real SDL), but also easily modified for unit testing
+    (e.g., use_fake_sdl). The defaults for this function are for the Dockerized xapp.
+    """
+    logger.debug("QP xApp starting")
+    global qp_xapp
+    fake_sdl = os.environ.get("USE_FAKE_SDL", None)
+    qp_xapp = RMRXapp(qp_default_handler, post_init=post_init, use_fake_sdl=True if fake_sdl else False)
+    qp_xapp.register_callback(qp_predict_handler, 30001)
+    qp_xapp.run(thread)
+
+
+def stop():
+    """
+    can only be called if thread=True when started
+    TODO: could we register a signal handler for Docker SIGTERM that calls this?
+    """
+    qp_xapp.stop()
+
+
+def get_stats():
+    # hacky for now, will evolve
+    return {"PredictRequests": qp_xapp.predict_requests}