Use blocking get call w/ timeout to read msg queue 17/3617/5
authorLott, Christopher (cl778h) <cl778h@att.com>
Thu, 7 May 2020 12:39:49 +0000 (08:39 -0400)
committerLott, Christopher (cl778h) <cl778h@att.com>
Thu, 7 May 2020 18:54:53 +0000 (14:54 -0400)
Add parameters to the queue get method in the xapp_frame loop that reads
messages so it waits for a message to arrive and occasionally checks for
the end-loop flag, instead of spinning the CPU at 100% while waiting.

Upgrade all to use the latest RMR, version 4.0.5.

Tweak the example xapps to emit their names in log messages.

Improve documentation especially the package overview shown at PyPI.

Issue-ID: RIC-354
Signed-off-by: Lott, Christopher (cl778h) <cl778h@att.com>
Change-Id: I08692e6ef60d199cb0b92c1c99740ae808b8885c

15 files changed:
Dockerfile-Unit-Test
docs/developer-guide.rst
docs/installation-guide.rst
docs/overview.rst
docs/release-notes.rst
examples/Dockerfile-Ping
examples/Dockerfile-Pong
examples/ping_xapp.py
examples/pong_xapp.py
ricxappframe/rmr/helpers.py
ricxappframe/xapp_frame.py
ricxappframe/xapp_rmr.py
ricxappframe/xapp_sdl.py
rmr-version.yaml
setup.py

index e3aa77f..2cc7e7a 100644 (file)
@@ -20,7 +20,7 @@ FROM python:3.8-alpine
 RUN apk update && apk add gcc musl-dev
 
 # copy rmr libraries from builder image in lieu of an Alpine package
-COPY --from=nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-alpine3-rmr:4.0.2 /usr/local/lib64/librmr* /usr/local/lib64/
+COPY --from=nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-alpine3-rmr:4.0.5 /usr/local/lib64/librmr* /usr/local/lib64/
 
 # Upgrade pip, install tox
 RUN pip install --upgrade pip && pip install tox
index 85a3cfd..06f01b7 100644 (file)
@@ -4,34 +4,56 @@
 Developer Guide
 ===============
 
-.. contents::
-   :depth: 3
-   :local:
+Tech Stack
+----------
+
+The framework requires Python version 3.7 or later, and depends on
+these packages provided by the O-RAN-SC project and third parties:
+
+* msgpack
+* mdclogpy
+* ricsdl
+
 
 Version bumping the framework
 -----------------------------
 
 This project follows semver. When changes are made, the versions are in:
 
-1) ``docs/release-notes.rst``
-
-2) ``setup.py``
+#. ``docs/release-notes.rst``
+#. ``setup.py``
 
 Version bumping RMR
 -------------------
 
-These items in this repo must be kept in sync:
-* Dockerfile-Unit-Test
-* examples/Dockerfile-Ping
-* examples/Dockerfile-Pong
-* ``rmr-version.yaml`` controls what rmr gets installed for unit testing in Jenkins
+These items in this repo must be kept in sync with the RMR version:
+
+#. Dockerfile-Unit-Test
+#. examples/Dockerfile-Ping
+#. examples/Dockerfile-Pong
+#. ``rmr-version.yaml`` controls what version of RMR is installed for
+   unit testing in Jenkins CI
 
 
 Unit Testing
 ------------
 
-You can run the unit tests in Docker to avoid installing RMR locally:
+Running the unit tests requires the python packages ``tox`` and ``pytest``.
+
+The RMR library is also required during unit tests. If running directly from tox
+(outside a Docker container), install RMR according to its instructions.
+
+Upon completion, view the test coverage like this:
+
+::
+
+   tox
+   open htmlcov/index.html
+
+Alternatively, if you cannot install RMR locally, you can run the unit
+tests in Docker. This is somewhat less nice because you don't get the
+pretty HTML report on coverage.
 
 ::
 
-   docker build -f Dockerfile-Unit-Test .
+   docker build  --no-cache -f Dockerfile-Unit-Test .
index 0785c40..088490f 100755 (executable)
@@ -6,12 +6,8 @@
 Installation Guide
 ==================
 
-.. contents::
-   :depth: 3
-   :local:
-
-The `ricxappframe` is available in `PyPi <https://pypi.org/project/ricxappframe>`_ .
-Use pip to install the version you want.
+The `ricxappframe` is available in `PyPI <https://pypi.org/project/ricxappframe>`_ .
+Use pip to install the version required.
 
 Installing the ricxappframe package does NOT install the required RMR system library,
 a shared object written in C and available for most Linux systems.
