A1 2.0.0: 07/1907/13 2.0.0
authorTommy Carpenter <tc677g@att.com>
Mon, 2 Dec 2019 16:02:01 +0000 (11:02 -0500)
committerTommy Carpenter <tc677g@att.com>
Mon, 9 Dec 2019 15:00:47 +0000 (10:00 -0500)
    * Implements new logic around when instances are deleted. See flowcharts in docs/. Basically timeouts now trigger to actually delete instances from a1s database, and these timeouts are configurable.
    * Eliminates the barrier to deleting an instance when no xapp evdr replied (via timeouts)
    * Add two new ENV variables that control timeouts
    * Make unit tests more modular so new workflows can be tested easily
    * Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change.
    * Clean up unused items in the integration tests helm chart
    * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore
    * Uses the standard RIC logging library
    * Switch the backend routing scheme to using subscription id with constant message types, per request.
    * Given the above, policy type ids can be any valid 32bit greater than 0
    * Decouple the API between northbound and A1 from A1 with xapps. This is now two seperate OpenAPI files
    * Update example for AC Xapp
    * Updgrade rmr and rmr-python to utilize new features; lots of cleanups because of that

Change-Id: Ie7812607244cbcc484fe14c60fe27371e7e65082
Signed-off-by: Tommy Carpenter <tc677g@att.com>
24 files changed:
Dockerfile
Dockerfile-Unit-Test
a1/a1rmr.py
a1/controller.py
a1/messages.py [new file with mode: 0644]
a1/openapi.yaml
container-tag.yaml
docs/developer-guide.rst
docs/release-notes.rst
integration_tests/Dockerfile-query-receiver [new file with mode: 0644]
integration_tests/Dockerfile-test-delay-receiver [moved from integration_tests/Dockerfile with 94% similarity]
integration_tests/a1mediator/Chart.yaml
integration_tests/getlogs.sh
integration_tests/install_deps.sh
integration_tests/query_tester.py [new file with mode: 0644]
integration_tests/test_a1.tavern.yaml
integration_tests/test_local.rt [deleted file]
integration_tests/testreceiver/templates/config.yaml
integration_tests/testreceiver/templates/deployment.yaml
integration_tests/testreceiver/templates/service.yaml
integration_tests/testreceiver/values.yaml
rmr-version.yaml
setup.py
tests/test_controller.py

index c387a33..2faaca5 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
     && cd rmr \
     && mkdir build \
     && cd build \
index b5d524c..dbb5401 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
     && cd rmr \
     && mkdir build \
     && cd build \
@@ -44,4 +44,4 @@ COPY setup.py tox.ini /tmp/
 WORKDIR /tmp
 
 # Run the unit tests
 WORKDIR /tmp
 
 # Run the unit tests
-RUN tox -e py37, flake8
+RUN tox -e py37,flake8
index ae2bf00..2e5dace 100644 (file)
@@ -24,7 +24,7 @@ import json
 from threading import Thread
 from rmr import rmr, helpers
 from mdclogpy import Logger
 from threading import Thread
 from rmr import rmr, helpers
 from mdclogpy import Logger
-from a1 import data
+from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 mdc_logger = Logger(name=__name__)
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 mdc_logger = Logger(name=__name__)
@@ -53,7 +53,7 @@ class _RmrLoop:
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
-        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
         # intialize rmr context
         if init_func_override:
 
         # intialize rmr context
         if init_func_override:
@@ -68,15 +68,52 @@ class _RmrLoop:
                 time.sleep(0.5)
 
         # set the receive function
                 time.sleep(0.5)
 
         # set the receive function
-        # TODO: when policy query is implemented, add A1_POLICY_QUERY
         self.rcv_func = (
         self.rcv_func = (
-            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+            rcv_func_override
+            if rcv_func_override
+            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
         )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
         self.thread.start()
 
         )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
         self.thread.start()
 
