A1 2.0.0: 07/1907/13 2.0.0
authorTommy Carpenter <tc677g@att.com>
Mon, 2 Dec 2019 16:02:01 +0000 (11:02 -0500)
committerTommy Carpenter <tc677g@att.com>
Mon, 9 Dec 2019 15:00:47 +0000 (10:00 -0500)
    * Implements new logic around when instances are deleted. See flowcharts in docs/. Basically timeouts now trigger to actually delete instances from a1s database, and these timeouts are configurable.
    * Eliminates the barrier to deleting an instance when no xapp evdr replied (via timeouts)
    * Add two new ENV variables that control timeouts
    * Make unit tests more modular so new workflows can be tested easily
    * Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change.
    * Clean up unused items in the integration tests helm chart
    * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore
    * Uses the standard RIC logging library
    * Switch the backend routing scheme to using subscription id with constant message types, per request.
    * Given the above, policy type ids can be any valid 32bit greater than 0
    * Decouple the API between northbound and A1 from A1 with xapps. This is now two seperate OpenAPI files
    * Update example for AC Xapp
    * Updgrade rmr and rmr-python to utilize new features; lots of cleanups because of that

Change-Id: Ie7812607244cbcc484fe14c60fe27371e7e65082
Signed-off-by: Tommy Carpenter <tc677g@att.com>
24 files changed:
Dockerfile
Dockerfile-Unit-Test
a1/a1rmr.py
a1/controller.py
a1/messages.py [new file with mode: 0644]
a1/openapi.yaml
container-tag.yaml
docs/developer-guide.rst
docs/release-notes.rst
integration_tests/Dockerfile-query-receiver [new file with mode: 0644]
integration_tests/Dockerfile-test-delay-receiver [moved from integration_tests/Dockerfile with 94% similarity]
integration_tests/a1mediator/Chart.yaml
integration_tests/getlogs.sh
integration_tests/install_deps.sh
integration_tests/query_tester.py [new file with mode: 0644]
integration_tests/test_a1.tavern.yaml
integration_tests/test_local.rt [deleted file]
integration_tests/testreceiver/templates/config.yaml
integration_tests/testreceiver/templates/deployment.yaml
integration_tests/testreceiver/templates/service.yaml
integration_tests/testreceiver/values.yaml
rmr-version.yaml
setup.py
tests/test_controller.py

index c387a33..2faaca5 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
index b5d524c..dbb5401 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
@@ -44,4 +44,4 @@ COPY setup.py tox.ini /tmp/
 WORKDIR /tmp
 
 # Run the unit tests
-RUN tox -e py37, flake8
+RUN tox -e py37,flake8
index ae2bf00..2e5dace 100644 (file)
@@ -24,7 +24,7 @@ import json
 from threading import Thread
 from rmr import rmr, helpers
 from mdclogpy import Logger
-from a1 import data
+from a1 import data, messages
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
 mdc_logger = Logger(name=__name__)
@@ -53,7 +53,7 @@ class _RmrLoop:
         self.keep_going = True
         self.rcv_func = None
         self.last_ran = time.time()
-        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
+        self.instance_send_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
 
         # intialize rmr context
         if init_func_override:
@@ -68,15 +68,52 @@ class _RmrLoop:
                 time.sleep(0.5)
 
         # set the receive function
-        # TODO: when policy query is implemented, add A1_POLICY_QUERY
         self.rcv_func = (
-            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+            rcv_func_override
+            if rcv_func_override
+            else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
         )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
         self.thread.start()
 