index ffac38f..d087334 100644 (file)
 Framework Overview
 ==================
 
-This library is a framework for writing Xapps in python.
-There may or may not be many Xapps written in python; however rmr, sdl, and logging libraries all exist for python, and this framework brings them together.
+This package is a framework for writing Xapps in python. The framework
+reduces the amount of code required in an Xapp by providing common
+features needed by all Python-based Xapps including communication with
+the RIC message router (RMR) and the Shared Data Layer (SDL).
 
-There are (at the time of writing) two "kinds" of Xapps one can instantiate with this framework that model "push" (RMR Xapps) and "pull" (General Xapps), as described below.
+The framework was designed to suport many types of Xapps, including
+applications that are purely reactive to RMR messages, and
+applications that initiate actions according to other criteria.
 
-RMR Xapps
----------
-This class of Xapps are purely reactive to rmr; data is always "pushed" to it via rmr.
-That is, every time the Xapp receives an rmr message, they do something, then wait for the next message to arrive, end never need to execute functionality at another time (if they do, use the next class).
-This is represented by a series of callbacks that get registered to receive rmr message types.
-Every time an rmr message arrives, the user callback for that message type is invoked, or if the user has not registered a callback for that type, their default callback (mandatory) is invoked.
-An analogy of this is AWS Lambda: "execute this code every time an event comes in" (the code to execute can depend on the type of event).
+Reactive Xapps
+--------------
+
+A reactive Xapp acts on messages that are delivered (pushed) via RMR.
+The Xapp only takes action upon receipt of an RMR message. The Xapp
+never takes action at another time.
+
+This type of application is constructed by creating callback functions
+and registering them with the framework by message type.  When an RMR
+message arrives, the appropriate callback is invoked.  An Xapp may
+define and register a separate callback for each expected message
+type.  Every Xapp must define a default callback function, which is
+invoked when a message arrives for which no type-specific callback was
+registered.  An analogy of this is AWS Lambda: "execute this code
+every time an event comes in" (the code to execute can depend on the
+type of event).
 
 General Xapps
 -------------
-In this class of Xapp the user simply provides a function that gets invoked, and typically that function has a `while (something)` in it.
-If the function returns, the Xapp will stop.
-In this type of Xapp, the Xapp must "pull" it's own data, typically from SDL, rmr (ie query another component for data), or other sources.
-The framework is "lighter" in this case then the former; it sets up an SDL connection, an rmr thread, and then calls the client provided function.
-This is to be used for Xapps that are not purely event driven.
-
-RMR Threading in the framework
-------------------------------
-NOTE: this is an implementation detail!
-We expose this for transparency but most users will not have to worry about this.
-
-In both types of Xapp, the framework launches a seperate thread whose only job is to read from rmr and deposit all messages (and their summaries) into a thread safe queue.
-When the client Xapp reads using the framework (this read is done by the framework itself in the RMR Xapp, but by the client in a general Xapp), the read is done from the queue.
-The framework is implemented this way so that a long running client function (e.g., consume) cannot block rmr reads.
-This is important because rmr is *not* a persistent message bus, if any rmr client does not read "fast enough", messages can be lost.
-So in this framework the client code is not in the same thread as the rmr reads, so that long running client code can never lead to lost messages.
-
-In the case of RMR Xapps, there are currently 3 potential threads; the thread that reads from rmr directly, and the user can optionally have the rmr queue read run in a thread, returning execution back to the user thread.
-The default is only two threads however, where `.run` does not return back execution and the user code is "finished" at that point.
+
+A general Xapp acts according to its own criteria, which may include
+receipt of RMR messages.
+
+This type of application is constructed by creating a function that
+gets invoked by the framework.  Typically that function contains a
+`while (something)` event loop.  If the function returns, the Xapp
+stops.  In this type of Xapp, the Xapp must fetch its own data, either
+from RMR, SDL or other source.  The framework does less work for a
+general application compared to a reactive application.  The framework
+sets up an RMR thread and an SDL connection, then invokes the
+client-provided function.
+
+Threading in the Framework
+--------------------------
+
+RMR interactions are processed in a thread started by the framework.
+This implementation detail is documented here for transparency, but
+most users will not have to worry about this.
+
+In both types of Xapp, the framework launches a separate thread whose
+only job is to read from RMR and deposit all messages (and their
+summaries) into a thread-safe queue.  When the client Xapp reads from
+RMR using the framework (this read is done by the framework itself in
+the RMR Xapp, but by the client in a general Xapp), the read is done
+from the framework-managed queue.  The framework is implemented this
+way so that a long-running client function (e.g., consume) will not
+block RMR reads.  This is important because RMR is *not* a persistent
+message bus, if an RMR client does not read fast enough, messages can
+be lost.  So in this framework the client code is not in the same
+thread as the RMR reads, to ensure that long-running client code will
+not cause message loss.
+
+In the case of RMR Xapps, there are currently 3 potential threads; the
+thread that reads from RMR directly, and the user can optionally have
+the RMR queue read run in a thread, returning execution back to the
+user thread.  The default is only two threads however, where `.run`
+does not return back execution and the user code is finished at that
+point.
 
 Healthchecks
 ------------