+    def _assert_good_send(self, sbuf, pre_send_summary):
+        """
+        common helper function for _send_msg and _rts_msg
+        """
+        post_send_summary = rmr.message_summary(sbuf)
+        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+            return True
+        mdc_logger.debug("Message NOT sent!")
+        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return False
+
+    def _send_msg(self, pay, mtype, subid):
+        """
+        sends a msg
+        """
+        for _ in range(0, RETRY_TIMES):
+            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+            sbuf.contents.sub_id = subid
+            pre_send_summary = rmr.message_summary(sbuf)
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
+            if self._assert_good_send(sbuf, pre_send_summary):
+                rmr.rmr_free_msg(sbuf)  # free
+                break
+
+    def _rts_msg(self, pay, sbuf_rts, mtype):
+        """
+        sends a message using rts
+        we do not call free here because we may rts many times; it is called after the rts loop
+        """
+        for _ in range(0, RETRY_TIMES):
+            pre_send_summary = rmr.message_summary(sbuf_rts)
+            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+            if self._assert_good_send(sbuf_rts, pre_send_summary):
+                break
+        return sbuf_rts  # in some cases rts may return a new sbuf
+
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
@@ -89,38 +126,52 @@ class _RmrLoop:
         while self.keep_going:
 
             # send out all messages waiting for us
         while self.keep_going:
 
             # send out all messages waiting for us
-            while not self.work_queue.empty():
-                work_item = self.work_queue.get(block=False, timeout=None)
-
-                pay = work_item["payload"].encode("utf-8")
-                for _ in range(0, RETRY_TIMES):
-                    # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
-                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
-                    # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
-                    sbuf.contents.sub_id = work_item["ptid"]
-                    pre_send_summary = rmr.message_summary(sbuf)
-                    sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
-                    post_send_summary = rmr.message_summary(sbuf)
-                    mdc_logger.debug(
-                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
-                    )
-                    rmr.rmr_free_msg(sbuf)  # free
-                    if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-                        mdc_logger.debug("Message sent successfully!")
-                        break
-
-            # read our mailbox and update statuses
-            for msg in self.rcv_func():
+            while not self.instance_send_queue.empty():
+                work_item = self.instance_send_queue.get(block=False, timeout=None)
+                payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+                self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
+            # read our mailbox
+            for (msg, sbuf) in self.rcv_func():
+                # TODO: in the future we may also have to catch SDL errors
                 try:
                 try:
-                    pay = json.loads(msg["payload"])
-                    pti = pay["policy_type_id"]
-                    pii = pay["policy_instance_id"]
-                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
-                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
-                    # TODO: in the future we may also have to catch SDL errors
-                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
-
-            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+                    mtype = msg["message type"]
+                except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+
+                if mtype == A1_POLICY_RESPONSE:
+                    try:
+                        # got a policy response, update status
+                        pay = json.loads(msg["payload"])
+                        data.set_policy_instance_status(
+                            pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+                        )
+                        mdc_logger.debug("Successfully received status update: {0}".format(pay))
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a response  for a non-existent instance")
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+
+                elif mtype == A1_POLICY_QUERY:
+                    try:
+                        # got a query, do a lookup and send out all instances
+                        pti = json.loads(msg["payload"])["policy_type_id"]
+                        mdc_logger.debug("Received query for: {0}".format(pti))
+                        for pii in data.get_instance_list(pti):
+                            instance = data.get_policy_instance(pti, pii)
+                            payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+                            sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+
+                else:
+                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+
+                # we must free each sbuf
+                rmr.rmr_free_msg(sbuf)
+
             self.last_ran = time.time()
             time.sleep(1)
 
             self.last_ran = time.time()
             time.sleep(1)
 
@@ -144,12 +195,12 @@ def stop_rmr_thread():
     __RMR_LOOP__.keep_going = False
 
 
     __RMR_LOOP__.keep_going = False
 
 
-def queue_work(item):
+def queue_instance_send(item):
     """
     push an item into the work queue
     currently the only type of work is to send out messages
     """
     """
     push an item into the work queue
     currently the only type of work is to send out messages
     """
-    __RMR_LOOP__.work_queue.put(item)
+    __RMR_LOOP__.instance_send_queue.put(item)
 
 
 def healthcheck_rmr_thread(seconds=30):
 
 
 def healthcheck_rmr_thread(seconds=30):
index 1022320..cde7483 100644 (file)
@@ -17,7 +17,6 @@ Main a1 controller
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-import json
 from flask import Response
 from jsonschema import validate
 from jsonschema.exceptions import ValidationError
 from flask import Response
 from jsonschema import validate
 from jsonschema.exceptions import ValidationError
@@ -45,18 +44,6 @@ def _try_func_return(func):
         return Response(status=500)
 
 
         return Response(status=500)
 
 
-def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
-    """
-    used to create the payloads that get sent to downstream policy handlers
-    """
-    return {
-        "operation": operation,
-        "policy_type_id": policy_type_id,
-        "policy_instance_id": policy_instance_id,
-        "payload": payload,
-    }
-
-
 # Healthcheck
 
 
 # Healthcheck
 
 
