Threading pt 2 (of 3, likely) 93/1193/5 1.0.3
authorTommy Carpenter <tc677g@att.com>
Mon, 21 Oct 2019 20:07:31 +0000 (16:07 -0400)
committerTommy Carpenter <tc677g@att.com>
Wed, 23 Oct 2019 13:36:42 +0000 (09:36 -0400)
* Move database cleanup (e.g., deleting instances based on statuses) into the polling loop
* Rework how unit testing works with the polling loop; prior, exceptions were being thrown silently from the thread but not printed. The polling thread has now been paramaterized with override functions for the purposes of testing
* Make type cleanup more efficient since we know exactly what instances were touched, and it's inefficient to iterate over all instances if they were not
* Bump rmr-python version, and bump rmr version
* Still an item left to do in this work; refactor the thread slightly to tie in a healthcheck with a1s healthcheck. We need k8s to restart a1 if that thread dies too.

Change-Id: Ia7c4f29c9fd4de4287f17ec0d88c6129a06a5a87
Signed-off-by: Tommy Carpenter <tc677g@att.com>
13 files changed:
Dockerfile
Dockerfile-Unit-Test
a1/a1rmr.py
a1/data.py
a1/run.py
container-tag.yaml
docs/developer-guide.rst
docs/release-notes.rst
integration_tests/Dockerfile
integration_tests/a1mediator/Chart.yaml
integration_tests/test_a1.tavern.yaml
setup.py
tests/test_controller.py

index 988cdfd..c387a33 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
index 074b44a..6be53d5 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
index d6114bf..359d2e0 100644 (file)
@@ -18,6 +18,7 @@ import os
 import queue
 import time
 import json
+from threading import Thread
 from rmr import rmr, helpers
 from a1 import get_module_logger
 from a1 import data
@@ -39,8 +40,8 @@ def _init_rmr():
     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
     # internal ring of messages, and receive calls read from that
     # currently the size is 2048 messages, so this is fine for the foreseeable future
+    logger.debug("Waiting for rmr to initialize..")
     mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
-
     while rmr.rmr_ready(mrc) == 0:
         time.sleep(0.5)
 
@@ -86,20 +87,6 @@ def _send(mrc, payload, message_type=0):
     return None
 
 
-def _update_all_statuses(mrc):
-    """
-    get all waiting messages, and try to parse them as status updates
-    (currently, those are the only messages a1 should get, this may have to be revisited later)
-    """
-    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
-        try:
-            pay = json.loads(msg["payload"])
-            data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
-        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
-            logger.debug("Dropping malformed or non applicable message")
-            logger.debug(msg)
-
-
 # Public
 
 
@@ -116,10 +103,12 @@ class RmrLoop:
     class represents an rmr loop meant to be called as a longstanding separate thread
     """
 
-    def __init__(self, real_init=True):
+    def __init__(self, _init_func_override=None, rcv_func_override=None):
         self._rmr_is_ready = False
         self._keep_going = True
-        self._real_init = real_init  # useful for unit testing to turn off initialization
+        self._init_func_override = _init_func_override  # useful for unit testing
+        self._rcv_func_override = rcv_func_override  # useful for unit testing to mock certain recieve scenarios
+        self._rcv_func = None
 
     def rmr_is_ready(self):
         """returns whether rmr has been initialized"""
@@ -138,33 +127,53 @@ class RmrLoop:
         """
 
         # get a context
-        mrc = None
-        logger.debug("Waiting for rmr to initialize...")
-        if self._real_init:
-            mrc = _init_rmr()
+        mrc = self._init_func_override() if self._init_func_override else _init_rmr()
         self._rmr_is_ready = True
         logger.debug("Rmr is ready")
 
+        # set the receive function called below
+        self._rcv_func = (
+            self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024])
+        )
+
         # loop forever
         logger.debug("Work loop starting")
         while self._keep_going:
-            """
-            We never raise an exception here. Log and keep moving
-            Bugs will eventually be caught be examining logs.
-            """
-            try:
-                # First, send out all messages waiting for us
-                while not _SEND_QUEUE.empty():
-                    work_item = _SEND_QUEUE.get(block=False, timeout=None)
-                    _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
-
-                # Next, update all statuses waiting in a1s mailbox
-                _update_all_statuses(mrc)
-
-                # TODO: next body of work is to try to clean up the database for any updated statuses
-
-            except Exception as e:
-                logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
-                logger.exception(e)
-
-            time.sleep(1)
+            # send out all messages waiting for us
+            while not _SEND_QUEUE.empty():
+                work_item = _SEND_QUEUE.get(block=False, timeout=None)
+                _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+            # read our mailbox and update statuses
+            updated_instances = set()
+            for msg in self._rcv_func():
+                try:
+                    pay = json.loads(msg["payload"])
+                    pti = pay["policy_type_id"]
+                    pii = pay["policy_instance_id"]
+                    data.set_status(pti, pii, pay["handler_id"], pay["status"])
+                    updated_instances.add((pti, pii))
+                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
+                    # TODO: in the future we may also have to catch SDL errors
+                    logger.debug(("Dropping malformed or non applicable message", msg))
+
+            # for all updated instances, see if we can trigger a delete
+            # should be no catch needed here, since the status update would have failed if it was a bad pair
+            for ut in updated_instances:
+                data.clean_up_instance(ut[0], ut[1])
+
+        # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+        time.sleep(1)
+
+
+def start_rmr_thread(init_func_override=None, rcv_func_override=None):
+    """
+    Start a1s rmr thread
+    Also called during unit testing
+    """
+    rmr_loop = RmrLoop(init_func_override, rcv_func_override)
+    thread = Thread(target=rmr_loop.loop)
+    thread.start()
+    while not rmr_loop.rmr_is_ready():
+        time.sleep(0.5)
+    return rmr_loop  # return the handle; useful during unit testing
index 135d853..074f326 100644 (file)
@@ -128,39 +128,6 @@ def _clear_handlers(policy_type_id, policy_instance_id):
         SDL.delete(k)
 
 
-def _clean_up_type(policy_type_id):
-    """
-    for all instances of type, see if it can be deleted
-    """
-    type_is_valid(policy_type_id)
-    for policy_instance_id in _get_instance_list(policy_type_id):
-        # see if we can delete
-        vector = _get_statuses(policy_type_id, policy_instance_id)
-
-        """
-        TODO: not being able to delete if the list is [] is prolematic.
-        There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
-        However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
-
-        However, removing this constraint also leads to problems.
-        Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
-
-        This requires some thought to address.
-        For now we stick with the "less bad problem".
-        """
-        if vector != []:
-            all_deleted = True
-            for i in vector:
-                if i != "DELETED":
-                    all_deleted = False
-                    break  # have at least one not DELETED, do nothing
-
-            # blow away from a1 db
-            if all_deleted:
-                _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
-                SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
-
-
 # Types
 
 
@@ -238,7 +205,6 @@ def get_policy_instance(policy_type_id, policy_instance_id):
     """
     Retrieve a policy instance
     """
-    _clean_up_type(policy_type_id)
     instance_is_valid(policy_type_id, policy_instance_id)
     return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id))
 
@@ -247,7 +213,6 @@ def get_policy_instance_statuses(policy_type_id, policy_instance_id):
     """
     Retrieve the status vector for a policy instance
     """
-    _clean_up_type(policy_type_id)
     return _get_statuses(policy_type_id, policy_instance_id)
 
 
@@ -255,7 +220,6 @@ def get_instance_list(policy_type_id):
     """
     retrieve all instance ids for a type
     """
-    _clean_up_type(policy_type_id)
     return _get_instance_list(policy_type_id)
 
 
@@ -270,3 +234,37 @@ def set_status(policy_type_id, policy_instance_id, handler_id, status):
     type_is_valid(policy_type_id)
     instance_is_valid(policy_type_id, policy_instance_id)
     SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
+
+
+def clean_up_instance(policy_type_id, policy_instance_id):
+    """
+    see if we can delete an instance based on it's status
+    """
+    type_is_valid(policy_type_id)
+    instance_is_valid(policy_type_id, policy_instance_id)
+
+    """
+    TODO: not being able to delete if the list is [] is prolematic.
+    There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps
+    However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet*
+
+    However, removing this constraint also leads to problems.
+    Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely.
+
+    This requires some thought to address.
+    For now we stick with the "less bad problem".
+    """
+
+    vector = _get_statuses(policy_type_id, policy_instance_id)
+    if vector != []:
+        all_deleted = True
+        for i in vector:
+            if i != "DELETED":
+                all_deleted = False
+                break  # have at least one not DELETED, do nothing
+
+        # blow away from a1 db
+        if all_deleted:
+            _clear_handlers(policy_type_id, policy_instance_id)  # delete all the handlers
+            SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id))  # delete instance
+            logger.debug("type %s instance %s deleted", policy_type_id, policy_instance_id)
index 67f63d6..c15b644 100644 (file)
--- a/a1/run.py
+++ b/a1/run.py
@@ -17,8 +17,6 @@ A1 entrypoint
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-import time
-from threading import Thread
 from gevent.pywsgi import WSGIServer
 from a1 import get_module_logger, app
 from a1 import a1rmr
