*.log
NOTES.txt
+docs/*
+rmr/*
+
.pytest_cache/
xunit-results.xml
.DS_Store
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.8.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
# ==================================================================================
# install a well known working rmr
FROM python:3.7-alpine
-RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.3.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
# ==================================================================================
import os
import gevent
-from rmr import rmr
+from rmr import rmr, helpers
from a1 import get_module_logger
logger = get_module_logger(__name__)
called from run; not called for unit tests
"""
global MRC
- MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
+ # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
+ # internal ring of messages, and receive calls read from that
+ # currently the size is 2048 messages, so this is fine for the foreseeable future
+ MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(MRC) == 0:
gevent.sleep(1)
If the message is sent successfully, it returns the transactionid
Does nothing otherwise
"""
- # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
+ # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
sbuf = rmr.rmr_alloc_msg(MRC, 4096)
payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
# we are good
logger.debug("Message sent successfully!")
+ rmr.rmr_free_msg(sbuf)
return transaction_id
# we failed all RETRY_TIMES
logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
+ rmr.rmr_free_msg(sbuf)
return None
-def dequeue_all_waiting_messages(filter_type=None):
+def dequeue_all_waiting_messages(filter_type=[]):
"""
dequeue all waiting rmr messages from rmr
- We only add messages of type 21024; we drop other "spam";
- see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
"""
- new_messages = []
- sbuf = rmr.rmr_alloc_msg(MRC, 4096)
- while True:
- sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0) # set the timeout to 0 so this doesn't block!!
- summary = rmr.message_summary(sbuf)
- if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
- # no new messages
- break
- else:
- if (not filter_type) or (summary["message type"] == filter_type):
- # message is relevent
- new_messages.append(summary)
- else:
- # "spam", do nothing with message, effectively dropped
- logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)
-
- return new_messages
+ return helpers.rmr_rcvall_msgs(MRC, filter_type)
For now, the database is in memory.
We use dict data structures (KV) with the expectation of having to move this into Redis
"""
+import json
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists
from a1 import get_module_logger
from a1 import a1rmr
-import json
logger = get_module_logger(__name__)
pops a1s waiting mailbox
"""
# pop through a1s mailbox, updating a1s db of all policy statuses
- for msg in a1rmr.dequeue_all_waiting_messages(21024):
+ for msg in a1rmr.dequeue_all_waiting_messages([21024]):
# try to parse the messages as responses. Drop those that are malformed
# NOTE: we don't use the parameters "policy_type_id, policy_instance" from above here,
# because we are popping the whole mailbox, which might include other statuses
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 0.14.0-NOT_FOR_USE_YET
+tag: 0.14.1-NOT_FOR_USE_YET
* Release 1.0.0 will be the Release A version of A1
+[0.14.1] - 10/2/2019
+::
+
+ * Upgrade rmr to 1.9.0
+ * Upgrade rmr-python to 0.13.2
+ * Use the new helpers module in rmr-python for the rec all functionality
+ * Switch rmr mode to a multithreaded mode that continuously reads from rmr and populates an internal queue of messages with a deterministic queue size (2048) which is better behavior for A1
+ * Fix a memory leak (python obj is garbage collected but not the underlying C memory allocation)
+
+
+
[0.14.0] - 10/1/2019
::
#!/bin/sh
-git clone --branch 1.8.1 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
+git clone --branch 1.9.0 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
&& cd ../.. \
# ==================================================================================
# install a well known working rmr
FROM python:3.7-alpine
-RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.8.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
# Install RMr python bindings
RUN pip install --upgrade pip
-RUN pip install rmr==0.10.8
+RUN pip install rmr==0.13.2
# rmr setups
RUN mkdir -p /opt/route/
apiVersion: v1
description: A1 Helm chart for Kubernetes
name: a1mediator
-version: 0.14.0
+version: 0.14.1
DELAY = int(os.environ.get("TEST_RCV_SEC_DELAY", 0))
HANDLER_ID = os.environ.get("HANDLER_ID", "test_receiver")
-# TODO: should these be made constants?
-mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, 0x00)
+mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
while rmr.rmr_ready(mrc) == 0:
time.sleep(1)
setup(
name="a1",
- version="0.14.0",
+ version="0.14.1",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="RIC A1 Mediator for policy/intent changes",
url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
entry_points={"console_scripts": ["run.py=a1.run:main"]},
# we require jsonschema, should be in that list, but connexion already requires a specific version of it
- install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "rmr>=0.10.8"],
+ install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "rmr>=0.13.2"],
package_data={"a1": ["openapi.yaml"]},
)
rmr_mocks.patch_rmr(monkeypatch)
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0)) # good sends for this whole batch
+ # we need this because free expects a real sbuf
+ # TODO: move this into rmr_mocks
+ def noop(_sbuf):
+ pass
+
+ monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop)
+
# we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
def fake_alloc(_unused, _alsounused):
sbuf = rmr_mocks.Rmr_mbuf_t()
echo "running ab"
# run apache bench
ab -n 100 -c 10 -u putdata -T application/json http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
+commands_post=
# echo "log collection"
# integration_tests/getlogs.sh
-commands_post=
echo "teardown"
helm delete testreceiver
helm del --purge testreceiver