@@ -164,9 +151,8 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
         # store the instance
         data.store_policy_instance(policy_type_id, policy_instance_id, instance)
 
         # store the instance
         data.store_policy_instance(policy_type_id, policy_instance_id, instance)
 
-        # send rmr (best effort)
-        body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
-        a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id})
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance))
 
         return "", 202
 
 
         return "", 202
 
@@ -185,9 +171,8 @@ def delete_policy_instance(policy_type_id, policy_instance_id):
         """
         data.delete_policy_instance(policy_type_id, policy_instance_id)
 
         """
         data.delete_policy_instance(policy_type_id, policy_instance_id)
 
-        # send rmr (best effort)
-        body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
-        a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id})
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, ""))
 
         return "", 202
 
 
         return "", 202
 
diff --git a/a1/messages.py b/a1/messages.py
new file mode 100644 (file)
index 0000000..cbcea08
--- /dev/null
@@ -0,0 +1,31 @@
+"""
+rmr messages
+"""
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+
+
+def a1_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
+    """
+    used to create the payloads that get sent to downstream policy handlers
+    """
+    return {
+        "operation": operation,
+        "policy_type_id": policy_type_id,
+        "policy_instance_id": policy_instance_id,
+        "payload": payload,
+    }
index c8c0bd9..f35a742 100644 (file)
@@ -16,7 +16,7 @@
 # ==================================================================================
 openapi: 3.0.0
 info:
 # ==================================================================================
 openapi: 3.0.0
 info:
-  version: 1.1.0
+  version: 2.0.0
   title: RIC A1
 paths:
   '/a1-p/healthcheck':
   title: RIC A1
 paths:
   '/a1-p/healthcheck':
index 2bb40a9..34c7e0c 100644 (file)
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
-tag: 1.0.4
+tag: 2.0.0
index 38722da..e3e47c5 100644 (file)
@@ -42,17 +42,22 @@ rmr is a critical dependency of A1. Bumping the rmr version dependency requires
 
 2) ``Dockerfile-Unit-Test``
 
 
 2) ``Dockerfile-Unit-Test``
 
-3) ``integration_tests/Dockerfile``
+3) ``integration_tests/Dockerfile-test-delay-receiver``
+
+4) ``integration_tests/Dockerfile-query-receiver``
+
+5) ``rmr-version.yaml``
 
 rmr-python is the python binding to rmr . Installing rmr per the above does not install it.
 Bumping the rmr python version dependency requires changes in:
 
 1) ``setup.py``
 
 
 rmr-python is the python binding to rmr . Installing rmr per the above does not install it.
 Bumping the rmr python version dependency requires changes in:
 
 1) ``setup.py``
 
-2) ``integration_tests/Dockerfile``
+2) ``integration_tests/Dockerfile-test-delay-receiver``
 
 
-Run the integration tests after attempting this.
+3) ``integration_tests/Dockerfile-query-receiver``
 
 
+Run the integration tests after attempting this.
 
 Unit Testing
 ------------
 
 Unit Testing
 ------------
@@ -72,94 +77,24 @@ Alternatively, you can run the unit tests in Docker (this is somewhat less nice
 
 Integration testing
 -------------------
 
 Integration testing
 -------------------
-This tests A1’s external API with two test receivers. This depends on helm+k8s, meaning you cannot run this if this is not installed.
+This tests A1’s external API with three test receivers. This depends on helm+k8s.
 
 
-Unlike the unit tests, however, this does not require rmr to be installed on the base system, as everything
-runs in Docker, and the Dockerfiles provide/install rmr.
+Build all the containers:
 
 
-First, build the latest A1 you are testing (from the root):
 ::
 
 ::
 
-    docker build  --no-cache -t a1:latest .
+    docker build  -t a1:latest .; cd integration_tests/; docker build  -t testreceiver:latest . -f Dockerfile-test-delay-receiver; docker build -t queryreceiver:latest . -f Dockerfile-query-receiver; cd ..
 
 
-Note that this step also runs the unit tests, since running the unit tests are part of the Dockerfile for A1.
 
 
-If you've never run the integration tests before, build the test receiver, which is referenced in the helm chart:
-::
-
-    cd integration_tests
-    docker build  --no-cache -t testreceiver:latest .
+Then, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``).
 
 
-Finally, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``).
 ::
 
    tox -c tox-integration.ini
 
 This script:
 ::
 
    tox -c tox-integration.ini
 
 This script:
