Towards a1 1.0.0: rmr improvements 56/1056/2
authorTommy Carpenter <tc677g@att.com>
Wed, 2 Oct 2019 15:15:04 +0000 (11:15 -0400)
committerTommy Carpenter <tc677g@att.com>
Wed, 2 Oct 2019 16:20:54 +0000 (12:20 -0400)
    * Upgrade rmr to 1.9.0
    * Upgrade rmr-python to 0.13.2
    * Use the new helpers module in rmr-python for the rec all functionality
    * Switch rmr mode to a multithreaded mode that continuously reads from rmr and populates an internal queue of messages with a deterministic queue size (2048) which is better behavior for A1
    * Fix a memory leak (python obj is garbage collected but not the underlying C memory allocation)

Change-Id: I0f9cf7943071d6d58aef9c8c8bd86affe9b9223a
Signed-off-by: Tommy Carpenter <tc677g@att.com>
14 files changed:
.gitignore
Dockerfile
Dockerfile-Unit-Test
a1/a1rmr.py
a1/data.py
container-tag.yaml
docs/release-notes.rst
install_deps.sh
integration_tests/Dockerfile
integration_tests/a1mediator/Chart.yaml
integration_tests/receiver.py
setup.py
tests/test_controller.py
tox-integration.ini

index 542e654..9180140 100644 (file)
@@ -1,5 +1,8 @@
 *.log
 NOTES.txt