+    def _assert_good_send(self, sbuf, pre_send_summary):
+        """
+        common helper function for _send_msg and _rts_msg
+        """
+        post_send_summary = rmr.message_summary(sbuf)
+        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+            return True
+        mdc_logger.debug("Message NOT sent!")
+        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return False
+
+    def _send_msg(self, pay, mtype, subid):
+        """
+        sends a msg
+        """
+        for _ in range(0, RETRY_TIMES):
+            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+            sbuf.contents.sub_id = subid
+            pre_send_summary = rmr.message_summary(sbuf)
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
+            if self._assert_good_send(sbuf, pre_send_summary):
+                rmr.rmr_free_msg(sbuf)  # free
+                break
+
+    def _rts_msg(self, pay, sbuf_rts, mtype):
+        """
+        sends a message using rts
+        we do not call free here because we may rts many times; it is called after the rts loop
+        """
+        for _ in range(0, RETRY_TIMES):
+            pre_send_summary = rmr.message_summary(sbuf_rts)
+            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+            if self._assert_good_send(sbuf_rts, pre_send_summary):
+                break
+        return sbuf_rts  # in some cases rts may return a new sbuf
+
     def loop(self):
         """
         This loop runs forever, and has 3 jobs:
@@ -89,38 +126,52 @@ class _RmrLoop:
         while self.keep_going:
 
             # send out all messages waiting for us
-            while not self.work_queue.empty():
-                work_item = self.work_queue.get(block=False, timeout=None)
-
-                pay = work_item["payload"].encode("utf-8")
-                for _ in range(0, RETRY_TIMES):
-                    # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
-                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
-                    # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
-                    sbuf.contents.sub_id = work_item["ptid"]
-                    pre_send_summary = rmr.message_summary(sbuf)
-                    sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
-                    post_send_summary = rmr.message_summary(sbuf)
-                    mdc_logger.debug(
-                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
-                    )
-                    rmr.rmr_free_msg(sbuf)  # free
-                    if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-                        mdc_logger.debug("Message sent successfully!")
-                        break
-
-            # read our mailbox and update statuses
-            for msg in self.rcv_func():
+            while not self.instance_send_queue.empty():
+                work_item = self.instance_send_queue.get(block=False, timeout=None)
+                payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+                self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
+            # read our mailbox
+            for (msg, sbuf) in self.rcv_func():
+                # TODO: in the future we may also have to catch SDL errors
                 try:
-                    pay = json.loads(msg["payload"])
-                    pti = pay["policy_type_id"]
-                    pii = pay["policy_instance_id"]
-                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
-                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
-                    # TODO: in the future we may also have to catch SDL errors
-                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
-
-            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+                    mtype = msg["message type"]
+                except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                    mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+
+                if mtype == A1_POLICY_RESPONSE:
+                    try:
+                        # got a policy response, update status
+                        pay = json.loads(msg["payload"])
+                        data.set_policy_instance_status(
+                            pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+                        )
+                        mdc_logger.debug("Successfully received status update: {0}".format(pay))
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a response  for a non-existent instance")
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+
+                elif mtype == A1_POLICY_QUERY:
+                    try:
+                        # got a query, do a lookup and send out all instances
+                        pti = json.loads(msg["payload"])["policy_type_id"]
+                        mdc_logger.debug("Received query for: {0}".format(pti))
+                        for pii in data.get_instance_list(pti):
+                            instance = data.get_policy_instance(pti, pii)
+                            payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+                            sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+                    except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                        mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+                    except (KeyError, TypeError, json.decoder.JSONDecodeError):
+                        mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+
+                else:
+                    mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+
+                # we must free each sbuf
+                rmr.rmr_free_msg(sbuf)
+
             self.last_ran = time.time()
             time.sleep(1)
 
@@ -144,12 +195,12 @@ def stop_rmr_thread():
     __RMR_LOOP__.keep_going = False
 
 
-def queue_work(item):
+def queue_instance_send(item):
     """
     push an item into the work queue
     currently the only type of work is to send out messages
     """
-    __RMR_LOOP__.work_queue.put(item)
+    __RMR_LOOP__.instance_send_queue.put(item)
 
 
 def healthcheck_rmr_thread(seconds=30):
index 1022320..cde7483 100644 (file)
@@ -17,7 +17,6 @@ Main a1 controller
 #   See the License for the specific language governing permissions and
 #   limitations under the License.
 # ==================================================================================
-import json
 from flask import Response
 from jsonschema import validate
 from jsonschema.exceptions import ValidationError
@@ -45,18 +44,6 @@ def _try_func_return(func):
         return Response(status=500)
 
 
-def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
-    """
-    used to create the payloads that get sent to downstream policy handlers
-    """
-    return {
-        "operation": operation,
-        "policy_type_id": policy_type_id,
-        "policy_instance_id": policy_instance_id,
-        "payload": payload,
-    }
-
-
 # Healthcheck
 
 
@@ -164,9 +151,8 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
         # store the instance
         data.store_policy_instance(policy_type_id, policy_instance_id, instance)
 
-        # send rmr (best effort)
-        body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
-        a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id})
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance))
 
         return "", 202
 
@@ -185,9 +171,8 @@ def delete_policy_instance(policy_type_id, policy_instance_id):
         """
         data.delete_policy_instance(policy_type_id, policy_instance_id)
 