-1. Deploys 3 helm charts into a local kubernetes installation
+1. Deploys 2 helm charts (4 containers) into a local kubernetes installation
 2. Port forwards a pod ClusterIP to localhost
 3. Uses “tavern” to run some tests against the server
 4. Barrages the server with apache bench
 5. Tears everything down
 2. Port forwards a pod ClusterIP to localhost
 3. Uses “tavern” to run some tests against the server
 4. Barrages the server with apache bench
 5. Tears everything down
-
-Unless you're a core A1 developer, you should probably stop here. The below instructions
-are for running A1 locally, without docker, and is much more involved (however useful when developing a1).
-
-Running locally
----------------
-
-1. Before this will work, for the first time on that machine, run ``./install_deps.sh``
-
-2. It also requires rmr-python installed. (The dockerfile does this)
-
-3. Create a ``local.rt`` file and copy it into ``/opt/route/local.rt``.
-   Note, the example one in ``integration_tests`` will need to be modified for
-   your scenario and machine.
-
-4. Copy a ric manifest into ``/opt/ricmanifest.json`` and an rmr mapping
-   table into ``/opt/rmr_string_int_mapping.txt``. You can use the test
-   ones packaged if you want:
-
-   ::
-
-     cp tests/fixtures/ricmanifest.json /opt/ricmanifest.json
-     cp tests/fixtures/rmr_string_int_mapping.txt /opt/rmr_string_int_mapping.txt
-
-5. Then:
-
-   ::
-
-     sudo pip install -e .
-     set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x RMR_RCV_RETRY_INTERVAL 500; set -x RMR_RETRY_TIMES 10;
-     /usr/bin/run.py
-
-
-There are also two test receivers in ``integration_tests`` you can run locally.
-The first is meant to be used with the ``control_admission`` policy
-(that comes in test fixture ric manifest):
-
-::
-
-   set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; python receiver.py
-
-The second can be used against the ``test_policy`` policy to test the
-async nature of A1, and to test race conditions. You can start it with
-several env variables as follows:
-
-::
-
-   set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x TEST_RCV_PORT 4563; set -x TEST_RCV_RETURN_MINT 10001; set -x TEST_RCV_SEC_DELAY 5; set -x TEST_RCV_RETURN_PAYLOAD '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}' ; python receiver.py
-
-To test the async nature of A1, trigger a call to ``test_policy``, which
-will target the delayed receiver, then immediately call
-``control_admission``. The ``control_admission`` policy return should be
-returned immediately, whereas the ``test_policy`` should return after
-about ``TEST_RCV_SEC_DELAY 5``. The ``test_policy`` should not block A1
-while it is sleeping, and both responses should be correct.
-
-::
-
-   curl -v -X PUT -H "Content-Type: application/json" -d '{}' localhost:10000/ric/policies/test_policy
-   curl -v -X PUT -H "Content-Type: application/json" -d '{ "enforce":true, "window_length":10, "blocking_rate":20, "trigger_threshold":10 }' localhost:10000/ric/policies/admission_control_policy
-   curl -v localhost:10000/ric/policies/admission_control_policy
-   curl -v localhost:10000/a1-p/healthcheck
index 485f5a8..002be1e 100644 (file)
@@ -14,16 +14,16 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
    :depth: 3
    :local:
 
    :depth: 3
    :local:
 
-[1.x.x] - TBD
+[x.x.x] - TBD
 -------------
 
 ::
 
 -------------
 
 ::
 
-    * Represents a resillent version of 1.0.0 that uses Redis for persistence
+    * Represents a resillent version of x.x.x that uses Redis for persistence
 
 
 
 
-[x.x.x] - TBD
--------------
+[2.0.0] - 12/9/2019
+-------------------
 
 ::
 
 
 ::
 
@@ -31,7 +31,7 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
     * Eliminates the barrier to deleting an instance when no xapp evdr replied (via timeouts)
     * Add two new ENV variables that control timeouts
     * Make unit tests more modular so new workflows can be tested easily
     * Eliminates the barrier to deleting an instance when no xapp evdr replied (via timeouts)
     * Add two new ENV variables that control timeouts
     * Make unit tests more modular so new workflows can be tested easily
