Threading pt 1 77/1177/5 1.0.2
authorTommy Carpenter <tc677g@att.com>
Thu, 17 Oct 2019 17:35:59 +0000 (13:35 -0400)
committerTommy Carpenter <tc677g@att.com>
Fri, 18 Oct 2019 15:41:35 +0000 (11:41 -0400)
* a1 now has a seperate, continuous polling thread
  this will enable operations like database cleanup (based on ACKs) and external notifications in real time,
  rather than when the API is invoked
* all rmr send and receive operations are now in this thread
* introduces a thread safe job queue between the two threads
* Not done yet: database cleanups in the thread
* Bump rmr python version

Change-Id: I29ca7843ffe7497c84891920f8aee332ac676591
Signed-off-by: Tommy Carpenter <tc677g@att.com>
a1/a1rmr.py
a1/controller.py
a1/data.py
a1/run.py
container-tag.yaml
docs/release-notes.rst
integration_tests/a1mediator/Chart.yaml
setup.py
tests/conftest.py
tests/test_controller.py
tox.ini

index 51b5694..d6114bf 100644 (file)
 #   limitations under the License.
 # ==================================================================================
 import os
-import gevent
+import queue
+import time
+import json
 from rmr import rmr, helpers
 from a1 import get_module_logger
+from a1 import data
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 logger = get_module_logger(__name__)
 
 
 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
-MRC = None
 
+_SEND_QUEUE = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
-def init_rmr():
+
+def _init_rmr():
     """
-    called from run; not called for unit tests
+    init an rmr context
+    This gets monkeypatched out for unit testing
     """
-    global MRC
     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
     # internal ring of messages, and receive calls read from that
     # currently the size is 2048 messages, so this is fine for the foreseeable future
-    MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+    mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+
+    while rmr.rmr_ready(mrc) == 0:
+        time.sleep(0.5)
 
-    while rmr.rmr_ready(MRC) == 0:
-        gevent.sleep(1)
-        logger.debug("not yet ready")
+    return mrc
 
 
-def send(payload, message_type=0):
+def _send(mrc, payload, message_type=0):
     """
     Sends a message up to RETRY_TIMES
     If the message is sent successfully, it returns the transactionid
     Does nothing otherwise
     """
-    # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
-    sbuf = rmr.rmr_alloc_msg(MRC, 4096)
+    # TODO: investigate moving this below and allocating the space based on the payload size
+    sbuf = rmr.rmr_alloc_msg(mrc, 4096)
     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
 
     # retry RETRY_TIMES to send the message
@@ -63,7 +69,7 @@ def send(payload, message_type=0):
         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
 
         # send
-        sbuf = rmr.rmr_send_msg(MRC, sbuf)
+        sbuf = rmr.rmr_send_msg(mrc, sbuf)
         post_send_summary = rmr.message_summary(sbuf)
         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
 
@@ -80,8 +86,85 @@ def send(payload, message_type=0):
     return None
 
 
-def dequeue_all_waiting_messages(filter_type=[]):
+def _update_all_statuses(mrc):
+    """
+    get all waiting messages, and try to parse them as status updates
+    (currently, those are the only messages a1 should get, this may have to be revisited later)
+    """
+    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
+        try:
+            pay = json.loads(msg["payload"])
+            data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
+        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
+            logger.debug("Dropping malformed or non applicable message")
+            logger.debug(msg)
+
+
+# Public
+
+
+def queue_work(item):
+    """
+    push an item into the work queue
+    currently the only type of work is to send out messages
+    """
+    _SEND_QUEUE.put(item)
+
+
+class RmrLoop:
     """
-    dequeue all waiting rmr messages from rmr
+    class represents an rmr loop meant to be called as a longstanding separate thread
     """