-        # send rmr (best effort)
-        body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
-        a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id})
+        # queue rmr send (best effort)
+        a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, ""))
 
         return "", 202
 
diff --git a/a1/messages.py b/a1/messages.py
new file mode 100644 (file)
index 0000000..cbcea08
--- /dev/null
@@ -0,0 +1,31 @@
+"""
+rmr messages
+"""
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+
+
+def a1_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
+    """
+    used to create the payloads that get sent to downstream policy handlers
+    """
+    return {
+        "operation": operation,
+        "policy_type_id": policy_type_id,
+        "policy_instance_id": policy_instance_id,
+        "payload": payload,
+    }
index c8c0bd9..f35a742 100644 (file)
@@ -16,7 +16,7 @@
 # ==================================================================================
 openapi: 3.0.0
 info:
-  version: 1.1.0
+  version: 2.0.0
   title: RIC A1
 paths:
   '/a1-p/healthcheck':
index 2bb40a9..34c7e0c 100644 (file)
@@ -1,4 +1,4 @@
 # The Jenkins job uses this string for the tag in the image name
 # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
 ---
-tag: 1.0.4
+tag: 2.0.0
index 38722da..e3e47c5 100644 (file)
@@ -42,17 +42,22 @@ rmr is a critical dependency of A1. Bumping the rmr version dependency requires
 
 2) ``Dockerfile-Unit-Test``
 
-3) ``integration_tests/Dockerfile``
+3) ``integration_tests/Dockerfile-test-delay-receiver``
+
+4) ``integration_tests/Dockerfile-query-receiver``
+
+5) ``rmr-version.yaml``
 
 rmr-python is the python binding to rmr . Installing rmr per the above does not install it.
 Bumping the rmr python version dependency requires changes in:
 
 1) ``setup.py``
 
-2) ``integration_tests/Dockerfile``
+2) ``integration_tests/Dockerfile-test-delay-receiver``
 
-Run the integration tests after attempting this.
+3) ``integration_tests/Dockerfile-query-receiver``
 
+Run the integration tests after attempting this.
 
 Unit Testing
 ------------
@@ -72,94 +77,24 @@ Alternatively, you can run the unit tests in Docker (this is somewhat less nice
 
 Integration testing
 -------------------
-This tests A1’s external API with two test receivers. This depends on helm+k8s, meaning you cannot run this if this is not installed.
+This tests A1’s external API with three test receivers. This depends on helm+k8s.
 
-Unlike the unit tests, however, this does not require rmr to be installed on the base system, as everything
-runs in Docker, and the Dockerfiles provide/install rmr.
+Build all the containers:
 
-First, build the latest A1 you are testing (from the root):
 ::
 
-    docker build  --no-cache -t a1:latest .
+    docker build  -t a1:latest .; cd integration_tests/; docker build  -t testreceiver:latest . -f Dockerfile-test-delay-receiver; docker build -t queryreceiver:latest . -f Dockerfile-query-receiver; cd ..
 
-Note that this step also runs the unit tests, since running the unit tests are part of the Dockerfile for A1.
 
-If you've never run the integration tests before, build the test receiver, which is referenced in the helm chart:
-::
-
-    cd integration_tests
-    docker build  --no-cache -t testreceiver:latest .
+Then, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``).
 