-    * Changes the API for ../status to return a richer structure
+    * Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change.
     * Clean up unused items in the integration tests helm chart
     * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore
     * Uses the standard RIC logging library
     * Clean up unused items in the integration tests helm chart
     * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore
     * Uses the standard RIC logging library
@@ -39,6 +39,7 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
     * Given the above, policy type ids can be any valid 32bit greater than 0
     * Decouple the API between northbound and A1 from A1 with xapps. This is now two seperate OpenAPI files
     * Update example for AC Xapp
     * Given the above, policy type ids can be any valid 32bit greater than 0
     * Decouple the API between northbound and A1 from A1 with xapps. This is now two seperate OpenAPI files
     * Update example for AC Xapp
+    * Updgrade rmr and rmr-python to utilize new features; lots of cleanups because of that
 
 [1.0.4]
 -------
 
 [1.0.4]
 -------
diff --git a/integration_tests/Dockerfile-query-receiver b/integration_tests/Dockerfile-query-receiver
new file mode 100644 (file)
index 0000000..3c5ac28
--- /dev/null
@@ -0,0 +1,45 @@
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+# install a well known working rmr
+FROM python:3.7-alpine
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+    && cd rmr \
+    && mkdir build \
+    && cd build \
+    && cmake .. -DPACK_EXTERNALS=1 \
+    && make install
+
+# stage2
+FROM python:3.7-alpine
+
+# copies
+COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
+COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
+COPY query_tester.py /
+
+# Install RMr python bindings
+RUN pip install --upgrade pip
+RUN pip install rmr==2.2.0
+
+# rmr setups
+RUN mkdir -p /opt/route/
+ENV LD_LIBRARY_PATH /usr/local/lib:/usr/local/lib64
+ENV RMR_SEED_RT /opt/route/local.rt
+
+WORKDIR /
+CMD ["python","-u","query_tester.py"]
similarity index 94%
rename from integration_tests/Dockerfile
rename to integration_tests/Dockerfile-test-delay-receiver
index 87a8dce..4d09d1b 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
     && cd rmr \
     && mkdir build \
     && cd build \
@@ -34,7 +34,7 @@ COPY receiver.py /
 
 # Install RMr python bindings
 RUN pip install --upgrade pip
 
 # Install RMr python bindings
 RUN pip install --upgrade pip
-RUN pip install rmr==1.0.0
+RUN pip install rmr==2.2.0
 
 # rmr setups
 RUN mkdir -p /opt/route/
 
 # rmr setups
 RUN mkdir -p /opt/route/
index e59dc91..28d5951 100644 (file)
@@ -1,4 +1,4 @@
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
-version: 1.0.4
+version: 2.0.0
index 668ab2b..b611b85 100755 (executable)
@@ -1,4 +1,12 @@
 #!/bin/sh
 #!/bin/sh
+echo "\n\n a1" >> log.txt
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^a1-a1mediator-' | xargs kubectl logs  > log.txt 2>&1
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^a1-a1mediator-' | xargs kubectl logs  > log.txt 2>&1
+
+echo "\n\n test receiver" >> log.txt
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X testreceiver  >> log.txt 2>&1
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X testreceiver  >> log.txt 2>&1
+
+echo "\n\n delay" >> log.txt
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X delayreceiver  >> log.txt 2>&1
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X delayreceiver  >> log.txt 2>&1
+
+echo "\n\n query" >> log.txt
+kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X queryreceiver  >> log.txt 2>&1
index 01c45b4..4a17c5d 100755 (executable)
@@ -1,5 +1,5 @@
 #!/bin/sh
 #!/bin/sh
-git clone --branch 1.9.0 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
+git clone --branch 1.10.2 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
     && cd ../.. \
     && cd rmr \
     && mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
     && cd ../.. \