+docs/*
+rmr/*
+
 .pytest_cache/
 xunit-results.xml
 .DS_Store
index 4d76cff..988cdfd 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.8.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
index dbaccbe..49df469 100644 (file)
@@ -16,8 +16,8 @@
 # ==================================================================================
 # install a well known working rmr
 FROM python:3.7-alpine
-RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git 
-RUN git clone --branch 1.3.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
index abbb84f..51b5694 100644 (file)
@@ -16,7 +16,7 @@
 # ==================================================================================
 import os
 import gevent
-from rmr import rmr
+from rmr import rmr, helpers
 from a1 import get_module_logger
 
 logger = get_module_logger(__name__)
@@ -31,7 +31,10 @@ def init_rmr():
     called from run; not called for unit tests
     """
     global MRC
-    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
+    # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
+    # internal ring of messages, and receive calls read from that
+    # currently the size is 2048 messages, so this is fine for the foreseeable future
+    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
 
     while rmr.rmr_ready(MRC) == 0:
         gevent.sleep(1)
@@ -44,7 +47,7 @@ def send(payload, message_type=0):
     If the message is sent successfully, it returns the transactionid
     Does nothing otherwise
     """
-    # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
+    # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
     sbuf = rmr.rmr_alloc_msg(MRC, 4096)
     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
 
@@ -68,33 +71,17 @@ def send(payload, message_type=0):
         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
             # we are good
             logger.debug("Message sent successfully!")
+            rmr.rmr_free_msg(sbuf)
             return transaction_id
 
     # we failed all RETRY_TIMES
     logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
+    rmr.rmr_free_msg(sbuf)
     return None
 
 
-def dequeue_all_waiting_messages(filter_type=None):
+def dequeue_all_waiting_messages(filter_type=[]):
     """
     dequeue all waiting rmr messages from rmr
-    We only add messages of type 21024; we drop other "spam";
-    see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
     """
-    new_messages = []
-    sbuf = rmr.rmr_alloc_msg(MRC, 4096)
-    while True:
-        sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0)  # set the timeout to 0 so this doesn't block!!
-        summary = rmr.message_summary(sbuf)
-        if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
-            # no new messages
-            break
-        else:
-            if (not filter_type) or (summary["message type"] == filter_type):
-                # message is relevent
-                new_messages.append(summary)
-            else:
-                # "spam", do nothing with message, effectively dropped
-                logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)
-
-    return new_messages
+    return helpers.rmr_rcvall_msgs(MRC, filter_type)
index 51ba044..d701373 100644 (file)
@@ -23,10 +23,10 @@ Hopefully, the access functions are a good api so nothing else has to change whe
 For now, the database is in memory.
 We use dict data structures (KV) with the expectation of having to move this into Redis
 """
+import json
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists
 from a1 import get_module_logger
 from a1 import a1rmr
-import json
 
 logger = get_module_logger(__name__)
 
@@ -109,7 +109,7 @@ def delete_policy_instance_if_applicable(policy_type_id, policy_instance_id):
     pops a1s waiting mailbox
     """
     # pop through a1s mailbox, updating a1s db of all policy statuses
-    for msg in a1rmr.dequeue_all_waiting_messages(21024):
+    for msg in a1rmr.dequeue_all_waiting_messages([21024]):
         # try to parse the messages as responses. Drop those that are malformed
         # NOTE: we don't use the parameters "policy_type_id, policy_instance" from above here,
         # because we are popping the whole mailbox, which might include other statuses
index 2ac2520..3bf180b 100644 (file)
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
-tag: 0.14.0-NOT_FOR_USE_YET
+tag: 0.14.1-NOT_FOR_USE_YET
index dd26d9b..19f660e 100644 (file)
@@ -29,6 +29,17 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
 
     * Release 1.0.0 will be the Release A version of A1
 
+[0.14.1] - 10/2/2019
+::
+
+    * Upgrade rmr to 1.9.0
+    * Upgrade rmr-python to 0.13.2
+    * Use the new helpers module in rmr-python for the rec all functionality
+    * Switch rmr mode to a multithreaded mode that continuously reads from rmr and populates an internal queue of messages with a deterministic queue size (2048) which is better behavior for A1
+    * Fix a memory leak (python obj is garbage collected but not the underlying C memory allocation)
+
+
+
 [0.14.0] - 10/1/2019
 ::
 
index 9ab8cae..01c45b4 100755 (executable)
@@ -1,5 +1,5 @@
 #!/bin/sh
-git clone --branch 1.8.1 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
+git clone --branch 1.9.0 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
     && cd ../.. \
index 19aae4a..3e8c874 100644 (file)
@@ -16,8 +16,8 @@
 # ==================================================================================
 # install a well known working rmr
 FROM python:3.7-alpine
-RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git 
-RUN git clone --branch 1.8.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
@@ -34,7 +34,7 @@ COPY receiver.py /
 
 # Install RMr python bindings
 RUN pip install --upgrade pip
-RUN pip install rmr==0.10.8
+RUN pip install rmr==0.13.2
 
 # rmr setups
 RUN mkdir -p /opt/route/
index 142094b..2247fdd 100644 (file)
@@ -1,4 +1,4 @@
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
-version: 0.14.0
+version: 0.14.1
index 75294c6..d73dcd9 100644 (file)
@@ -27,8 +27,7 @@ PORT = os.environ.get("TEST_RCV_PORT", "4560")
 DELAY = int(os.environ.get("TEST_RCV_SEC_DELAY", 0))
 HANDLER_ID = os.environ.get("HANDLER_ID", "test_receiver")
 
-# TODO: should these be made constants?
-mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, 0x00)
+mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
 
 while rmr.rmr_ready(mrc) == 0:
     time.sleep(1)
index 983e130..0aa2b3f 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@ from setuptools import setup, find_packages
 
 setup(
     name="a1",
-    version="0.14.0",
+    version="0.14.1",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
-    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "rmr>=0.10.8"],
+    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "rmr>=0.13.2"],
     package_data={"a1": ["openapi.yaml"]},
 )
index 88d6f70..2866dd8 100644 (file)
@@ -76,6 +76,13 @@ def _test_put_patch(monkeypatch):
     rmr_mocks.patch_rmr(monkeypatch)
     monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0))  # good sends for this whole batch
 
+    # we need this because free expects a real sbuf
+    # TODO: move this into rmr_mocks
+    def noop(_sbuf):
+        pass
+
+    monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop)
+
     # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
     def fake_alloc(_unused, _alsounused):
         sbuf = rmr_mocks.Rmr_mbuf_t()
index b562f86..e39a8cb 100644 (file)
@@ -49,9 +49,9 @@ commands=
     echo "running ab"
 # run apache bench
     ab -n 100 -c 10 -u putdata -T application/json http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
+commands_post=
 #    echo "log collection"
 #    integration_tests/getlogs.sh
-commands_post=
     echo "teardown"
     helm delete testreceiver
     helm del --purge testreceiver