-RMRXapps come with a default rmr healthcheck probe handler.
-When the RMRXapp is sent an rmr healthcheck, it will check to see if the rmr thread is healthy (well it can't even reply if it's not!), and that the SDL connection is healthy.
-The Xapp responds accordingly.
-The user can override this default handler by registering a new callback to the appropriate message type.
 
-General Xapps must handle healthchecks when they read their rmr mailbox, since there is no notion of handlers.
+The framework provides a default RMR healthcheck probe handler for
+reactive Xapps.  When an RMR healthcheck message arrives, this handler
+checks that the RMR thread is healthy (of course the Xapp cannot even
+reply if the thread is not healthy!), and that the SDL connection is
+healthy.  The handler responds accordingly via RMA.  The Xapp can
+override this probe handler by registering a new callback for the
+appropriate message type.
+
+The framework provides no healthcheck handler for general Xapps. Those
+applications must handle healthcheck probe messages appropriately when
+they read their RMR mailboxes.
 
-There is no http service (Currently) in the framework therefore there are no http healthchecks.
+There is no http service in the framework, so there is no support for
+HTTP-based healthcheck probes, such as what a deployment manager like
+Kubernetes may use.
 
 Examples
 --------
-There are two examples in the `examples` directory; `ping` which is a general Xapp, and `pong` which is an RMR Xapp.
-Ping sends a message, pong receives the message and use rts to reply.
-Ping then reads it's own mailbox and demonstrates other functionality.
-The highlight to note is that `pong` is purely reactive, it only does anything when a message is received.
-Ping uses a general that also happens to read it's rmr mailbox inside.
+
+Two sample Xapps using this framework are provided in the `examples`
+directory of the git repository.  The first, `ping`, is a general Xapp
+that defines a main function that reads its RMR mailbox in addition to
+other work.  The second, `pong`, is a reactive Xapp that only takes
+action when a message is received.
+
+To run a demonstration, build the Docker images for both examples
+using the supplied Dockerfiles.  Then start the Pong container (the
+listener) followed by the Ping container (the sender).  The Ping
+application sends a message, the pong application receives the message
+and use RMR's return-to-sender feature to reply.  Ping then reads its
+own mailbox and demonstrates other functionality.
index 8a5001f..74348c9 100644 (file)
@@ -11,6 +11,12 @@ The format is based on `Keep a Changelog <http://keepachangelog.com/>`__
 and this project adheres to `Semantic Versioning <http://semver.org/>`__.
 
 
+[1.1.1] - 2020-05-07
+--------------------
+* Use timeout on queue get method to avoid 100% CPU usage (`RIC-354 <https://jira.o-ran-sc.org/browse/RIC-354>`_)
+* Upgrade to RMR version 4.0.5
+
+
 [1.1.0] - 2020-05-06
 --------------------
 * Use RMR timeout on receive to avoid 100% CPU usage (`RIC-354 <https://jira.o-ran-sc.org/browse/RIC-354>`_)
index 66e3f0a..40b19d8 100644 (file)
@@ -17,7 +17,7 @@
 FROM python:3.8-alpine
 
 # copy rmr libraries from builder image in lieu of an Alpine package
-COPY --from=nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-alpine3-rmr:4.0.2 /usr/local/lib64/librmr* /usr/local/lib64/
+COPY --from=nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-alpine3-rmr:4.0.5 /usr/local/lib64/librmr* /usr/local/lib64/
 # RMR setup
 RUN mkdir -p /opt/route/
 COPY test_route.rt /opt/route/test_route.rt