diff --git a/integration_tests/query_tester.py b/integration_tests/query_tester.py
new file mode 100644 (file)
index 0000000..d8cf3ba
--- /dev/null
@@ -0,0 +1,84 @@
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+"""
+Test receiver
+"""
+
+import time
+import json
+from rmr import rmr
+
+PORT = "4564"
+
+mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+test_type = 1006001
+
+while rmr.rmr_ready(mrc) == 0:
+    time.sleep(1)
+    print("not yet ready")
+
+print("listening ON {}".format(PORT))
+
+# loop
+while True:
+
+    # do query
+    pay = {"policy_type_id": test_type}
+    sbuf_send = rmr.rmr_alloc_msg(mrc, 4096, payload=json.dumps(pay).encode("utf-8"), gen_transaction_id=True, mtype=20012)
+    sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
+    post_send_summary = rmr.message_summary(sbuf_send)
+
+    if not (post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK"):
+        print("was unable to send query to a1!")
+        time.sleep(1)
+    else:
+        # query worked, wait 2 seconds, then receive everything we have
+        time.sleep(1)
+        print("reading messages")
+
+        # this is a hacked up version of rmr_rcvall_msgs in the rmr package
+        # we need the actual messages, not the summaries, to use rts
+        sbuf_rcv = rmr.rmr_alloc_msg(mrc, 4096)  # allocate buffer to have something for a return status
+        while True:
+            sbuf_rcv = rmr.rmr_torcv_msg(mrc, sbuf_rcv, 0)  # set the timeout to 0 so this doesn't block!!
+
+            summary = rmr.message_summary(sbuf_rcv)
+            if summary["message status"] != "RMR_OK":  # ok indicates msg received, stop on all other states
+                print("no more instances received. will try again in 1s")
+                break
+
+            print("Received: {0}".format(summary))
+
+            received_payload = json.loads(summary["payload"])
+            assert received_payload["policy_type_id"] == test_type
+            assert summary["message type"] == 20010
+
+            payload = {
+                "policy_type_id": received_payload["policy_type_id"],
+                "policy_instance_id": received_payload["policy_instance_id"],
+                "handler_id": "query_tester",
+                "status": "OK",
+            }
+            val = json.dumps(payload).encode("utf-8")
+            rmr.set_payload_and_length(val, sbuf_rcv)  # TODO: extend rmr-python to allow rts to accept this param
+            sbuf_rcv.contents.mtype = 20011  # TODO: extend rmr-python to allow rts to accept this param
+            print("Pre reply summary: {}".format(rmr.message_summary(sbuf_rcv)))
+
+            # send ack
+            sbuf_rcv = rmr.rmr_rts_msg(mrc, sbuf_rcv)
+            post_reply_summary = rmr.message_summary(sbuf_rcv)
+            print("Post reply summary: {}".format(post_reply_summary))
index 4c1dea6..a025643 100644 (file)
@@ -464,6 +464,109 @@ stages:
 
 ---
 
 
 ---
 
+test_name: test query
+
+stages:
+  - name: type not there yet
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001
+      method: GET
+    response:
+      status_code: 404
+
+  - name: put the type
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001
+      method: PUT
+      json:
+        name: query test
+        description: test
+        policy_type_id: 1006001
+        create_schema:
+          "$schema": http://json-schema.org/draft-07/schema#
+          type: object
+          additionalProperties: false
+          properties:
+            foo:
+              type: string
+          required:
+            - foo
+    response:
+      status_code: 201
+
+  - name: type there now
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001
+      method: GET
+    response:
+      status_code: 200
+
+  - name: instance list 200 but empty
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies
+      method: GET
+    response:
+      status_code: 200
+      body: []
+
+  - name: instance 1
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1
+      method: PUT
+      json:
+        foo: "bar1"
+      headers:
+        content-type: application/json
+    response:
+      status_code: 202
+
+  - name: instance 2
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2
+      method: PUT
+      json:
+        foo: "bar2"
+      headers:
+        content-type: application/json
+    response:
+      status_code: 202
+
+  - name: instance list
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies
+      method: GET
+    response:
+      status_code: 200
+      body: [qt1, qt2]
+
+  # after the query, a1 should send, query receiver should send back, and the policy should be in effect
+
+  - name: test the query status get
+    max_retries: 3
+    delay_before: 6  # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1/status
+      method: GET
+    response:
+      status_code: 200
+      body:
+        instance_status: "IN EFFECT"
+        has_been_deleted: False
+
+  - name: test the query status get 2
+    max_retries: 3
+    delay_before: 6  # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2/status
+      method: GET
+    response:
+      status_code: 200
+      body:
+        instance_status: "IN EFFECT"
+        has_been_deleted: False
+
+---
+
 test_name: test bad routing file endpoint
 
 stages:
 test_name: test bad routing file endpoint
 
 stages:
@@ -554,5 +657,3 @@ stages:
           type: object
     response:
       status_code: 400
           type: object
     response:
       status_code: 400
-
-
diff --git a/integration_tests/test_local.rt b/integration_tests/test_local.rt
deleted file mode 100644 (file)
index 6e61fef..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-newrt|start
-mse|20010|6660666|devarchwork:4560
-mse|20010|20001|devarchwork:4563
-rte|20011|devarchwork:4562
-newrt|end
index caf4338..e6f4801 100644 (file)
@@ -18,5 +18,19 @@ metadata:
 data:
   local.rt: |
     newrt|start
 data:
   local.rt: |
     newrt|start
+    # we actaully use rts so i dont even think this is used
     rte|20011|a1rmrservice:4562
     newrt|end
     rte|20011|a1rmrservice:4562
     newrt|end
+
+---
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: queryreceiverconf
+data:
+  local.rt: |
+    newrt|start
+    # this query is initiated in the query receiver so this is certainly needed
+    rte|20012|a1rmrservice:4562
+    newrt|end
index 29be456..0a08fa3 100644 (file)
@@ -17,6 +17,22 @@ spec:
         app.kubernetes.io/instance: {{ .Release.Name }}
     spec:
       containers:
         app.kubernetes.io/instance: {{ .Release.Name }}
     spec:
       containers:
+        #query receiver
+        - name: queryreceiver
+          image: queryreceiver:latest
+          imagePullPolicy: Never
+          resources:
+            {{- toYaml .Values.resources | nindent 12 }}
+          volumeMounts:
+            - name: queryreceiverconf
+              mountPath: /opt/route/local.rt
+              subPath: local.rt
+          env:
+            # this sets the source field in messages from a1 to point back to a1s service name, rather than it's random pod name
+            - name: RMR_SRC_ID
+              value: {{ .Values.queryrmrservice.name }}
+
+        # test receiver
         - name: testreceiver
           image: testreceiver:latest
           imagePullPolicy: Never
         - name: testreceiver
           image: testreceiver:latest
           imagePullPolicy: Never
@@ -26,6 +42,8 @@ spec:
             - name: testreceiverconf
               mountPath: /opt/route/local.rt
               subPath: local.rt
             - name: testreceiverconf
               mountPath: /opt/route/local.rt
               subPath: local.rt
+
+        # test receiver that delays until sending
         - name: delayreceiver
           image: testreceiver:latest
           imagePullPolicy: Never
         - name: delayreceiver
           image: testreceiver:latest
           imagePullPolicy: Never
@@ -50,3 +68,6 @@ spec:
         - name: "delayreceiverconf"
           configMap:
             name: "delayreceiverconf"
         - name: "delayreceiverconf"
           configMap:
             name: "delayreceiverconf"
+        - name: "queryreceiverconf"
+          configMap:
+            name: "queryreceiverconf"
index 1df7654..ecb90a0 100644 (file)
@@ -31,3 +31,21 @@ spec:
   selector:
     app.kubernetes.io/name: {{ include "testreceiver.name" . }}
     app.kubernetes.io/instance: {{ .Release.Name }}
   selector:
     app.kubernetes.io/name: {{ include "testreceiver.name" . }}
     app.kubernetes.io/instance: {{ .Release.Name }}
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ .Values.queryrmrservice.name }}
+  labels:
+{{ include "testreceiver.labels" . | indent 4 }}
+spec:
+  type: {{ .Values.queryrmrservice.type }}
+  ports:
+    - port: {{ .Values.queryrmrservice.port }}
+      targetPort: {{ .Values.queryrmrservice.port }}
+      protocol: TCP
+  selector:
+    app.kubernetes.io/name: {{ include "testreceiver.name" . }}
+    app.kubernetes.io/instance: {{ .Release.Name }}
index fcf6536..d53b7c7 100644 (file)
@@ -13,3 +13,8 @@ delayrmrservice:
   name: delayreceiverrmrservice
   type: ClusterIP
   port: 4563
   name: delayreceiverrmrservice
   type: ClusterIP
   port: 4563