-Finally, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``).
 ::
 
    tox -c tox-integration.ini
 
 This script:
-1. Deploys 3 helm charts into a local kubernetes installation
+1. Deploys 2 helm charts (4 containers) into a local kubernetes installation
 2. Port forwards a pod ClusterIP to localhost
 3. Uses “tavern” to run some tests against the server
 4. Barrages the server with apache bench
 5. Tears everything down
-
-Unless you're a core A1 developer, you should probably stop here. The below instructions
-are for running A1 locally, without docker, and is much more involved (however useful when developing a1).
-
-Running locally
----------------
-
-1. Before this will work, for the first time on that machine, run ``./install_deps.sh``
-
-2. It also requires rmr-python installed. (The dockerfile does this)
-
-3. Create a ``local.rt`` file and copy it into ``/opt/route/local.rt``.
-   Note, the example one in ``integration_tests`` will need to be modified for
-   your scenario and machine.
-
-4. Copy a ric manifest into ``/opt/ricmanifest.json`` and an rmr mapping
-   table into ``/opt/rmr_string_int_mapping.txt``. You can use the test
-   ones packaged if you want:
-
-   ::
-
-     cp tests/fixtures/ricmanifest.json /opt/ricmanifest.json
-     cp tests/fixtures/rmr_string_int_mapping.txt /opt/rmr_string_int_mapping.txt
-
-5. Then:
-
-   ::
-
-     sudo pip install -e .
-     set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x RMR_RCV_RETRY_INTERVAL 500; set -x RMR_RETRY_TIMES 10;
-     /usr/bin/run.py
-
-
-There are also two test receivers in ``integration_tests`` you can run locally.
-The first is meant to be used with the ``control_admission`` policy
-(that comes in test fixture ric manifest):
-
-::
-
-   set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; python receiver.py
-
-The second can be used against the ``test_policy`` policy to test the
-async nature of A1, and to test race conditions. You can start it with
-several env variables as follows:
-
-::
-
-   set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x TEST_RCV_PORT 4563; set -x TEST_RCV_RETURN_MINT 10001; set -x TEST_RCV_SEC_DELAY 5; set -x TEST_RCV_RETURN_PAYLOAD '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}' ; python receiver.py
-
-To test the async nature of A1, trigger a call to ``test_policy``, which
-will target the delayed receiver, then immediately call
-``control_admission``. The ``control_admission`` policy return should be
-returned immediately, whereas the ``test_policy`` should return after
-about ``TEST_RCV_SEC_DELAY 5``. The ``test_policy`` should not block A1
-while it is sleeping, and both responses should be correct.
-
-::
-
-   curl -v -X PUT -H "Content-Type: application/json" -d '{}' localhost:10000/ric/policies/test_policy
-   curl -v -X PUT -H "Content-Type: application/json" -d '{ "enforce":true, "window_length":10, "blocking_rate":20, "trigger_threshold":10 }' localhost:10000/ric/policies/admission_control_policy
-   curl -v localhost:10000/ric/policies/admission_control_policy
-   curl -v localhost:10000/a1-p/healthcheck
index 485f5a8..002be1e 100644 (file)
@@ -14,16 +14,16 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
    :depth: 3
    :local:
 
-[1.x.x] - TBD
+[x.x.x] - TBD
 -------------
 
 ::
 
-    * Represents a resillent version of 1.0.0 that uses Redis for persistence
+    * Represents a resillent version of x.x.x that uses Redis for persistence
 
 
-[x.x.x] - TBD
--------------
+[2.0.0] - 12/9/2019
+-------------------
 
 ::
 
@@ -31,7 +31,7 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
     * Eliminates the barrier to deleting an instance when no xapp evdr replied (via timeouts)
     * Add two new ENV variables that control timeouts
     * Make unit tests more modular so new workflows can be tested easily
