* Move database cleanup (e.g., deleting instances based on statuses) into the polling loop
* Rework how unit testing works with the polling loop; prior, exceptions were being thrown silently from the thread but not printed. The polling thread has now been parameterized with override functions for the purposes of testing
* Make type cleanup more efficient since we know exactly what instances were touched, and it's inefficient to iterate over all instances if they were not
* Bump rmr-python version, and bump rmr version
* Still an item left to do in this work; refactor the thread slightly to tie in a healthcheck with a1s healthcheck. We need k8s to restart a1 if that thread dies too.
Change-Id: Ia7c4f29c9fd4de4287f17ec0d88c6129a06a5a87
Signed-off-by: Tommy Carpenter <tc677g@att.com>
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
import queue
import time
import json
+from threading import Thread
from rmr import rmr, helpers
from a1 import get_module_logger
from a1 import data
# rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
# internal ring of messages, and receive calls read from that
# currently the size is 2048 messages, so this is fine for the foreseeable future
+ logger.debug("Waiting for rmr to initialize..")
mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
-
while rmr.rmr_ready(mrc) == 0:
time.sleep(0.5)
return None
-def _update_all_statuses(mrc):
- """
- get all waiting messages, and try to parse them as status updates
- (currently, those are the only messages a1 should get, this may have to be revisited later)
- """
- for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
- try:
- pay = json.loads(msg["payload"])
- data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
- except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
- logger.debug("Dropping malformed or non applicable message")
- logger.debug(msg)
-
-
# Public
class represents an rmr loop meant to be called as a longstanding separate thread
"""
- def __init__(self, real_init=True):
+ def __init__(self, _init_func_override=None, rcv_func_override=None):
self._rmr_is_ready = False
self._keep_going = True
- self._real_init = real_init # useful for unit testing to turn off initialization
+ self._init_func_override = _init_func_override # useful for unit testing
+        self._rcv_func_override = rcv_func_override  # useful for unit testing to mock certain receive scenarios
+ self._rcv_func = None
def rmr_is_ready(self):
"""returns whether rmr has been initialized"""
"""
# get a context
- mrc = None
- logger.debug("Waiting for rmr to initialize...")
- if self._real_init:
- mrc = _init_rmr()
+ mrc = self._init_func_override() if self._init_func_override else _init_rmr()
self._rmr_is_ready = True
logger.debug("Rmr is ready")
+ # set the receive function called below
+ self._rcv_func = (
+ self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024])
+ )
+
# loop forever
logger.debug("Work loop starting")
while self._keep_going:
- """
- We never raise an exception here. Log and keep moving
- Bugs will eventually be caught be examining logs.
- """
- try:
- # First, send out all messages waiting for us
- while not _SEND_QUEUE.empty():
- work_item = _SEND_QUEUE.get(block=False, timeout=None)
- _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
-
- # Next, update all statuses waiting in a1s mailbox
- _update_all_statuses(mrc)
-
- # TODO: next body of work is to try to clean up the database for any updated statuses
-
- except Exception as e:
- logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
- logger.exception(e)
-
- time.sleep(1)
+ # send out all messages waiting for us
+ while not _SEND_QUEUE.empty():
+ work_item = _SEND_QUEUE.get(block=False, timeout=None)
+ _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+ # read our mailbox and update statuses
+ updated_instances = set()
+ for msg in self._rcv_func():
+ try:
+ pay = json.loads(msg["payload"])
+ pti = pay["policy_type_id"]
+ pii = pay["policy_instance_id"]
+ data.set_status(pti, pii, pay["handler_id"], pay["status"])
+ updated_instances.add((pti, pii))
+ except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
+ # TODO: in the future we may also have to catch SDL errors
+ logger.debug(("Dropping malformed or non applicable message", msg))
+
+ # for all updated instances, see if we can trigger a delete
+ # should be no catch needed here, since the status update would have failed if it was a bad pair
+ for ut in updated_instances:
+ data.clean_up_instance(ut[0], ut[1])
+
+ # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+ time.sleep(1)
+
+
+def start_rmr_thread(init_func_override=None, rcv_func_override=None):
+ """
+ Start a1s rmr thread
+ Also called during unit testing
+ """
+ rmr_loop = RmrLoop(init_func_override, rcv_func_override)
+ thread = Thread(target=rmr_loop.loop)
+ thread.start()
+ while not rmr_loop.rmr_is_ready():
+ time.sleep(0.5)
+ return rmr_loop # return the handle; useful during unit testing
SDL.delete(k)
-def _clean_up_type(policy_type_id):
- """
- for all instances of type, see if it can be deleted
- """
- type_is_valid(policy_type_id)
- for policy_instance_id in _get_instance_list(policy_type_id):
- # see if we can delete
- vector = _get_statuses(policy_type_id, policy_instance_id)
-
- """
- TODO: not being able to delete if the list is [] is prolematic.
- There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
- However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
-
- However, removing this constraint also leads to problems.
- Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
-
- This requires some thought to address.
- For now we stick with the "less bad problem".
- """
- if vector != []:
- all_deleted = True
- for i in vector:
- if i != "DELETED":
- all_deleted = False
- break # have at least one not DELETED, do nothing
-
- # blow away from a1 db
- if all_deleted:
- _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers
- SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id)) # delete instance
-
-
# Types
"""
Retrieve a policy instance
"""
- _clean_up_type(policy_type_id)
instance_is_valid(policy_type_id, policy_instance_id)
return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))
"""
Retrieve the status vector for a policy instance
"""
- _clean_up_type(policy_type_id)
return _get_statuses(policy_type_id, policy_instance_id)
"""
retrieve all instance ids for a type
"""
- _clean_up_type(policy_type_id)
return _get_instance_list(policy_type_id)
type_is_valid(policy_type_id)
instance_is_valid(policy_type_id, policy_instance_id)
SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
+
+
+def clean_up_instance(policy_type_id, policy_instance_id):
+ """
+    see if we can delete an instance based on its status
+ """
+ type_is_valid(policy_type_id)
+ instance_is_valid(policy_type_id, policy_instance_id)
+
+ """
+    TODO: not being able to delete if the list is [] is problematic.
+ There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
+ However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
+
+ However, removing this constraint also leads to problems.
+ Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
+
+ This requires some thought to address.
+ For now we stick with the "less bad problem".
+ """
+
+ vector = _get_statuses(policy_type_id, policy_instance_id)
+ if vector != []:
+ all_deleted = True
+ for i in vector:
+ if i != "DELETED":
+ all_deleted = False
+ break # have at least one not DELETED, do nothing
+
+ # blow away from a1 db
+ if all_deleted:
+ _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers
+ SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id)) # delete instance
+ logger.debug("type %s instance %s deleted", policy_type_id, policy_instance_id)
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
-import time
-from threading import Thread
from gevent.pywsgi import WSGIServer
from a1 import get_module_logger, app
from a1 import a1rmr
logger = get_module_logger(__name__)
-def start_rmr_thread(real_init=True):
- """
- Start a1s rmr thread
- Also called during unit testing
- """
- rmr_loop = a1rmr.RmrLoop(real_init)
- thread = Thread(target=rmr_loop.loop)
- thread.start()
- while not rmr_loop.rmr_is_ready():
- time.sleep(0.5)
- return rmr_loop # return the handle; useful during unit testing
-
-
def main():
"""Entrypoint"""
# start rmr thread
logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.")
- start_rmr_thread()
+ a1rmr.start_rmr_thread()
# start webserver
logger.debug("Starting gevent server")
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 1.0.2
+tag: 1.0.3
7) in the it/dep repo that contains a1 helm chart, ``values.yaml``, ``Chart.yml``
-Version bumping rmr-python
-==========================
-rmr-python is a critial dependency of A1. Bumping the rmr version dependency requires changes in:
-1) ``setup.py``
+Version bumping rmr
+====================
+rmr is a critical dependency of A1. Bumping the rmr version dependency requires changes in:
+
+1) ``Dockerfile``
-2) ``Dockerfile``
+2) ``Dockerfile-Unit-Test``
3) ``integration_tests/Dockerfile``
+rmr-python is the python binding to rmr . Installing rmr per the above does not install it.
+Bumping the rmr python version dependency requires changes in:
+
+1) ``setup.py``
+
+2) ``integration_tests/Dockerfile``
+
Run the integration tests after attempting this.
+
Unit Testing
============
Note, before this will work, for the first time on the machine running the tests, run ``./install_deps.sh``. This is only needed once on the machine.
set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x TEST_RCV_PORT 4563; set -x TEST_RCV_RETURN_MINT 10001; set -x TEST_RCV_SEC_DELAY 5; set -x TEST_RCV_RETURN_PAYLOAD '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}' ; python receiver.py
To test the async nature of A1, trigger a call to ``test_policy``, which
-will target the delayed receicer, then immediately call
+will target the delayed receiver, then immediately call
``control_admission``. The ``control_admission`` policy return should be
returned immediately, whereas the ``test_policy`` should return after
about ``TEST_RCV_SEC_DELAY 5``. The ``test_policy`` should not block A1
* Represents a resillent version of 1.0.0 that uses Redis for persistence
+[1.0.3] - 10/22/2019
+
+::
+
+ * Move database cleanup (e.g., deleting instances based on statuses) into the polling loop
+    * Rework how unit testing works with the polling loop; prior, exceptions were being thrown silently from the thread but not printed. The polling thread has now been parameterized with override functions for the purposes of testing
+ * Make type cleanup more efficient since we know exactly what instances were touched, and it's inefficient to iterate over all instances if they were not
+ * Bump rmr-python version, and bump rmr version
+ * Still an item left to do in this work; refactor the thread slightly to tie in a healthcheck with a1s healthcheck. We need k8s to restart a1 if that thread dies too.
+
[1.0.2] - 10/17/2019
::
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
# Install RMr python bindings
RUN pip install --upgrade pip
-RUN pip install rmr==0.13.2
+RUN pip install rmr==0.13.5
# rmr setups
RUN mkdir -p /opt/route/
apiVersion: v1
description: A1 Helm chart for Kubernetes
name: a1mediator
-version: 1.0.2
+version: 1.0.3
# DELETE the instance and make sure subsequent GETs return properly
- name: delete the instance
+ delay_after: 3 # give it a few seconds for rmr
request:
url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
method: DELETE
body: []
- name: cant get instance status
- delay_before: 3 # give it a few seconds for rmr
request:
url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status
method: GET
setup(
name="a1",
- version="1.0.2",
+ version="1.0.3",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="RIC A1 Mediator for policy/intent changes",
url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
entry_points={"console_scripts": ["run.py=a1.run:main"]},
# we require jsonschema, should be in that list, but connexion already requires a specific version of it
- install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.3"],
+ install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.5"],
package_data={"a1": ["openapi.yaml"]},
)
import time
from rmr.rmr_mocks import rmr_mocks
-from a1 import run
+from a1 import a1rmr
ADM_CTRL = "admission_control_policy"
TEST_TYPE = "/a1-p/policytypes/20001"
-def _fake_dequeue(_mrc, _filter_type):
+def _fake_dequeue():
"""for monkeypatching with a good status"""
- fake_msg = {}
pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
- fake_msg["payload"] = pay
- new_messages = [fake_msg]
- return new_messages
+ fake_msg = {"payload": pay}
+ return [fake_msg]
-def _fake_dequeue_none(_mrc, _filter_type):
+def _fake_dequeue_none():
"""for monkeypatching with no waiting messages"""
return []
-def _fake_dequeue_deleted(_mrc, _filter_type):
+def _fake_dequeue_deleted():
"""for monkeypatching with a DELETED status"""
new_msgs = []
fake_msg = {"payload": pay}
new_msgs.append(fake_msg)
+ # insert a bad one with a malformed body to make sure we keep going
+ fake_msg = {"payload": "asdf"}
+ new_msgs.append(fake_msg)
+
pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}'
fake_msg = {"payload": pay}
new_msgs.append(fake_msg)
def setup_module():
"""module level setup"""
global RMR_THREAD
- RMR_THREAD = run.start_rmr_thread(real_init=False)
+
+ def noop():
+ pass
+
+ # launch the thread with a fake init func and a patched rcv func; we will "repatch" later
+ RMR_THREAD = a1rmr.start_rmr_thread(init_func_override=noop, rcv_func_override=_fake_dequeue_none)
# Actual Tests
-def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_instance_good):
+def test_workflow_nothing_there_yet(client):
""" test policy put good"""
- monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
# no type there yet
res = client.get(ADM_CTRL_TYPE)
assert res.status_code == 404
"""
test a full A1 workflow
"""
- monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
# put the type
res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
assert res.status_code == 201
get_instance_good("NOT IN EFFECT")
# now pretend we did get a good ACK
- monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+ RMR_THREAD._rcv_func = _fake_dequeue
time.sleep(1) # wait for the rmr thread
get_instance_good("IN EFFECT")
assert res.status_code == 202
# status after a delete, but there are no messages yet, should still return
- monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
time.sleep(1) # wait for the rmr thread
get_instance_good("IN EFFECT")
# now pretend we deleted successfully
- monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_deleted)
+ RMR_THREAD._rcv_func = _fake_dequeue_deleted
time.sleep(1) # wait for the rmr thread
- res = client.get(ADM_CTRL_INSTANCE_STATUS) # cant get status
- assert res.status_code == 404
- res = client.get(ADM_CTRL_INSTANCE) # cant get instance
- assert res.status_code == 404
-
# list still 200 but no instance
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 200
assert res.json == []
+ res = client.get(ADM_CTRL_INSTANCE_STATUS) # cant get status
+ assert res.status_code == 404
+ res = client.get(ADM_CTRL_INSTANCE) # cant get instance
+ assert res.status_code == 404
# delete the type
res = client.delete(ADM_CTRL_TYPE)
assert res.status_code == 404
# get a non existent instance
- monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+ RMR_THREAD._rcv_func = _fake_dequeue
time.sleep(1)
res = client.get(ADM_CTRL_INSTANCE + "DARKNESS")
assert res.status_code == 404