+
+queryrmrservice:
+  name: queryreceiverrmrservice
+  type: ClusterIP
+  port: 4564
index e79b81c..0d9c301 100644 (file)
@@ -1,3 +1,3 @@
 # CI script installs RMR from PackageCloud using this version
 ---
 # CI script installs RMR from PackageCloud using this version
 ---
-version: 1.8.1
+version: 1.10.2
index d20a50f..c2dc3f6 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@ from setuptools import setup, find_packages
 
 setup(
     name="a1",
 
 setup(
     name="a1",
-    version="1.0.4",
+    version="2.0.0",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
-    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=1.0.0", "mdclogpy"],
+    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=2.2.0", "mdclogpy"],
     package_data={"a1": ["openapi.yaml"]},
 )
     package_data={"a1": ["openapi.yaml"]},
 )
index 971ce3f..971d547 100644 (file)
@@ -30,6 +30,7 @@ ADM_CTRL_POLICIES = "/a1-p/policytypes/{0}/policies".format(ADM_CRTL_TID)
 ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL_IID
 ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
 ADM_CTRL_TYPE = "/a1-p/policytypes/{0}".format(ADM_CRTL_TID)
 ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL_IID
 ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
 ADM_CTRL_TYPE = "/a1-p/policytypes/{0}".format(ADM_CRTL_TID)