index 5d09213..8013e90 100644 (file)
@@ -17,7 +17,7 @@
 FROM python:3.8-alpine
 
 # copy rmr libraries from builder image in lieu of an Alpine package
-COPY --from=nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-alpine3-rmr:4.0.2 /usr/local/lib64/librmr* /usr/local/lib64/
+COPY --from=nexus3.o-ran-sc.org:10002/o-ran-sc/bldr-alpine3-rmr:4.0.5 /usr/local/lib64/librmr* /usr/local/lib64/
 # RMR setup
 RUN mkdir -p /opt/route/
 COPY test_route.rt /opt/route/test_route.rt
index aafebb8..c247ee3 100644 (file)
@@ -29,10 +29,10 @@ def entry(self):
     number = 0
     while True:
         # test healthcheck
-        print("Healthy? {}".format(xapp.healthcheck()))
+        print("ping is healthy? {}".format(xapp.healthcheck()))
 
         # rmr send to default handler
-        self.rmr_send(json.dumps({"sup": number}).encode(), 6660666)
+        self.rmr_send(json.dumps({"ping": number}).encode(), 6660666)
 
         # rmr send 60000, should trigger registered callback
         val = json.dumps({"test_send": number}).encode()
@@ -40,18 +40,19 @@ def entry(self):
         number += 1
 
         # store it in SDL and read it back; delete and read
-        self.sdl_set(my_ns, "numba", number)
-        self.logger.info(self.sdl_get(my_ns, "numba"))
-        self.logger.info(self.sdl_find_and_get(my_ns, "num"))
-        self.sdl_delete(my_ns, "numba")
-        self.logger.info(self.sdl_get(my_ns, "numba"))
+        self.sdl_set(my_ns, "ping", number)
+        self.logger.info(self.sdl_get(my_ns, "ping"))
+        self.logger.info(self.sdl_find_and_get(my_ns, "pin"))
+        self.sdl_delete(my_ns, "ping")
+        self.logger.info(self.sdl_get(my_ns, "ping"))
 
         # rmr receive
         for (summary, sbuf) in self.rmr_get_messages():
-            # summary is a dict that contains bytes so we can't use json.dumps on it so we have no good way to turn this into a string to use the logger unfortunately
+            # summary is a dict that contains bytes so we can't use json.dumps on it
+            # so we have no good way to turn this into a string to use the logger unfortunately
             # print is more "verbose" than the ric logger
             # if you try to log this you will get: TypeError: Object of type bytes is not JSON serializable
-            print(summary)
+            print("ping: {0}".format(summary))
             self.rmr_free(sbuf)
 
         time.sleep(2)
index 863f635..23db376 100644 (file)
@@ -28,9 +28,9 @@ def post_init(_self):
 
 def sixtyh(self, summary, sbuf):
     """callback for 60000"""
-    self.logger.info("registered 60000 handler called!")
+    self.logger.info("pong registered 60000 handler called!")
     # see comment in ping about this; bytes does not work with the ric mdc logger currently
-    print(summary)
+    print("pong 60000 handler received: {0}".format(summary))
     jpay = json.loads(summary[rmr.RMR_MS_MSG_PAYLOAD])
     self.rmr_rts(sbuf, new_payload=json.dumps({"ACK": jpay["test_send"]}).encode(), new_mtype=60001, retries=100)
     self.rmr_free(sbuf)
@@ -38,8 +38,8 @@ def sixtyh(self, summary, sbuf):
 
 def defh(self, summary, sbuf):
     """default callback"""
-    self.logger.info("default handler called!")
-    print(summary)
+    self.logger.info("pong default handler called!")
+    print("pong default handler received: {0}".format(summary))
     self.rmr_free(sbuf)
 
 
index bc7a4fb..3cf1c54 100644 (file)
 from ricxappframe.rmr import rmr
 
 