@@ -27,24 +25,11 @@ from a1 import a1rmr
 logger = get_module_logger(__name__)
 
 
-def start_rmr_thread(real_init=True):
-    """
-    Start a1s rmr thread
-    Also called during unit testing
-    """
-    rmr_loop = a1rmr.RmrLoop(real_init)
-    thread = Thread(target=rmr_loop.loop)
-    thread.start()
-    while not rmr_loop.rmr_is_ready():
-        time.sleep(0.5)
-    return rmr_loop  # return the handle; useful during unit testing
-
-
 def main():
     """Entrypoint"""
     # start rmr thread
     logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.")
-    start_rmr_thread()
+    a1rmr.start_rmr_thread()
 
     # start webserver
     logger.debug("Starting gevent server")
index efad040..554b483 100644 (file)
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
-tag: 1.0.2
+tag: 1.0.3
index 84662a5..ca92214 100644 (file)
@@ -43,18 +43,27 @@ This project follows semver. When changes are made, the versions are in:
 
 7) in the it/dep repo that contains a1 helm chart, ``values.yaml``, ``Chart.yml``
 
-Version bumping rmr-python
-==========================
-rmr-python is a critial dependency of A1. Bumping the rmr version dependency requires changes in:
 
-1) ``setup.py``
+Version bumping rmr
+====================
+rmr is a critical dependency of A1. Bumping the rmr version dependency requires changes in:
+
+1) ``Dockerfile``
 
-2) ``Dockerfile``
+2) ``Dockerfile-Unit-Test``
 
 3) ``integration_tests/Dockerfile``
 
+rmr-python is the python binding to rmr . Installing rmr per the above does not install it.
+Bumping the rmr python version dependency requires changes in:
+
+1) ``setup.py``
+
+2) ``integration_tests/Dockerfile``
+
 Run the integration tests after attempting this.
 
+
 Unit Testing
 ============
 Note,  before this will work, for the first time on the machine running the tests, run ``./install_deps.sh``. This is only needed once on the machine.
@@ -152,7 +161,7 @@ several env variables as follows:
    set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x TEST_RCV_PORT 4563; set -x TEST_RCV_RETURN_MINT 10001; set -x TEST_RCV_SEC_DELAY 5; set -x TEST_RCV_RETURN_PAYLOAD '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}' ; python receiver.py
 
 To test the async nature of A1, trigger a call to ``test_policy``, which
-will target the delayed receicer, then immediately call
+will target the delayed receiver, then immediately call
 ``control_admission``. The ``control_admission`` policy return should be
 returned immediately, whereas the ``test_policy`` should return after
 about ``TEST_RCV_SEC_DELAY 5``. The ``test_policy`` should not block A1
index a4af2b6..59676d0 100644 (file)
@@ -29,6 +29,16 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
 
     * Represents a resillent version of 1.0.0 that uses Redis for persistence
 
+[1.0.3] - 10/22/2019
+
+::
+
+    * Move database cleanup (e.g., deleting instances based on statuses) into the polling loop
+    * Rework how unit testing works with the polling loop; prior, exceptions were being thrown silently from the thread but not printed. The polling thread has now been paramaterized with override functions for the purposes of testing
+    * Make type cleanup more efficient since we know exactly what instances were touched, and it's inefficient to iterate over all instances if they were not
+    * Bump rmr-python version, and bump rmr version
+    * Still an item left to do in this work; refactor the thread slightly to tie in a healthcheck with a1s healthcheck. We need k8s to restart a1 if that thread dies too.
+
 [1.0.2] - 10/17/2019
 
 ::
index 3e8c874..e7bfe80 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.9.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
@@ -34,7 +34,7 @@ COPY receiver.py /
 
 # Install RMr python bindings
 RUN pip install --upgrade pip
-RUN pip install rmr==0.13.2
+RUN pip install rmr==0.13.5
 
 # rmr setups
 RUN mkdir -p /opt/route/
index 58e1e1f..d0c73be 100644 (file)
@@ -1,4 +1,4 @@
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
-version: 1.0.2
+version: 1.0.3
index 55a8937..b4ccd62 100644 (file)
@@ -189,6 +189,7 @@ stages:
 
   # DELETE the instance and make sure subsequent GETs return properly
   - name: delete the instance
+    delay_after: 3 # give it a few seconds for rmr
     request:
       url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
       method: DELETE