+ACK_MT = 20011
 
 
 def _fake_dequeue():
 
 
 def _fake_dequeue():
@@ -37,8 +38,8 @@ def _fake_dequeue():
     pay = json.dumps(
         {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "OK"}
     ).encode()
     pay = json.dumps(
         {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "OK"}
     ).encode()
-    fake_msg = {"payload": pay}
-    return [fake_msg]
+    fake_msg = {"payload": pay, "message type": ACK_MT}
+    return [(fake_msg, None)]
 
 
 def _fake_dequeue_none():
 
 
 def _fake_dequeue_none():
@@ -49,31 +50,37 @@ def _fake_dequeue_none():
 def _fake_dequeue_deleted():
     """for monkeypatching  with a DELETED status"""
     new_msgs = []
 def _fake_dequeue_deleted():
     """for monkeypatching  with a DELETED status"""
     new_msgs = []
+    good_pay = json.dumps(
+        {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
+    ).encode()
 
 
-    # non existent type
+    # non existent type id
     pay = json.dumps(
         {"policy_type_id": 911, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
     ).encode()
     pay = json.dumps(
         {"policy_type_id": 911, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
     ).encode()
-    fake_msg = {"payload": pay}
-    new_msgs.append(fake_msg)
+    fake_msg = {"payload": pay, "message type": ACK_MT}
+    new_msgs.append((fake_msg, None))
 
 
+    # bad instance id
     pay = json.dumps(
         {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": "darkness", "handler_id": RCV_ID, "status": "DELETED"}
     ).encode()
     pay = json.dumps(
         {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": "darkness", "handler_id": RCV_ID, "status": "DELETED"}
     ).encode()
-    fake_msg = {"payload": pay}
-    new_msgs.append(fake_msg)
+    fake_msg = {"payload": pay, "message type": ACK_MT}
+    new_msgs.append((fake_msg, None))
+
+    # good body but bad message type
+    fake_msg = {"payload": good_pay, "message type": ACK_MT * 3}
+    new_msgs.append((fake_msg, None))
 
     # insert a bad one with a malformed body to make sure we keep going
 
     # insert a bad one with a malformed body to make sure we keep going
-    new_msgs.append({"payload": "asdf"})
+    new_msgs.append(({"payload": "asdf", "message type": ACK_MT}, None))
 
     # not even a json
 
     # not even a json
-    new_msgs.append("asdf")
+    new_msgs.append(("asdf", None))
 
 
-    pay = json.dumps(
-        {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
-    ).encode()
-    fake_msg = {"payload": pay}
-    new_msgs.append(fake_msg)
+    # good
+    fake_msg = {"payload": good_pay, "message type": ACK_MT}
+    new_msgs.append((fake_msg, None))
 
     return new_msgs
 
 
     return new_msgs
 
@@ -83,27 +90,6 @@ def _test_put_patch(monkeypatch):
     # assert that rmr bad states don't cause problems
     monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
 
     # assert that rmr bad states don't cause problems
     monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
 
-    # we need this because free expects a real sbuf
-    # TODO: move this into rmr_mocks
-    def noop(_sbuf):
-        pass
-
-    monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop)
-
-    # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
-    def fake_alloc(_unused1, _unused2, _unused3, _unused4, _unused5):
-        sbuf = rmr_mocks.Rmr_mbuf_t()
-        sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
-        return sbuf
-
-    # we also need to repatch set, since in the send function, we alloc, then set a new transid
-    def fake_set_transactionid(sbuf):
-        sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
-
-    # Note, we could have just patched summary, but this patches at a "lower level" so is a better test
-    monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
-    monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
-
 
 def _no_ac(client):
     # no type there yet
 
 def _no_ac(client):
     # no type there yet