-def rmr_rcvall_msgs(mrc, pass_filter=[], timeout=0):
+def rmr_rcvall_msgs(mrc, pass_filter=None, timeout=0):
     """
     Assembles an array of all messages which can be received without blocking
     (but see the timeout parameter).  Effectively drains the message queue if
     RMR is started in mt-call mode, or draining any waiting TCP buffers.  If
     the pass_filter parameter is supplied, it is treated as one or more
     message types to accept (pass through). Using the default, an empty list,
-    results in capturing all messages. if the timeout parameter is supplied,
-    this call may block up to that number of milliseconds waiting for a
-    message to arrive. Using the default, zero, results in non-blocking
-    no-wait behavior.
+    results in capturing all messages. If the timeout parameter is supplied
+    and is not zero, this call may block up to that number of milliseconds
+    waiting for a message to arrive. Using the default, zero, results in
+    non-blocking no-wait behavior.
 
     Parameters
     ----------
@@ -60,14 +60,14 @@ def rmr_rcvall_msgs(mrc, pass_filter=[], timeout=0):
         if summary[rmr.RMR_MS_MSG_STATUS] != "RMR_OK":  # ok indicates msg received, stop on all other states
             break
 
-        if len(pass_filter) == 0 or summary[rmr.RMR_MS_MSG_TYPE] in pass_filter:  # no filter, or passes; capture it
+        if pass_filter is None or len(pass_filter) == 0 or summary[rmr.RMR_MS_MSG_TYPE] in pass_filter:  # no filter, or passes; capture it
             new_messages.append(summary)
 
     rmr.rmr_free_msg(mbuf)  # free the single buffer to avoid leak
     return new_messages
 
 
-def rmr_rcvall_msgs_raw(mrc, pass_filter=[], timeout=0):
+def rmr_rcvall_msgs_raw(mrc, pass_filter=None, timeout=0):
     """
     Same as rmr_rcvall_msgs, but answers tuples with the raw sbuf.
     Useful if return-to-sender (rts) functions are required.
@@ -101,7 +101,7 @@ def rmr_rcvall_msgs_raw(mrc, pass_filter=[], timeout=0):
             rmr.rmr_free_msg(mbuf)  # free the failed-to-receive buffer
             break
 
-        if len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter:  # no filter, or passes; capture it
+        if pass_filter is None or len(pass_filter) == 0 or mbuf.contents.mtype in pass_filter:  # no filter, or passes; capture it
             new_messages.append((summary, mbuf))  # caller is responsible for freeing the buffer
         else:
             rmr.rmr_free_msg(mbuf)  # free the filtered-out message buffer
index b81804e..4321852 100644 (file)
@@ -19,6 +19,7 @@ Framework for python xapps
 Framework here means Xapp classes that can be subclassed
 """
 
+import queue
 from threading import Thread
 from ricxappframe import xapp_rmr
 from ricxappframe.rmr import rmr
@@ -48,16 +49,20 @@ class _BaseXapp:
             port to listen on
 
         rmr_wait_for_ready: bool (optional)
-            if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
-            this can be set to False if the client only wants to *receive only*
+
+            if this is True, then init waits until rmr is ready to send, which
+            includes having a valid routing file. This can be set to
+            False if the client only wants to *receive only*.
 
         use_fake_sdl: bool (optional)
-            if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
-            Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
+            if this is True, it uses dbaas' "fake dict backend" instead
+            of Redis or other backends. Set this to true when developing
+            your xapp or during unit testing to completely avoid needing
+            a dbaas running or any network at all.
 
         post_init: function (optional)
-            runs this user provided function after the base xapp is initialized
-            it's signature should be post_init(self)
+            runs this user provided function after the base xapp is
+            initialized; its signature should be post_init(self)
         """
         # PUBLIC, can be used by xapps using self.(name):
         self.logger = Logger(name=__name__)
@@ -77,9 +82,11 @@ class _BaseXapp:
 
     def rmr_get_messages(self):
         """
-        Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
-        Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
-        The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
+        Returns a generator iterable over all items in the queue that
+        have not yet been read by the client xapp. Each item is a tuple
+        (S, sbuf) where S is a message summary dict and sbuf is the raw
+        message. The caller MUST call rmr.rmr_free_msg(sbuf) when
+        finished with each sbuf to prevent memory leaks!
         """
         while not self._rmr_loop.rcv_queue.empty():
             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
@@ -116,10 +123,10 @@ class _BaseXapp:
 
     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
         """
-        Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
-
-        This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
-        The client needs to free.
+        Allows the xapp to return to sender, possibly adjusting the
+        payload and message type before doing so.  This does NOT free
+        the sbuf for the caller as the caller may wish to perform
+        multiple rts per buffer. The client needs to free.
 
         Parameters
         ----------
@@ -130,7 +137,8 @@ class _BaseXapp:
         new_mtype: int (optional)
             New message type (replaces the received message)
         retries: int (optional)
-            Number of times to retry at the application level before excepting RMRFailure
+            Number of times to retry at the application level before
+            throwing exception RMRFailure
 
         Returns
         -------
@@ -147,9 +155,12 @@ class _BaseXapp:
 
     def rmr_free(self, sbuf):
         """