-    * Changes the API for ../status to return a richer structure
+    * Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change.
     * Clean up unused items in the integration tests helm chart
     * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore
     * Uses the standard RIC logging library
@@ -39,6 +39,7 @@ and this project adheres to `Semantic Versioning <http://semver.org/>`__.
     * Given the above, policy type ids can be any valid 32bit greater than 0
     * Decouple the API between northbound and A1 from A1 with xapps. This is now two seperate OpenAPI files
     * Update example for AC Xapp
+    * Updgrade rmr and rmr-python to utilize new features; lots of cleanups because of that
 
 [1.0.4]
 -------
diff --git a/integration_tests/Dockerfile-query-receiver b/integration_tests/Dockerfile-query-receiver
new file mode 100644 (file)
index 0000000..3c5ac28
--- /dev/null
@@ -0,0 +1,45 @@
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+# install a well known working rmr
+FROM python:3.7-alpine
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+    && cd rmr \
+    && mkdir build \
+    && cd build \
+    && cmake .. -DPACK_EXTERNALS=1 \
+    && make install
+
+# stage2
+FROM python:3.7-alpine
+
+# copies
+COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
+COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
+COPY query_tester.py /
+
+# Install RMr python bindings
+RUN pip install --upgrade pip
+RUN pip install rmr==2.2.0
+
+# rmr setups
+RUN mkdir -p /opt/route/
+ENV LD_LIBRARY_PATH /usr/local/lib:/usr/local/lib64
+ENV RMR_SEED_RT /opt/route/local.rt
+
+WORKDIR /
+CMD ["python","-u","query_tester.py"]
similarity index 94%
rename from integration_tests/Dockerfile
rename to integration_tests/Dockerfile-test-delay-receiver
index 87a8dce..4d09d1b 100644 (file)
@@ -17,7 +17,7 @@
 # install a well known working rmr
 FROM python:3.7-alpine
 RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir build \
     && cd build \
@@ -34,7 +34,7 @@ COPY receiver.py /
 
 # Install RMr python bindings
 RUN pip install --upgrade pip
-RUN pip install rmr==1.0.0
+RUN pip install rmr==2.2.0
 
 # rmr setups
 RUN mkdir -p /opt/route/
index e59dc91..28d5951 100644 (file)
@@ -1,4 +1,4 @@
 apiVersion: v1
 description: A1 Helm chart for Kubernetes
 name: a1mediator
-version: 1.0.4
+version: 2.0.0
index 668ab2b..b611b85 100755 (executable)
@@ -1,4 +1,12 @@
 #!/bin/sh
+echo "\n\n a1" >> log.txt
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^a1-a1mediator-' | xargs kubectl logs  > log.txt 2>&1
+
+echo "\n\n test receiver" >> log.txt
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X testreceiver  >> log.txt 2>&1
+
+echo "\n\n delay" >> log.txt
 kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X delayreceiver  >> log.txt 2>&1
+
+echo "\n\n query" >> log.txt
+kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X queryreceiver  >> log.txt 2>&1
index 01c45b4..4a17c5d 100755 (executable)
@@ -1,5 +1,5 @@
 #!/bin/sh
-git clone --branch 1.9.0 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
+git clone --branch 1.10.2 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
     && cd rmr \
     && mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
     && cd ../.. \
