From 8bcc51a6d44d40a1a338fb6a721b5ee8f992f323 Mon Sep 17 00:00:00 2001
From: Tommy Carpenter
Date: Mon, 21 Oct 2019 16:07:31 -0400
Subject: [PATCH] Threading pt 2 (of 3, likely)

* Move database cleanup (e.g., deleting instances based on statuses) into the polling loop
* Rework how unit testing works with the polling loop; previously, exceptions were being thrown silently from the thread but not printed. The polling thread has now been parameterized with override functions for the purposes of testing
* Make type cleanup more efficient since we know exactly which instances were touched; it's inefficient to iterate over all instances if they were not
* Bump rmr-python version, and bump rmr version
* Still an item left to do in this work: refactor the thread slightly to tie in a healthcheck with a1s healthcheck. We need k8s to restart a1 if that thread dies too.

Change-Id: Ia7c4f29c9fd4de4287f17ec0d88c6129a06a5a87
Signed-off-by: Tommy Carpenter
---
 Dockerfile                              |  2 +-
 Dockerfile-Unit-Test                    |  2 +-
 a1/a1rmr.py                             | 91 ++++++++++++++++++---------------
 a1/data.py                              | 70 ++++++++++++-------------
 a1/run.py                               | 17 +-----
 container-tag.yaml                      |  2 +-
 docs/developer-guide.rst                | 21 +++++---
 docs/release-notes.rst                  | 10 ++++
 integration_tests/Dockerfile            |  4 +-
 integration_tests/a1mediator/Chart.yaml |  2 +-
 integration_tests/test_a1.tavern.yaml   |  2 +-
 setup.py                                |  4 +-
 tests/test_controller.py                | 45 ++++++++--------
 13 files changed, 143 insertions(+), 129 deletions(-)

diff --git a/Dockerfile b/Dockerfile
index 988cdfd..c387a33 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
diff --git a/Dockerfile-Unit-Test b/Dockerfile-Unit-Test
index 074b44a..6be53d5 100644
--- a/Dockerfile-Unit-Test
+++ b/Dockerfile-Unit-Test
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
diff --git a/a1/a1rmr.py b/a1/a1rmr.py
index d6114bf..359d2e0 100644
--- a/a1/a1rmr.py
+++ b/a1/a1rmr.py
@@ -18,6 +18,7 @@ import os
 import queue
 import time
 import json
+from threading import Thread
 from rmr import rmr, helpers
 from a1 import get_module_logger
 from a1 import data
@@ -39,8 +40,8 @@ def _init_rmr():
     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
     # internal ring of messages, and receive calls read from that
     # currently the size is 2048 messages, so this is fine for the foreseeable future
+    logger.debug("Waiting for rmr to initialize..")
     mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
-
     while rmr.rmr_ready(mrc) == 0:
         time.sleep(0.5)

@@ -86,20 +87,6 @@ def _send(mrc, payload, message_type=0):
     return None


-def _update_all_statuses(mrc):
-    """
-    get all waiting messages, and try to parse them as status updates
-    (currently, those are the only messages a1 should get, this may have to be revisited later)
-    """
-    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
-        try:
-            pay = json.loads(msg["payload"])
-            data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
-        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
-            logger.debug("Dropping malformed or non applicable message")
-            logger.debug(msg)
-
-
 # Public


@@ -116,10 +103,12 @@ class RmrLoop:
     class represents an rmr loop meant to be called as a longstanding separate thread
     """

-    def __init__(self, real_init=True):
+    def __init__(self, _init_func_override=None, rcv_func_override=None):
         self._rmr_is_ready = False
         self._keep_going = True
-        self._real_init = real_init  # useful for unit testing to turn off initialization
+        self._init_func_override = _init_func_override  # useful for unit testing
+        self._rcv_func_override = rcv_func_override  # useful for unit testing to mock certain receive scenarios
+        self._rcv_func = None

     def rmr_is_ready(self):
         """returns whether rmr has been initialized"""
@@ -138,33 +127,53 @@ class RmrLoop:
         """

         # get a context
-        mrc = None
-        logger.debug("Waiting for rmr to initialize...")
-        if self._real_init:
-            mrc = _init_rmr()
+        mrc = self._init_func_override() if self._init_func_override else _init_rmr()
         self._rmr_is_ready = True
         logger.debug("Rmr is ready")

+        # set the receive function called below
+        self._rcv_func = (
+            self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024])
+        )
+
         # loop forever
         logger.debug("Work loop starting")
         while self._keep_going:
-            """
-            We never raise an exception here. Log and keep moving
-            Bugs will eventually be caught be examining logs.
-            """
-            try:
-                # First, send out all messages waiting for us
-                while not _SEND_QUEUE.empty():
-                    work_item = _SEND_QUEUE.get(block=False, timeout=None)
-                    _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
-
-                # Next, update all statuses waiting in a1s mailbox
-                _update_all_statuses(mrc)
-
-                # TODO: next body of work is to try to clean up the database for any updated statuses
-
-            except Exception as e:
-                logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
-                logger.exception(e)
-
-            time.sleep(1)
+            # send out all messages waiting for us
+            while not _SEND_QUEUE.empty():
+                work_item = _SEND_QUEUE.get(block=False, timeout=None)
+                _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+            # read our mailbox and update statuses
+            updated_instances = set()
+            for msg in self._rcv_func():
+                try:
+                    pay = json.loads(msg["payload"])
+                    pti = pay["policy_type_id"]
+                    pii = pay["policy_instance_id"]
+                    data.set_status(pti, pii, pay["handler_id"], pay["status"])
+                    updated_instances.add((pti, pii))
+                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
+                    # TODO: in the future we may also have to catch SDL errors
+                    logger.debug(("Dropping malformed or non applicable message", msg))
+
+            # for all updated instances, see if we can trigger a delete
+            # should be no catch needed here, since the status update would have failed if it was a bad pair
+            for ut in updated_instances:
+                data.clean_up_instance(ut[0], ut[1])
+
+            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+            time.sleep(1)
+
+
+def start_rmr_thread(init_func_override=None, rcv_func_override=None):
+    """
+    Start a1s rmr thread
+    Also called during unit testing
+    """
+    rmr_loop = RmrLoop(init_func_override, rcv_func_override)
+    thread = Thread(target=rmr_loop.loop)
+    thread.start()
+    while not rmr_loop.rmr_is_ready():
+        time.sleep(0.5)
+    return rmr_loop  # return the handle; useful during unit testing
diff --git a/a1/data.py b/a1/data.py
index 135d853..074f326 100644
--- a/a1/data.py
+++ b/a1/data.py
@@ -128,39 +128,6 @@ def _clear_handlers(policy_type_id, policy_instance_id):
         SDL.delete(k)


-def _clean_up_type(policy_type_id):
-    """
-    for all instances of type, see if it can be deleted
-    """
-    type_is_valid(policy_type_id)
-    for policy_instance_id in _get_instance_list(policy_type_id):
-        # see if we can delete
-        vector = _get_statuses(policy_type_id, policy_instance_id)
-
-        """
-        TODO: not being able to delete if the list is [] is prolematic.
-        There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
-        However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
-
-        However, removing this constraint also leads to problems.
-        Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
-
-        This requires some thought to address.
-        For now we stick with the "less bad problem".
-        """
-        if vector != []:
-            all_deleted = True
-            for i in vector:
-                if i != "DELETED":
-                    all_deleted = False
-                    break  # have at least one not DELETED, do nothing
-
-            # blow away from a1 db
-            if all_deleted:
-                _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
-                SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
-
-
 # Types


@@ -238,7 +205,6 @@ def get_policy_instance(policy_type_id, policy_instance_id):
     """
     Retrieve a policy instance
     """
-    _clean_up_type(policy_type_id)
     instance_is_valid(policy_type_id, policy_instance_id)
     return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))

@@ -247,7 +213,6 @@ def get_policy_instance_statuses(policy_type_id, policy_instance_id):
     """
     Retrieve the status vector for a policy instance
     """
-    _clean_up_type(policy_type_id)
     return _get_statuses(policy_type_id, policy_instance_id)


@@ -255,7 +220,6 @@ def get_instance_list(policy_type_id):
     """
     retrieve all instance ids for a type
     """
-    _clean_up_type(policy_type_id)
     return _get_instance_list(policy_type_id)


@@ -270,3 +234,37 @@ def set_status(policy_type_id, policy_instance_id, handler_id, status):
     type_is_valid(policy_type_id)
     instance_is_valid(policy_type_id, policy_instance_id)
     SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
+
+
+def clean_up_instance(policy_type_id, policy_instance_id):
+    """
+    see if we can delete an instance based on its status
+    """
+    type_is_valid(policy_type_id)
+    instance_is_valid(policy_type_id, policy_instance_id)
+
+    """
+    TODO: not being able to delete if the list is [] is problematic.
+    There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
+    However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
+
+    However, removing this constraint also leads to problems.
+    Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition: the xapps get the policy after that and implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all left implementing a rogue instance.
+
+    This requires some thought to address.
+    For now we stick with the "less bad problem".
+    """
+
+    vector = _get_statuses(policy_type_id, policy_instance_id)
+    if vector != []:
+        all_deleted = True
+        for i in vector:
+            if i != "DELETED":
+                all_deleted = False
+                break  # have at least one not DELETED, do nothing
+
+        # blow away from a1 db
+        if all_deleted:
+            _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
+            SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
+            logger.debug("type %s instance %s deleted", policy_type_id, policy_instance_id)
diff --git a/a1/run.py b/a1/run.py
index 67f63d6..c15b644 100644
--- a/a1/run.py
+++ b/a1/run.py
@@ -17,8 +17,6 @@ A1 entrypoint
 # See the License for the specific language governing permissions and
 # limitations under the License.
 # ==================================================================================
-import time
-from threading import Thread
 from gevent.pywsgi import WSGIServer
 from a1 import get_module_logger, app
 from a1 import a1rmr
@@ -27,24 +25,11 @@ from a1 import a1rmr
 logger = get_module_logger(__name__)


-def start_rmr_thread(real_init=True):
-    """
-    Start a1s rmr thread
-    Also called during unit testing
-    """
-    rmr_loop = a1rmr.RmrLoop(real_init)
-    thread = Thread(target=rmr_loop.loop)
-    thread.start()
-    while not rmr_loop.rmr_is_ready():
-        time.sleep(0.5)
-    return rmr_loop  # return the handle; useful during unit testing
-
-
 def main():
     """Entrypoint"""
     # start rmr thread
     logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.")
-    start_rmr_thread()
+    a1rmr.start_rmr_thread()

     # start webserver
     logger.debug("Starting gevent server")
diff --git a/container-tag.yaml b/container-tag.yaml
index efad040..554b483 100644
--- a/container-tag.yaml
+++ b/container-tag.yaml
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
-tag: 1.0.2
+tag: 1.0.3
diff --git a/docs/developer-guide.rst b/docs/developer-guide.rst
index 84662a5..ca92214 100644
--- a/docs/developer-guide.rst
+++ b/docs/developer-guide.rst
@@ -43,18 +43,27 @@ This project follows semver. When changes are made, the versions are in:

 7) in the it/dep repo that contains a1 helm chart, ``values.yaml``, ``Chart.yml``

-Version bumping rmr-python
-==========================
-rmr-python is a critial dependency of A1. Bumping the rmr version dependency requires changes in:
+Version bumping rmr
+====================
+rmr is a critical dependency of A1. Bumping the rmr version dependency requires changes in:

-1) ``setup.py``
+1) ``Dockerfile``

-2) ``Dockerfile``
+2) ``Dockerfile-Unit-Test``

 3) ``integration_tests/Dockerfile``

+rmr-python is the python binding to rmr. Installing rmr per the above does not install it.
+
+Bumping the rmr-python version dependency requires changes in:
+
+1) ``setup.py``
+
+2) ``integration_tests/Dockerfile``
+
 Run the integration tests after attempting this.

+
 Unit Testing
 ============
 Note, before this will work, for the first time on the machine running the tests, run ``./install_deps.sh``. This is only needed once on the machine.
@@ -152,7 +161,7 @@ several env variables as follows:
     set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x TEST_RCV_PORT 4563; set -x TEST_RCV_RETURN_MINT 10001; set -x TEST_RCV_SEC_DELAY 5; set -x TEST_RCV_RETURN_PAYLOAD '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}' ; python receiver.py

 To test the async nature of A1, trigger a call to ``test_policy``, which
-will target the delayed receicer, then immediately call
+will target the delayed receiver, then immediately call
 ``control_admission``. The ``control_admission`` policy return should be
 returned immediately, whereas the ``test_policy`` should return after
 about ``TEST_RCV_SEC_DELAY 5``. The ``test_policy`` should not block A1
diff --git a/docs/release-notes.rst b/docs/release-notes.rst
index a4af2b6..59676d0 100644
--- a/docs/release-notes.rst
+++ b/docs/release-notes.rst
@@ -29,6 +29,16 @@ and this project adheres to `Semantic Versioning `__.

     * Represents a resillent version of 1.0.0 that uses Redis for persistence

+[1.0.3] - 10/22/2019
+
+::
+
+    * Move database cleanup (e.g., deleting instances based on statuses) into the polling loop
+    * Rework how unit testing works with the polling loop; previously, exceptions were being thrown silently from the thread but not printed. The polling thread has now been parameterized with override functions for the purposes of testing
+    * Make type cleanup more efficient since we know exactly which instances were touched; it's inefficient to iterate over all instances if they were not
+    * Bump rmr-python version, and bump rmr version
+    * Still an item left to do in this work: refactor the thread slightly to tie in a healthcheck with a1s healthcheck. We need k8s to restart a1 if that thread dies too.
+
 [1.0.2] - 10/17/2019

 ::
diff --git a/integration_tests/Dockerfile b/integration_tests/Dockerfile
index 3e8c874..e7bfe80 100644
--- a/integration_tests/Dockerfile
+++ b/integration_tests/Dockerfile
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
@@ -34,7 +34,7 @@ COPY receiver.py /

 # Install RMr python bindings
 RUN pip install --upgrade pip
-RUN pip install rmr==0.13.2
+RUN pip install rmr==0.13.5

 # rmr setups
 RUN mkdir -p /opt/route/
diff --git a/integration_tests/a1mediator/Chart.yaml b/integration_tests/a1mediator/Chart.yaml
index 58e1e1f..d0c73be 100644
--- a/integration_tests/a1mediator/Chart.yaml
+++ b/integration_tests/a1mediator/Chart.yaml
@@ -1,4 +1,4 @@
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
-version: 1.0.2
+version: 1.0.3
diff --git a/integration_tests/test_a1.tavern.yaml b/integration_tests/test_a1.tavern.yaml
index 55a8937..b4ccd62 100644
--- a/integration_tests/test_a1.tavern.yaml
+++ b/integration_tests/test_a1.tavern.yaml
@@ -189,6 +189,7 @@ stages:

   # DELETE the instance and make sure subsequent GETs return properly
   - name: delete the instance
+    delay_after: 3 # give it a few seconds for rmr
     request:
       url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
       method: DELETE
@@ -204,7 +205,6 @@ stages:
         body: []

   - name: cant get instance status
-    delay_before: 3 # give it a few seconds for rmr
     request:
       url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status
       method: GET
diff --git a/setup.py b/setup.py
index 9894fb8..f8a2923 100644
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@ from setuptools import setup, find_packages

 setup(
     name="a1",
-    version="1.0.2",
+    version="1.0.3",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
-    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.3"],
+    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.5"],
     package_data={"a1": ["openapi.yaml"]},
 )
diff --git a/tests/test_controller.py b/tests/test_controller.py
index e5c3ac7..63831b0 100644
--- a/tests/test_controller.py
+++ b/tests/test_controller.py
@@ -17,7 +17,7 @@
 import time

 from rmr.rmr_mocks import rmr_mocks
-from a1 import run
+from a1 import a1rmr


 ADM_CTRL = "admission_control_policy"
@@ -28,21 +28,19 @@ ADM_CTRL_TYPE = "/a1-p/policytypes/20000"
 TEST_TYPE = "/a1-p/policytypes/20001"


-def _fake_dequeue(_mrc, _filter_type):
+def _fake_dequeue():
     """for monkeypatching with a good status"""
-    fake_msg = {}
     pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
-    fake_msg["payload"] = pay
-    new_messages = [fake_msg]
-    return new_messages
+    fake_msg = {"payload": pay}
+    return [fake_msg]


-def _fake_dequeue_none(_mrc, _filter_type):
+def _fake_dequeue_none():
     """for monkeypatching with no waiting messages"""
     return []


-def _fake_dequeue_deleted(_mrc, _filter_type):
+def _fake_dequeue_deleted():
     """for monkeypatching with a DELETED status"""

     new_msgs = []
@@ -55,6 +53,10 @@ def _fake_dequeue_deleted(_mrc, _filter_type):
     fake_msg = {"payload": pay}
     new_msgs.append(fake_msg)

+    # insert a bad one with a malformed body to make sure we keep going
+    fake_msg = {"payload": "asdf"}
+    new_msgs.append(fake_msg)
+
     pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}'
     fake_msg = {"payload": pay}
     new_msgs.append(fake_msg)
@@ -98,15 +100,19 @@ RMR_THREAD = None

 def setup_module():
     """module level setup"""
     global RMR_THREAD
-    RMR_THREAD = run.start_rmr_thread(real_init=False)
+
+    def noop():
+        pass
+
+    # launch the thread with a fake init func and a patched rcv func; we will "repatch" later
+    RMR_THREAD = a1rmr.start_rmr_thread(init_func_override=noop, rcv_func_override=_fake_dequeue_none)


 # Actual Tests


-def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_instance_good):
+def test_workflow_nothing_there_yet(client):
     """ test policy put good"""
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # no type there yet
     res = client.get(ADM_CTRL_TYPE)
     assert res.status_code == 404
@@ -125,7 +131,6 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     """
     test a full A1 workflow
     """
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # put the type
     res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
     assert res.status_code == 201
@@ -183,7 +188,7 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     get_instance_good("NOT IN EFFECT")

     # now pretend we did get a good ACK
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    RMR_THREAD._rcv_func = _fake_dequeue
     time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")

@@ -198,22 +203,20 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     assert res.status_code == 202

     # status after a delete, but there are no messages yet, should still return
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
     time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")

     # now pretend we deleted successfully
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_deleted)
+    RMR_THREAD._rcv_func = _fake_dequeue_deleted
     time.sleep(1)  # wait for the rmr thread
-    res = client.get(ADM_CTRL_INSTANCE_STATUS)  # cant get status
-    assert res.status_code == 404
-    res = client.get(ADM_CTRL_INSTANCE)  # cant get instance
-    assert res.status_code == 404
-
     # list still 200 but no instance
     res = client.get(ADM_CTRL_POLICIES)
     assert res.status_code == 200
     assert res.json == []
+    res = client.get(ADM_CTRL_INSTANCE_STATUS)  # cant get status
+    assert res.status_code == 404
+    res = client.get(ADM_CTRL_INSTANCE)  # cant get instance
+    assert res.status_code == 404

     # delete the type
     res = client.delete(ADM_CTRL_TYPE)
@@ -248,7 +251,7 @@ def test_bad_instances(client, monkeypatch, adm_type_good):
     assert res.status_code == 404

     # get a non existent instance
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    RMR_THREAD._rcv_func = _fake_dequeue
     time.sleep(1)
     res = client.get(ADM_CTRL_INSTANCE + "DARKNESS")
     assert res.status_code == 404
--
2.16.6
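
A minimal usage sketch of the override hooks this patch introduces (``init_func_override`` and ``rcv_func_override``): it drives ``a1rmr.start_rmr_thread`` with fake functions in the same way ``tests/test_controller.py`` does. The names ``fake_init`` and ``fake_rcv`` are illustrative assumptions, not part of the patch, and the sketch presumes the unit-test environment (fake SDL backend, no real rmr library) rather than a production deployment::

    import time

    from a1 import a1rmr


    def fake_init():
        """Stands in for _init_rmr(); no real rmr context is created."""
        return None


    def fake_rcv():
        """Returns one well-formed status message of the shape the polling loop parses."""
        pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
        return [{"payload": pay}]


    # start the polling thread with both hooks overridden; the returned handle is the RmrLoop instance
    rmr_loop = a1rmr.start_rmr_thread(init_func_override=fake_init, rcv_func_override=fake_rcv)

    time.sleep(1)  # give the loop at least one pass over the fake mailbox

    # the receive function can be swapped at runtime, exactly as the unit tests do with RMR_THREAD._rcv_func
    rmr_loop._rcv_func = lambda: []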