-        Free an rmr message buffer after use
+        Frees an rmr message buffer after use
+
+        Note: this does not need to be a class method, self is not
+        used. However if we break it out as a function we need a home
+        for it.
 
-        Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
         Parameters
         ----------
         sbuf: ctypes c_void_p
@@ -158,8 +169,10 @@ class _BaseXapp:
         rmr.rmr_free_msg(sbuf)
 
     # SDL
-    # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
-    # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
+    # NOTE, even though these are passthroughs, the seperate SDL wrapper
+    # is useful for other applications like A1. Therefore, we don't
+    # embed that SDLWrapper functionality here so that it can be
+    # instantiated on its own.
 
     def sdl_set(self, ns, key, value, usemsgpack=True):
         """
@@ -244,10 +257,12 @@ class _BaseXapp:
 
     def stop(self):
         """
-        cleans up and stops the xapp rmr thread (currently)
-        This is critical for unit testing as pytest will never return if the thread is running.
+        cleans up and stops the xapp rmr thread (currently). This is
+        critical for unit testing as pytest will never return if the
+        thread is running.
 
-        TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
+        TODO: can we register a ctrl-c handler so this gets called on
+        ctrl-c? Because currently two ctrl-c are needed to stop.
         """
         self._rmr_loop.stop()
 
@@ -257,8 +272,10 @@ class _BaseXapp:
 
 class RMRXapp(_BaseXapp):
     """
-    Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
-    When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
+    Represents an xapp that is purely driven by RMR messages; i.e., when
+    messages are received, the xapp does something. When run is called,
+    the xapp framework waits for rmr messages, and calls the
+    client-provided consume callback on every one.
     """
 
     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
@@ -266,15 +283,16 @@ class RMRXapp(_BaseXapp):
         Parameters
         ----------
         default_handler: function
-            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
-            summary: dict
-                the rmr message summary
-            sbuf: ctypes c_void_p
-                Pointer to an rmr message buffer. The user must call free on this when done.
-
+            a function with the signature (summary, sbuf) to be called
+            when a message of type message_type is received.
+        summary: dict
+            the rmr message summary
+        sbuf: ctypes c_void_p
+            Pointer to an rmr message buffer. The user must call free on
+            this when done.
         post_init: function (optional)
-            optionally runs this function after the app initializes and before the run loop
-            it's signature should be post_init(self)
+            optionally runs this function after the app initializes and
+        before the run loop; its signature should be post_init(self)
 
         For the other parameters, see _BaseXapp
         """
@@ -292,7 +310,8 @@ class RMRXapp(_BaseXapp):
 
         # register a default healthcheck handler
         # this default checks that rmr is working and SDL is working
-        # the user can override this and register their own handler if they wish since the "last registered callback wins".
+        # the user can override this and register their own handler
+        # if they wish since the "last registered callback wins".
         def handle_healthcheck(self, summary, sbuf):
             ok = self.healthcheck()
             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
@@ -308,11 +327,12 @@ class RMRXapp(_BaseXapp):
         Parameters
         ----------
         handler: function
-            a function with the signature (summary, sbuf) to be called when a message of type message_type is received
-            summary: dict
-                the rmr message summary
-            sbuf: ctypes c_void_p
-                Pointer to an rmr message buffer. The user must call free on this when done.
+            a function with the signature (summary, sbuf) to be called
+            when a message of type message_type is received
+        summary: dict
+            the rmr message summary
+        sbuf: ctypes c_void_p
+            Pointer to an rmr message buffer. The user must call free on this when done.
 
         message:type: int
             the message type to look for
@@ -323,25 +343,30 @@ class RMRXapp(_BaseXapp):
 
     def run(self, thread=False):
         """
-        This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
+        This function should be called when the client xapp is ready to
+        wait for its handlers to be called on received messages.
 
         Parameters
         ----------
         thread: bool (optional)
-            if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
-            The thread can be stopped using .stop()
-            if False, execution is not returned and the framework loops
+            If True, a thread is started to run the queue read/dispatch loop
+            and execution is returned to caller; the thread can be stopped
+            by calling .stop(). If False (the default), execution is not
+            returned and the framework loops forever.
         """
 
         def loop():
             while self._keep_going:
-                if not self._rmr_loop.rcv_queue.empty():
-                    (summary, sbuf) = self._rmr_loop.rcv_queue.get()
+                try:
+                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
                     # dispatch
                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
                     if not func:
                         func = self._default_handler
                     func(self, summary, sbuf)
+                except queue.Empty:
+                    # the get timed out
+                    pass
 
         if thread:
             Thread(target=loop).start()
@@ -350,16 +375,17 @@ class RMRXapp(_BaseXapp):
 
     def stop(self):
         """
-        stops the rmr xapp completely.
+        Sets the flag to end the dispatch loop.
         """
         super().stop()
-        self.logger.debug("Stopping queue reading thread..")
+        self.logger.debug("Setting flag to end framework work loop.")
         self._keep_going = False
 
 
 class Xapp(_BaseXapp):
     """
-    Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
+    Represents an xapp where the client provides a generic function to
+    call, which is mostly likely a loop-forever loop.
     """
 
     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
@@ -367,8 +393,8 @@ class Xapp(_BaseXapp):
         Parameters
         ----------
         entrypoint: function
-            this function is called when the xapp runs; this is the user code
-            it's signature should be function(self)
+            this function is called when the xapp runs; this is the user code.
+            its signature should be function(self)
 
         For the other parameters, see _BaseXapp
         """
@@ -378,8 +404,10 @@ class Xapp(_BaseXapp):
 
     def run(self):
         """
-        This function should be called when the client xapp is ready to start their code
+        This function should be called when the client xapp is ready to
+        start their code.
         """
         self._entrypoint(self)
 
-    # there is no need for stop currently here (base has, and nothing special to do here)
+    # there is no need for stop currently here (base has, and nothing
+    # special to do here)
index 93eaeeb..2a35b57 100644 (file)
@@ -34,14 +34,15 @@ class RmrLoop:
     """
     Class represents an RMR loop that constantly reads from RMR.
 
-    Note, we use a queue here, and a thread, rather than the xapp frame just looping
-    and calling consume, so that a possibly slow running consume function does not
-    block the reading of new messages
+    Note, we use a queue here, and a thread, rather than the xapp
+    frame just looping and calling consume, so that a possibly slow
+    running consume function does not block the reading of new messages.
     """
 
     def __init__(self, port, wait_for_ready=True):
         """
-        sets up RMR, then launches a thread that reads and injects messages into a queue.
+        sets up RMR, then launches a thread that reads and injects
+        messages into a queue.
 
         Parameters
         ----------
@@ -49,9 +50,9 @@ class RmrLoop:
             port to listen on
 
         wait_for_ready: bool (optional)
-            If True, then this function hangs until RMR is ready to send, which includes
-            having a valid routing file. This can be set to False if the client only wants
-            to *receive only*.
+            If True, then this function hangs until RMR is ready to
+            send, which includes having a valid routing file. This can
+            be set to False if the client only wants to *receive only*.
         """
 
         # Public
@@ -82,10 +83,13 @@ class RmrLoop:
 
                 # read our mailbox
                 # TODO: take a flag as to whether RAW is needed or not
-                # RAW allows for RTS however the caller must free, and the caller may not need RTS.
-                # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
-
-                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
+                # RAW allows for RTS however the caller must free, and
+                # the caller may not need RTS. Currently after
+                # consuming, callers must call rmr.rmr_free_msg(sbuf)
+                # Use a non-trivial timeout to avoid spinning the CPU.
+                # The function returns if no messages arrive for that
+                # interval, which allows a stop request to be processed.
+                for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=5000):
                     self.rcv_queue.put((msg, sbuf))
 
                 self._last_ran = time.time()
@@ -102,16 +106,19 @@ class RmrLoop:
         """
         sets a flag that will cleanly stop the thread
         """
-        mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
+        # wait until the current batch of messages is done, then kill
+        # the rmr connection. I debated putting this in "loop" however
+        # if the while loop was still going, setting mrc to close here
+        # would blow up any processing still currently happening.
+        # Probably more polite to at least finish the current batch
+        # and then close. So here we wait until the current batch is
+        # done, then we kill the mrc.
+        mdc_logger.debug("Setting flag to end RMR work loop.")
         self._keep_going = False
-        # wait until the current batch of messages is done, then kill the rmr connection
-        # note; I debated putting this in "loop" however if the while loop was still going
-        # setting mrc to close here would blow up any processing still currently happening
-        # probably more polite to at least finish the current batch and then close. So here
-        # we wait until the current batch is done, then we kill the mrc
         while self._loop_is_running:
-            time.sleep(0.5)
-        mdc_logger.debug("Closing rmr connection")
+            time.sleep(1)
+            mdc_logger.debug("Waiting for RMR work loop to end")
+        mdc_logger.debug("Closing RMR connection")
         rmr.rmr_close(self.mrc)
 
     def healthcheck(self, seconds=30):
index e39c711..f8f2efc 100644 (file)
@@ -27,11 +27,14 @@ class SDLWrapper:
     """
     This is a wrapper around the SDL Python interface.
 