@@ -204,7 +205,6 @@ stages:
       body: []
 
   - name: cant get instance status
-    delay_before: 3 # give it a few seconds for rmr
     request:
       url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status
       method: GET
index 9894fb8..f8a2923 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@ from setuptools import setup, find_packages
 
 setup(
     name="a1",
-    version="1.0.2",
+    version="1.0.3",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
-    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.3"],
+    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.5"],
     package_data={"a1": ["openapi.yaml"]},
 )
index e5c3ac7..63831b0 100644 (file)
@@ -17,7 +17,7 @@
 
 import time
 from rmr.rmr_mocks import rmr_mocks
-from a1 import run
+from a1 import a1rmr
 
 
 ADM_CTRL = "admission_control_policy"
@@ -28,21 +28,19 @@ ADM_CTRL_TYPE = "/a1-p/policytypes/20000"
 TEST_TYPE = "/a1-p/policytypes/20001"
 
 
-def _fake_dequeue(_mrc, _filter_type):
+def _fake_dequeue():
     """for monkeypatching with a good status"""
-    fake_msg = {}
     pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
-    fake_msg["payload"] = pay
-    new_messages = [fake_msg]
-    return new_messages
+    fake_msg = {"payload": pay}
+    return [fake_msg]
 
 
-def _fake_dequeue_none(_mrc, _filter_type):
+def _fake_dequeue_none():
     """for monkeypatching with no waiting messages"""
     return []
 
 
-def _fake_dequeue_deleted(_mrc, _filter_type):
+def _fake_dequeue_deleted():
     """for monkeypatching  with a DELETED status"""
     new_msgs = []
 
@@ -55,6 +53,10 @@ def _fake_dequeue_deleted(_mrc, _filter_type):
     fake_msg = {"payload": pay}
     new_msgs.append(fake_msg)
 
+    # insert a bad one with a malformed body to make sure we keep going
+    fake_msg = {"payload": "asdf"}
+    new_msgs.append(fake_msg)
+
     pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}'
     fake_msg = {"payload": pay}
     new_msgs.append(fake_msg)
@@ -98,15 +100,19 @@ RMR_THREAD = None
 def setup_module():
     """module level setup"""
     global RMR_THREAD
-    RMR_THREAD = run.start_rmr_thread(real_init=False)
+
+    def noop():
+        pass
+
+    # launch the thread with a fake init func and a patched rcv func; we will "repatch" later
+    RMR_THREAD = a1rmr.start_rmr_thread(init_func_override=noop, rcv_func_override=_fake_dequeue_none)
 
 
 # Actual Tests
 
 
-def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_instance_good):
+def test_workflow_nothing_there_yet(client):
     """ test policy put good"""
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # no type there yet
     res = client.get(ADM_CTRL_TYPE)
     assert res.status_code == 404
@@ -125,7 +131,6 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     """
     test a full A1 workflow
     """
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # put the type
     res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
     assert res.status_code == 201
@@ -183,7 +188,7 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     get_instance_good("NOT IN EFFECT")
 
     # now pretend we did get a good ACK
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    RMR_THREAD._rcv_func = _fake_dequeue
     time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")
 
@@ -198,22 +203,20 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     assert res.status_code == 202
 
     # status after a delete, but there are no messages yet, should still return
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
     time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")
 
     # now pretend we deleted successfully
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_deleted)
+    RMR_THREAD._rcv_func = _fake_dequeue_deleted
     time.sleep(1)  # wait for the rmr thread
-    res = client.get(ADM_CTRL_INSTANCE_STATUS)  # cant get status
-    assert res.status_code == 404
-    res = client.get(ADM_CTRL_INSTANCE)  # cant get instance
-    assert res.status_code == 404
-
     # list still 200 but no instance
     res = client.get(ADM_CTRL_POLICIES)
     assert res.status_code == 200
     assert res.json == []
+    res = client.get(ADM_CTRL_INSTANCE_STATUS)  # cant get status
+    assert res.status_code == 404
+    res = client.get(ADM_CTRL_INSTANCE)  # cant get instance
+    assert res.status_code == 404
 
     # delete the type
     res = client.delete(ADM_CTRL_TYPE)
@@ -248,7 +251,7 @@ def test_bad_instances(client, monkeypatch, adm_type_good):
     assert res.status_code == 404
 
     # get a non existent instance
-    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    RMR_THREAD._rcv_func = _fake_dequeue
     time.sleep(1)
     res = client.get(ADM_CTRL_INSTANCE + "DARKNESS")
     assert res.status_code == 404