-    return helpers.rmr_rcvall_msgs(MRC, filter_type)
+
+    def __init__(self, real_init=True):
+        self._rmr_is_ready = False
+        self._keep_going = True
+        self._real_init = real_init  # useful for unit testing to turn off initialization
+
+    def rmr_is_ready(self):
+        """returns whether rmr has been initialized"""
+        return self._rmr_is_ready
+
+    def stop(self):
+        """sets a flag for the loop to end"""
+        self._keep_going = False
+
+    def loop(self):
+        """
+        This loop runs in an a1 thread forever, and has 3 jobs:
+        - send out any messages that have to go out (create instance, delete instance)
+        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
+        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
+        """
+
+        # get a context
+        mrc = None
+        logger.debug("Waiting for rmr to initialize...")
+        if self._real_init:
+            mrc = _init_rmr()
+        self._rmr_is_ready = True
+        logger.debug("Rmr is ready")
+
+        # loop forever
+        logger.debug("Work loop starting")
+        while self._keep_going:
+            """
+            We never raise an exception here. Log and keep moving
+            Bugs will eventually be caught be examining logs.
+            """
+            try:
+                # First, send out all messages waiting for us
+                while not _SEND_QUEUE.empty():
+                    work_item = _SEND_QUEUE.get(block=False, timeout=None)
+                    _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
+
+                # Next, update all statuses waiting in a1s mailbox
+                _update_all_statuses(mrc)
+
+                # TODO: next body of work is to try to clean up the database for any updated statuses
+
+            except Exception as e:
+                logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
+                logger.exception(e)
+
+            time.sleep(1)
index b0ae31a..b3ce88e 100644 (file)
@@ -35,11 +35,9 @@ def _try_func_return(func):
     """
     try:
         return func()
-    except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType) as exc:
-        logger.exception(exc)
+    except (ValidationError, exceptions.PolicyTypeAlreadyExists, exceptions.CantDeleteNonEmptyType):
         return "", 400
-    except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound) as exc:
-        logger.exception(exc)
+    except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound):
         return "", 404
     except BaseException as exc:
         # catch all, should never happen...
@@ -170,7 +168,7 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
 
         # send rmr (best effort)
         body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
-        a1rmr.send(json.dumps(body), message_type=policy_type_id)
+        a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
 
         return "", 202
 
@@ -190,7 +188,7 @@ def delete_policy_instance(policy_type_id, policy_instance_id):
 
         # send rmr (best effort)
         body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
-        a1rmr.send(json.dumps(body), message_type=policy_type_id)
+        a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id})
 
         return "", 202
 
index 631d2c6..135d853 100644 (file)
@@ -23,11 +23,9 @@ Hopefully, the access functions are a good api so nothing else has to change whe
 For now, the database is in memory.
 We use dict data structures (KV) with the expectation of having to move this into Redis
 """
-import json
 import msgpack
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
 from a1 import get_module_logger
-from a1 import a1rmr
 
 logger = get_module_logger(__name__)
 
@@ -132,36 +130,9 @@ def _clear_handlers(policy_type_id, policy_instance_id):
 
 def _clean_up_type(policy_type_id):
     """
-    pop through a1s mailbox, updating a1s db of all policy statuses
     for all instances of type, see if it can be deleted
     """
     type_is_valid(policy_type_id)
-    for msg in a1rmr.dequeue_all_waiting_messages([21024]):
-        # try to parse the messages as responses. Drop those that are malformed
-        pay = json.loads(msg["payload"])
-        if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
-            # We don't use the parameters "policy_type_id, policy_instance" from above here,
-            # because we are popping the whole mailbox, which might include other statuses
-            pti = pay["policy_type_id"]
-            pii = pay["policy_instance_id"]
-
-            try:
-                """
-                can't raise an exception here e.g.:
-                because this is called on many functions; just drop bad status messages.
-                We def don't want bad messages that happen to hit a1s mailbox to blow up anything
-
-                """
-                type_is_valid(pti)
-                instance_is_valid(pti, pii)
-                SDL.set(_generate_handler_key(pti, pii, pay["handler_id"]), pay["status"])
-            except (PolicyTypeNotFound, PolicyInstanceNotFound):
-                pass
-
-        else:
-            logger.debug("Dropping message")
-            logger.debug(pay)
-
     for policy_instance_id in _get_instance_list(policy_type_id):
         # see if we can delete
         vector = _get_statuses(policy_type_id, policy_instance_id)