-    We do not embed the below directly in the Xapp classes because this SDL wrapper is useful for other python apps, for example A1 Mediator uses this verbatim.
-    Therefore, we leave this here as a seperate instantiable object so it can be used outside of xapps too
-    One could argue this get moved into *sdl itself*.
-
-    We currently use msgpack for binary (de)serialization: https://msgpack.org/index.html
+    We do not embed the below directly in the Xapp classes because
+    this SDL wrapper is useful for other python apps, for example A1
+    Mediator uses this verbatim. Therefore, we leave this here as a
+    seperate instantiable object so it can be used outside of xapps
+    too.  One could argue this get moved into *sdl itself*.
+
+    We currently use msgpack for binary (de)serialization:
+    https://msgpack.org/index.html
     """
 
     def __init__(self, use_fake_sdl=False):
@@ -41,8 +44,12 @@ class SDLWrapper:
         Parameters
         ----------
         use_fake_sdl: bool
-            if this is True (default: False), then SDLs "fake dict backend" is used, which is very useful for testing since it allows you to use SDL without any SDL or Redis deployed at all.
-            This can be used while developing your xapp, and also for monkeypatching during unit testing (e.g., the xapp framework unit tests do this)
+            if this is True (default: False), then SDLs "fake dict
+            backend" is used, which is very useful for testing since
+            it allows you to use SDL without any SDL or Redis deployed at
+            all. This can be used while developing your xapp, and also
+            for monkeypatching during unit testing (e.g., the xapp
+            framework unit tests do this).
         """
         if use_fake_sdl:
             self._sdl = SyncStorage(fake_db_backend="dict")
@@ -51,21 +58,28 @@ class SDLWrapper:
 
     def set(self, ns, key, value, usemsgpack=True):
         """
-        set a key
+        sets a key
 
-       NOTE: I am down for a discussion about whether usemsgpack should *default* to True or False here. This seems like a usage statistic question (that we don't have enough data for yet). Are more uses for an xapp to write/read their own data, or will more xapps end up reading data written by some other thing? I think it's too early to know this. So we go with True as the very first user of this, a1, does this. I'm open to changing this default to False later with evidence.
+        TODO: discuss whether usemsgpack should *default* to True or
+        False here. This seems like a usage statistic question (that we
+        don't have enough data for yet). Are more uses for an xapp to
+        write/read their own data, or will more xapps end up reading data
+        written by some other thing? I think it's too early to know
+        this. So we go with True as the very first user of this, a1, does
+        this. I'm open to changing this default to False later with
+        evidence.
 
         Parameters
         ----------
         ns: string
-           the sdl namespace
+        the sdl namespace
         key: string
-            the sdl key
+        the sdl key
         value:
-            if usemsgpack is True, value can be anything serializable by msgpack
-            if usemsgpack is False, value must be bytes
+        if usemsgpack is True, value can be anything serializable by msgpack
+        if usemsgpack is False, value must be bytes
         usemsgpack: boolean (optional)
-            determines whether the value is serialized using msgpack
+        determines whether the value is serialized using msgpack
         """
         if usemsgpack:
             self._sdl.set(ns, {key: msgpack.packb(value, use_bin_type=True)})
index 48cb15e..d7b94dd 100644 (file)
@@ -1,3 +1,3 @@
 # CI script installs RMR from PackageCloud using this version
 ---
-version: 4.0.2
+version: 4.0.5
index 3d2e96f..26640d0 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -32,7 +32,7 @@ def _long_descr():
 
 setup(
     name="ricxappframe",
-    version="1.1.0",
+    version="1.1.1",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter, E. Scott Daniels",
     description="Xapp and RMR framework for python",