diff --git a/integration_tests/query_tester.py b/integration_tests/query_tester.py
new file mode 100644 (file)
index 0000000..d8cf3ba
--- /dev/null
@@ -0,0 +1,84 @@
+# ==================================================================================
+#       Copyright (c) 2019 Nokia
+#       Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#          http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+# ==================================================================================
+"""
+Test receiver
+"""
+
+import time
+import json
+from rmr import rmr
+
+PORT = "4564"
+
+mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+test_type = 1006001
+
+while rmr.rmr_ready(mrc) == 0:
+    time.sleep(1)
+    print("not yet ready")
+
+print("listening ON {}".format(PORT))
+
+# loop
+while True:
+
+    # do query
+    pay = {"policy_type_id": test_type}
+    sbuf_send = rmr.rmr_alloc_msg(mrc, 4096, payload=json.dumps(pay).encode("utf-8"), gen_transaction_id=True, mtype=20012)
+    sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
+    post_send_summary = rmr.message_summary(sbuf_send)
+
+    if not (post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK"):
+        print("was unable to send query to a1!")
+        time.sleep(1)
+    else:
+        # query worked, wait 2 seconds, then receive everything we have
+        time.sleep(1)
+        print("reading messages")
+
+        # this is a hacked up version of rmr_rcvall_msgs in the rmr package
+        # we need the actual messages, not the summaries, to use rts
+        sbuf_rcv = rmr.rmr_alloc_msg(mrc, 4096)  # allocate buffer to have something for a return status
+        while True:
+            sbuf_rcv = rmr.rmr_torcv_msg(mrc, sbuf_rcv, 0)  # set the timeout to 0 so this doesn't block!!
+
+            summary = rmr.message_summary(sbuf_rcv)
+            if summary["message status"] != "RMR_OK":  # ok indicates msg received, stop on all other states
+                print("no more instances received. will try again in 1s")
+                break
+
+            print("Received: {0}".format(summary))
+
+            received_payload = json.loads(summary["payload"])
+            assert received_payload["policy_type_id"] == test_type
+            assert summary["message type"] == 20010
+
+            payload = {
+                "policy_type_id": received_payload["policy_type_id"],
+                "policy_instance_id": received_payload["policy_instance_id"],
+                "handler_id": "query_tester",
+                "status": "OK",
+            }
+            val = json.dumps(payload).encode("utf-8")
+            rmr.set_payload_and_length(val, sbuf_rcv)  # TODO: extend rmr-python to allow rts to accept this param
+            sbuf_rcv.contents.mtype = 20011  # TODO: extend rmr-python to allow rts to accept this param
+            print("Pre reply summary: {}".format(rmr.message_summary(sbuf_rcv)))
+
+            # send ack
+            sbuf_rcv = rmr.rmr_rts_msg(mrc, sbuf_rcv)
+            post_reply_summary = rmr.message_summary(sbuf_rcv)
+            print("Post reply summary: {}".format(post_reply_summary))
index 4c1dea6..a025643 100644 (file)
@@ -464,6 +464,109 @@ stages:
 
 ---
 
+test_name: test query
+
+stages:
+  - name: type not there yet
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001
+      method: GET
+    response:
+      status_code: 404
+
+  - name: put the type
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001
+      method: PUT
+      json:
+        name: query test
+        description: test
+        policy_type_id: 1006001
+        create_schema:
+          "$schema": http://json-schema.org/draft-07/schema#
+          type: object
+          additionalProperties: false
+          properties:
+            foo:
+              type: string
+          required:
+            - foo
+    response:
+      status_code: 201
+
+  - name: type there now
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001
+      method: GET
+    response:
+      status_code: 200
+
+  - name: instance list 200 but empty
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies
+      method: GET
+    response:
+      status_code: 200
+      body: []
+
+  - name: instance 1
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1
+      method: PUT
+      json:
+        foo: "bar1"
+      headers:
+        content-type: application/json
+    response:
+      status_code: 202
+
+  - name: instance 2
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2
+      method: PUT
+      json:
+        foo: "bar2"
+      headers:
+        content-type: application/json
+    response:
+      status_code: 202
+
+  - name: instance list
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies
+      method: GET
+    response:
+      status_code: 200
+      body: [qt1, qt2]
+
+  # after the query, a1 should send, query receiver should send back, and the policy should be in effect
+
+  - name: test the query status get
+    max_retries: 3
+    delay_before: 6  # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1/status
+      method: GET
+    response:
+      status_code: 200
+      body:
+        instance_status: "IN EFFECT"
+        has_been_deleted: False
+
+  - name: test the query status get 2
+    max_retries: 3
+    delay_before: 6  # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default
+    request:
+      url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2/status
+      method: GET
+    response:
+      status_code: 200
+      body:
+        instance_status: "IN EFFECT"
+        has_been_deleted: False
+
+---
+
 test_name: test bad routing file endpoint
 
 stages:
@@ -554,5 +657,3 @@ stages:
           type: object
     response:
       status_code: 400
-
-
diff --git a/integration_tests/test_local.rt b/integration_tests/test_local.rt
deleted file mode 100644 (file)
index 6e61fef..0000000
+++ /dev/null
@@ -1,5 +0,0 @@
-newrt|start
-mse|20010|6660666|devarchwork:4560
-mse|20010|20001|devarchwork:4563
-rte|20011|devarchwork:4562
-newrt|end
index caf4338..e6f4801 100644 (file)
@@ -18,5 +18,19 @@ metadata:
 data:
   local.rt: |
     newrt|start
+    # we actaully use rts so i dont even think this is used
     rte|20011|a1rmrservice:4562
     newrt|end
+
+---
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: queryreceiverconf
+data:
+  local.rt: |
+    newrt|start
+    # this query is initiated in the query receiver so this is certainly needed
+    rte|20012|a1rmrservice:4562
+    newrt|end
index 29be456..0a08fa3 100644 (file)
@@ -17,6 +17,22 @@ spec:
         app.kubernetes.io/instance: {{ .Release.Name }}
     spec:
       containers:
+        #query receiver
+        - name: queryreceiver
+          image: queryreceiver:latest
+          imagePullPolicy: Never
+          resources:
+            {{- toYaml .Values.resources | nindent 12 }}
+          volumeMounts:
+            - name: queryreceiverconf
+              mountPath: /opt/route/local.rt
+              subPath: local.rt
+          env:
+            # this sets the source field in messages from a1 to point back to a1s service name, rather than it's random pod name
+            - name: RMR_SRC_ID
+              value: {{ .Values.queryrmrservice.name }}
+
+        # test receiver
         - name: testreceiver
           image: testreceiver:latest
           imagePullPolicy: Never
@@ -26,6 +42,8 @@ spec:
             - name: testreceiverconf
               mountPath: /opt/route/local.rt
               subPath: local.rt
+
+        # test receiver that delays until sending
         - name: delayreceiver
           image: testreceiver:latest
           imagePullPolicy: Never
@@ -50,3 +68,6 @@ spec:
         - name: "delayreceiverconf"
           configMap:
             name: "delayreceiverconf"
+        - name: "queryreceiverconf"
+          configMap:
+            name: "queryreceiverconf"
index 1df7654..ecb90a0 100644 (file)
@@ -31,3 +31,21 @@ spec:
   selector:
     app.kubernetes.io/name: {{ include "testreceiver.name" . }}
     app.kubernetes.io/instance: {{ .Release.Name }}
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+  name: {{ .Values.queryrmrservice.name }}
+  labels:
+{{ include "testreceiver.labels" . | indent 4 }}
+spec:
+  type: {{ .Values.queryrmrservice.type }}
+  ports:
+    - port: {{ .Values.queryrmrservice.port }}
+      targetPort: {{ .Values.queryrmrservice.port }}
+      protocol: TCP
+  selector:
+    app.kubernetes.io/name: {{ include "testreceiver.name" . }}
+    app.kubernetes.io/instance: {{ .Release.Name }}
index fcf6536..d53b7c7 100644 (file)
@@ -13,3 +13,8 @@ delayrmrservice:
   name: delayreceiverrmrservice
   type: ClusterIP
   port: 4563
+
+queryrmrservice:
+  name: queryreceiverrmrservice
+  type: ClusterIP
+  port: 4564
index e79b81c..0d9c301 100644 (file)
@@ -1,3 +1,3 @@
 # CI script installs RMR from PackageCloud using this version
 ---
-version: 1.8.1
+version: 1.10.2
index d20a50f..c2dc3f6 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -18,13 +18,13 @@ from setuptools import setup, find_packages
 
 setup(
     name="a1",
-    version="1.0.4",
+    version="2.0.0",
     packages=find_packages(exclude=["tests.*", "tests"]),
     author="Tommy Carpenter",
     description="RIC A1 Mediator for policy/intent changes",
     url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
     entry_points={"console_scripts": ["run.py=a1.run:main"]},
     # we require jsonschema, should be in that list, but connexion already requires a specific version of it
-    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=1.0.0", "mdclogpy"],
+    install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=2.2.0", "mdclogpy"],
     package_data={"a1": ["openapi.yaml"]},
 )
index 971ce3f..971d547 100644 (file)
@@ -30,6 +30,7 @@ ADM_CTRL_POLICIES = "/a1-p/policytypes/{0}/policies".format(ADM_CRTL_TID)
 ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL_IID
 ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
 ADM_CTRL_TYPE = "/a1-p/policytypes/{0}".format(ADM_CRTL_TID)
+ACK_MT = 20011
 
 
 def _fake_dequeue():
@@ -37,8 +38,8 @@ def _fake_dequeue():
     pay = json.dumps(
         {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "OK"}
     ).encode()
-    fake_msg = {"payload": pay}
-    return [fake_msg]
+    fake_msg = {"payload": pay, "message type": ACK_MT}
+    return [(fake_msg, None)]
 
 
 def _fake_dequeue_none():
@@ -49,31 +50,37 @@ def _fake_dequeue_none():
 def _fake_dequeue_deleted():
     """for monkeypatching  with a DELETED status"""
     new_msgs = []
+    good_pay = json.dumps(
+        {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
+    ).encode()
 
-    # non existent type
+    # non existent type id
     pay = json.dumps(
         {"policy_type_id": 911, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
     ).encode()
-    fake_msg = {"payload": pay}
-    new_msgs.append(fake_msg)
+    fake_msg = {"payload": pay, "message type": ACK_MT}
+    new_msgs.append((fake_msg, None))
 
+    # bad instance id
     pay = json.dumps(
         {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": "darkness", "handler_id": RCV_ID, "status": "DELETED"}
     ).encode()
-    fake_msg = {"payload": pay}
-    new_msgs.append(fake_msg)
+    fake_msg = {"payload": pay, "message type": ACK_MT}
+    new_msgs.append((fake_msg, None))
+
+    # good body but bad message type
+    fake_msg = {"payload": good_pay, "message type": ACK_MT * 3}
+    new_msgs.append((fake_msg, None))
 
     # insert a bad one with a malformed body to make sure we keep going
-    new_msgs.append({"payload": "asdf"})
+    new_msgs.append(({"payload": "asdf", "message type": ACK_MT}, None))
 
     # not even a json
-    new_msgs.append("asdf")
+    new_msgs.append(("asdf", None))
 
-    pay = json.dumps(
-        {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
-    ).encode()
-    fake_msg = {"payload": pay}
-    new_msgs.append(fake_msg)
+    # good
+    fake_msg = {"payload": good_pay, "message type": ACK_MT}
+    new_msgs.append((fake_msg, None))
 
     return new_msgs
 
@@ -83,27 +90,6 @@ def _test_put_patch(monkeypatch):
     # assert that rmr bad states don't cause problems
     monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
 
-    # we need this because free expects a real sbuf
-    # TODO: move this into rmr_mocks
-    def noop(_sbuf):
-        pass
-
-    monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop)
-
-    # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
-    def fake_alloc(_unused1, _unused2, _unused3, _unused4, _unused5):
-        sbuf = rmr_mocks.Rmr_mbuf_t()
-        sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
-        return sbuf
-
-    # we also need to repatch set, since in the send function, we alloc, then set a new transid
-    def fake_set_transactionid(sbuf):
-        sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
-
-    # Note, we could have just patched summary, but this patches at a "lower level" so is a better test
-    monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
-    monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
-
 
 def _no_ac(client):
     # no type there yet