@@ -286,3 +257,16 @@ def get_instance_list(policy_type_id):
     """
     _clean_up_type(policy_type_id)
     return _get_instance_list(policy_type_id)
+
+
+# Statuses
+
+
+def set_status(policy_type_id, policy_instance_id, handler_id, status):
+    """
+    update the database status for a handler
+    called from a1's rmr thread
+    """
+    type_is_valid(policy_type_id)
+    instance_is_valid(policy_type_id, policy_instance_id)
+    SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status)
index 9eb59f5..67f63d6 100644 (file)
--- a/a1/run.py
+++ b/a1/run.py
@@ -1,3 +1,6 @@
+"""
+A1 entrypoint
+"""
 # ==================================================================================
 #       Copyright (c) 2019 Nokia
 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
+import time
+from threading import Thread
 from gevent.pywsgi import WSGIServer
 from a1 import get_module_logger, app
-from a1.a1rmr import init_rmr
+from a1 import a1rmr
 
 
 logger = get_module_logger(__name__)
 
 
+def start_rmr_thread(real_init=True):
+    """
+    Start a1s rmr thread
+    Also called during unit testing
+    """
+    rmr_loop = a1rmr.RmrLoop(real_init)
+    thread = Thread(target=rmr_loop.loop)
+    thread.start()
+    while not rmr_loop.rmr_is_ready():
+        time.sleep(0.5)
+    return rmr_loop  # return the handle; useful during unit testing
+
+
 def main():
     """Entrypoint"""
-    logger.debug("Initializing rmr")
-    init_rmr()
+    # start rmr thread
+    logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.")
+    start_rmr_thread()
+
+    # start webserver
     logger.debug("Starting gevent server")
     http_server = WSGIServer(("", 10000), app)
     http_server.serve_forever()
index 4072098..efad040 100644 (file)
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
-tag: 1.0.1
+tag: 1.0.2
index 2060a2a..a4af2b6 100644 (file)
@@ -29,7 +29,20 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
 
     * Represents a resillent version of 1.0.0 that uses Redis for persistence
 
-[1.0.1]
+[1.0.2] - 10/17/2019
+
+::
+
+    * a1 now has a seperate, continuous polling thread
+      this will enable operations like database cleanup (based on ACKs) and external notifications in real time,
+      rather than when the API is invoked
+    * all rmr send and receive operations are now in this thread
+    * introduces a thread safe job queue between the two threads
+    * Not done yet: database cleanups in the thread
+    * Bump rmr python version
+    * Clean up some logging
+
+[1.0.1] - 10/15/2019
 
 ::
 
index 792818d..58e1e1f 100644 (file)
@@ -1,4 +1,4 @@
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
-version: 1.0.1
+version: 1.0.2
index 9c242be..9894fb8 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@ from setuptools import setup, find_packages
 
 setup(
     name="a1",
-    version="1.0.1",
+    version="1.0.2",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
-    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.2"],
+    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.3"],
     package_data={"a1": ["openapi.yaml"]},
 )
index c39b962..73573d0 100644 (file)
@@ -1,8 +1,49 @@
+"""
+pytest conftest
+"""
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+import tempfile
+import os
 import pytest
+from a1 import app
+
+
+@pytest.fixture
+def client():
+    """
+    http://flask.pocoo.org/docs/1.0/testing/
+    """
+
+    db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
+    app.app.config["TESTING"] = True
+    cl = app.app.test_client()
+
+    yield cl
+
+    os.close(db_fd)
+    os.unlink(app.app.config["DATABASE"])
 
 
 @pytest.fixture
 def adm_type_good():
+    """
+    represents a good put for adm control type
+    """
     return {
         "name": "Admission Control",
         "description": "various parameters to control admission of dual connection",
@@ -41,4 +82,7 @@ def adm_type_good():
 
 @pytest.fixture
 def adm_instance_good():
+    """
+    represents a good put for adm control instance
+    """
     return {"enforce": True, "window_length": 10, "blocking_rate": 20, "trigger_threshold": 10}
index f4e0ae2..e5c3ac7 100644 (file)
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-import tempfile
-import os
 
+import time
 from rmr.rmr_mocks import rmr_mocks
-from a1 import app
-import pytest
+from a1 import run
 
 
 ADM_CTRL = "admission_control_policy"
@@ -30,23 +28,8 @@ ADM_CTRL_TYPE = "/a1-p/policytypes/20000"
 TEST_TYPE = "/a1-p/policytypes/20001"
 
 
-# http://flask.pocoo.org/docs/1.0/testing/
-@pytest.fixture
-def client():
-    db_fd, app.app.config["DATABASE"] = tempfile.mkstemp()
-    app.app.config["TESTING"] = True
-    cl = app.app.test_client()
-
-    yield cl
-
-    os.close(db_fd)
-    os.unlink(app.app.config["DATABASE"])
-
-
-def _fake_dequeue(_filter_type):
-    """
-    for monkeypatching a1rmnr.dequeue_all_messages with a good status
-    """
+def _fake_dequeue(_mrc, _filter_type):
+    """for monkeypatching with a good status"""
     fake_msg = {}
     pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
     fake_msg["payload"] = pay
@@ -54,17 +37,13 @@ def _fake_dequeue(_filter_type):
     return new_messages
 
 
-def _fake_dequeue_none(_filter_type):
-    """
-    for monkeypatching a1rmnr.dequeue_all_messages with no waiting messages
-    """
+def _fake_dequeue_none(_mrc, _filter_type):
+    """for monkeypatching with no waiting messages"""
     return []
 
 
-def _fake_dequeue_deleted(_filter_type):
-    """
-    for monkeypatching a1rmnr.dequeue_all_messages with a DELETED status
-    """
+def _fake_dequeue_deleted(_mrc, _filter_type):
+    """for monkeypatching  with a DELETED status"""
     new_msgs = []
 
     # insert some that don't exist to make sure nothing blows up
@@ -85,7 +64,8 @@ def _fake_dequeue_deleted(_filter_type):
 
 def _test_put_patch(monkeypatch):
     rmr_mocks.patch_rmr(monkeypatch)
-    monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0))  # good sends for this whole batch
+    # assert that rmr bad states don't cause problems
+    monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
 
     # we need this because free expects a real sbuf
     # TODO: move this into rmr_mocks
@@ -109,12 +89,24 @@ def _test_put_patch(monkeypatch):
     monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
 
 
+# Module level Hack
+
+
+RMR_THREAD = None
+
+
+def setup_module():
+    """module level setup"""
+    global RMR_THREAD
+    RMR_THREAD = run.start_rmr_thread(real_init=False)
+
+
 # Actual Tests
 
 
 def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_instance_good):
     """ test policy put good"""
-
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # no type there yet
     res = client.get(ADM_CTRL_TYPE)
     assert res.status_code == 404
@@ -125,12 +117,15 @@ def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_inst
     assert res.json == []
 
     # instance 404 because type not there yet
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
     res = client.get(ADM_CTRL_POLICIES)
     assert res.status_code == 404
 
 
 def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
+    """
+    test a full A1 workflow
+    """
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_none)
     # put the type
     res = client.put(ADM_CTRL_TYPE, json=adm_type_good)
     assert res.status_code == 201
@@ -148,7 +143,6 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     assert res.json == [20000]
 
     # instance 200 but empty list
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
     res = client.get(ADM_CTRL_POLICIES)
     assert res.status_code == 200
     assert res.json == []
@@ -161,8 +155,6 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
 
     # create a good instance
     _test_put_patch(monkeypatch)
-    # assert that rmr bad states don't cause problems
-    monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
     res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
     assert res.status_code == 202
 
@@ -186,12 +178,13 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
         assert res.status_code == 200
         assert res.get_data(as_text=True) == expected
 
-    # try a status get but pretend we didn't get any ACKs yet to test NOT IN EFFECT
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
+    # try a status get but we didn't get any ACKs yet to test NOT IN EFFECT
+    time.sleep(1)  # wait for the rmr thread
     get_instance_good("NOT IN EFFECT")
 
     # now pretend we did get a good ACK
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")
 
     # cant delete type until there are no instances
@@ -205,11 +198,13 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good):
     assert res.status_code == 202
 
     # status after a delete, but there are no messages yet, should still return
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    time.sleep(1)  # wait for the rmr thread
     get_instance_good("IN EFFECT")
 
     # now pretend we deleted successfully
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_deleted)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue_deleted)
+    time.sleep(1)  # wait for the rmr thread
     res = client.get(ADM_CTRL_INSTANCE_STATUS)  # cant get status
     assert res.status_code == 404
     res = client.get(ADM_CTRL_INSTANCE)  # cant get instance
@@ -253,7 +248,8 @@ def test_bad_instances(client, monkeypatch, adm_type_good):
     assert res.status_code == 404
 
     # get a non existent instance
-    monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+    monkeypatch.setattr("rmr.helpers.rmr_rcvall_msgs", _fake_dequeue)
+    time.sleep(1)
     res = client.get(ADM_CTRL_INSTANCE + "DARKNESS")
     assert res.status_code == 404
 
@@ -278,3 +274,8 @@ def test_healthcheck(client):
     """
     res = client.get("/a1-p/healthcheck")
     assert res.status_code == 200
+
+
+def teardown_module():
+    """module teardown"""
+    RMR_THREAD.stop()
diff --git a/tox.ini b/tox.ini
index 70389b6..8a47c3c 100644 (file)
--- a/tox.ini
+++ b/tox.ini
@@ -29,6 +29,7 @@ setenv =
 
 # Note, before this will work, for the first time on that machine, run ./install_deps.sh
 commands =
+# sometimes the -s flag is helpful; add -s after pytest; which streams the logs as they come in, rather than saving them all for the end of tests
     pytest --junitxml xunit-results.xml --cov a1 --cov-report xml --cov-report term-missing --cov-report html --cov-fail-under=70
     coverage xml -i