From 9d5ad7161f64346c17e9c466bf1992e4257e9bde Mon Sep 17 00:00:00 2001 From: Tommy Carpenter Date: Mon, 2 Dec 2019 11:02:01 -0500 Subject: [PATCH] A1 2.0.0: * Implements new logic around when instances are deleted. See flowcharts in docs/. Basically timeouts now trigger to actually delete instances from A1's database, and these timeouts are configurable. * Eliminates the barrier to deleting an instance when no xapp ever replied (via timeouts) * Add two new ENV variables that control timeouts * Make unit tests more modular so new workflows can be tested easily * Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change. * Clean up unused items in the integration tests helm chart * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore * Uses the standard RIC logging library * Switch the backend routing scheme to using subscription id with constant message types, per request. * Given the above, policy type ids can be any valid 32bit greater than 0 * Decouple the API between northbound and A1 from A1 with xapps. 
This is now two separate OpenAPI files * Update example for AC Xapp * Upgrade rmr and rmr-python to utilize new features; lots of cleanups because of that Change-Id: Ie7812607244cbcc484fe14c60fe27371e7e65082 Signed-off-by: Tommy Carpenter --- Dockerfile | 2 +- Dockerfile-Unit-Test | 4 +- a1/a1rmr.py | 125 +++++++++++++++------ a1/controller.py | 23 +--- a1/messages.py | 31 +++++ a1/openapi.yaml | 2 +- container-tag.yaml | 2 +- docs/developer-guide.rst | 91 +++------------ docs/release-notes.rst | 11 +- integration_tests/Dockerfile-query-receiver | 45 ++++++++ .../{Dockerfile => Dockerfile-test-delay-receiver} | 4 +- integration_tests/a1mediator/Chart.yaml | 2 +- integration_tests/getlogs.sh | 8 ++ integration_tests/install_deps.sh | 2 +- integration_tests/query_tester.py | 84 ++++++++++++++ integration_tests/test_a1.tavern.yaml | 105 ++++++++++++++++- integration_tests/test_local.rt | 5 - .../testreceiver/templates/config.yaml | 14 +++ .../testreceiver/templates/deployment.yaml | 21 ++++ .../testreceiver/templates/service.yaml | 18 +++ integration_tests/testreceiver/values.yaml | 5 + rmr-version.yaml | 2 +- setup.py | 4 +- tests/test_controller.py | 56 ++++----- 24 files changed, 473 insertions(+), 193 deletions(-) create mode 100644 a1/messages.py create mode 100644 integration_tests/Dockerfile-query-receiver rename integration_tests/{Dockerfile => Dockerfile-test-delay-receiver} (94%) create mode 100644 integration_tests/query_tester.py delete mode 100644 integration_tests/test_local.rt diff --git a/Dockerfile b/Dockerfile index c387a33..2faaca5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ # install a well known working rmr FROM python:3.7-alpine RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git -RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \ +RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \ && cd rmr \ && mkdir build \ && cd build \ diff --git 
a/Dockerfile-Unit-Test b/Dockerfile-Unit-Test index b5d524c..dbb5401 100644 --- a/Dockerfile-Unit-Test +++ b/Dockerfile-Unit-Test @@ -17,7 +17,7 @@ # install a well known working rmr FROM python:3.7-alpine RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git -RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \ +RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \ && cd rmr \ && mkdir build \ && cd build \ @@ -44,4 +44,4 @@ COPY setup.py tox.ini /tmp/ WORKDIR /tmp # Run the unit tests -RUN tox -e py37, flake8 +RUN tox -e py37,flake8 diff --git a/a1/a1rmr.py b/a1/a1rmr.py index ae2bf00..2e5dace 100644 --- a/a1/a1rmr.py +++ b/a1/a1rmr.py @@ -24,7 +24,7 @@ import json from threading import Thread from rmr import rmr, helpers from mdclogpy import Logger -from a1 import data +from a1 import data, messages from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound mdc_logger = Logger(name=__name__) @@ -53,7 +53,7 @@ class _RmrLoop: self.keep_going = True self.rcv_func = None self.last_ran = time.time() - self.work_queue = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html + self.instance_send_queue = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html # intialize rmr context if init_func_override: @@ -68,15 +68,52 @@ class _RmrLoop: time.sleep(0.5) # set the receive function - # TODO: when policy query is implemented, add A1_POLICY_QUERY self.rcv_func = ( - rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE]) + rcv_func_override + if rcv_func_override + else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY]) ) # start the work loop self.thread = Thread(target=self.loop) self.thread.start() + def _assert_good_send(self, sbuf, pre_send_summary): + """ + common helper function for _send_msg and _rts_msg + """ + post_send_summary = 
rmr.message_summary(sbuf) + if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK": + return True + mdc_logger.debug("Message NOT sent!") + mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)) + return False + + def _send_msg(self, pay, mtype, subid): + """ + sends a msg + """ + for _ in range(0, RETRY_TIMES): + sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid) + sbuf.contents.sub_id = subid + pre_send_summary = rmr.message_summary(sbuf) + sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send + if self._assert_good_send(sbuf, pre_send_summary): + rmr.rmr_free_msg(sbuf) # free + break + + def _rts_msg(self, pay, sbuf_rts, mtype): + """ + sends a message using rts + we do not call free here because we may rts many times; it is called after the rts loop + """ + for _ in range(0, RETRY_TIMES): + pre_send_summary = rmr.message_summary(sbuf_rts) + sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype) + if self._assert_good_send(sbuf_rts, pre_send_summary): + break + return sbuf_rts # in some cases rts may return a new sbuf + def loop(self): """ This loop runs forever, and has 3 jobs: @@ -89,38 +126,52 @@ class _RmrLoop: while self.keep_going: # send out all messages waiting for us - while not self.work_queue.empty(): - work_item = self.work_queue.get(block=False, timeout=None) - - pay = work_item["payload"].encode("utf-8") - for _ in range(0, RETRY_TIMES): - # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490 - sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST) - # TODO: after next rmr is released, this can be done in the alloc call. 
but that's not avail in pypi yet - sbuf.contents.sub_id = work_item["ptid"] - pre_send_summary = rmr.message_summary(sbuf) - sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send - post_send_summary = rmr.message_summary(sbuf) - mdc_logger.debug( - "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary) - ) - rmr.rmr_free_msg(sbuf) # free - if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK": - mdc_logger.debug("Message sent successfully!") - break - - # read our mailbox and update statuses - for msg in self.rcv_func(): + while not self.instance_send_queue.empty(): + work_item = self.instance_send_queue.get(block=False, timeout=None) + payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8") + self._send_msg(payload, A1_POLICY_REQUEST, work_item[1]) + + # read our mailbox + for (msg, sbuf) in self.rcv_func(): + # TODO: in the future we may also have to catch SDL errors try: - pay = json.loads(msg["payload"]) - pti = pay["policy_type_id"] - pii = pay["policy_instance_id"] - data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"]) - except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError): - # TODO: in the future we may also have to catch SDL errors - mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg)) - - # TODO: what's a reasonable sleep time? 
we don't want to hammer redis too much, and a1 isn't a real time component + mtype = msg["message type"] + except (KeyError, TypeError, json.decoder.JSONDecodeError): + mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg)) + + if mtype == A1_POLICY_RESPONSE: + try: + # got a policy response, update status + pay = json.loads(msg["payload"]) + data.set_policy_instance_status( + pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"] + ) + mdc_logger.debug("Successfully received status update: {0}".format(pay)) + except (PolicyTypeNotFound, PolicyInstanceNotFound): + mdc_logger.debug("Received a response for a non-existent instance") + except (KeyError, TypeError, json.decoder.JSONDecodeError): + mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg)) + + elif mtype == A1_POLICY_QUERY: + try: + # got a query, do a lookup and send out all instances + pti = json.loads(msg["payload"])["policy_type_id"] + mdc_logger.debug("Received query for: {0}".format(pti)) + for pii in data.get_instance_list(pti): + instance = data.get_policy_instance(pti, pii) + payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8") + sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST) + except (PolicyTypeNotFound, PolicyInstanceNotFound): + mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg)) + except (KeyError, TypeError, json.decoder.JSONDecodeError): + mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg)) + + else: + mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype)) + + # we must free each sbuf + rmr.rmr_free_msg(sbuf) + self.last_ran = time.time() time.sleep(1) @@ -144,12 +195,12 @@ def stop_rmr_thread(): __RMR_LOOP__.keep_going = False -def queue_work(item): +def queue_instance_send(item): """ push an item into the work queue currently the only type of work is to send out messages """ - 
__RMR_LOOP__.work_queue.put(item) + __RMR_LOOP__.instance_send_queue.put(item) def healthcheck_rmr_thread(seconds=30): diff --git a/a1/controller.py b/a1/controller.py index 1022320..cde7483 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -17,7 +17,6 @@ Main a1 controller # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== -import json from flask import Response from jsonschema import validate from jsonschema.exceptions import ValidationError @@ -45,18 +44,6 @@ def _try_func_return(func): return Response(status=500) -def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload=None): - """ - used to create the payloads that get sent to downstream policy handlers - """ - return { - "operation": operation, - "policy_type_id": policy_type_id, - "policy_instance_id": policy_instance_id, - "payload": payload, - } - - # Healthcheck @@ -164,9 +151,8 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id): # store the instance data.store_policy_instance(policy_type_id, policy_instance_id, instance) - # send rmr (best effort) - body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance) - a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id}) + # queue rmr send (best effort) + a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance)) return "", 202 @@ -185,9 +171,8 @@ def delete_policy_instance(policy_type_id, policy_instance_id): """ data.delete_policy_instance(policy_type_id, policy_instance_id) - # send rmr (best effort) - body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id) - a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id}) + # queue rmr send (best effort) + a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, "")) return "", 202 diff --git 
a/a1/messages.py b/a1/messages.py new file mode 100644 index 0000000..cbcea08 --- /dev/null +++ b/a1/messages.py @@ -0,0 +1,31 @@ +""" +rmr messages +""" +# ================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== + + +def a1_to_handler(operation, policy_type_id, policy_instance_id, payload=None): + """ + used to create the payloads that get sent to downstream policy handlers + """ + return { + "operation": operation, + "policy_type_id": policy_type_id, + "policy_instance_id": policy_instance_id, + "payload": payload, + } diff --git a/a1/openapi.yaml b/a1/openapi.yaml index c8c0bd9..f35a742 100644 --- a/a1/openapi.yaml +++ b/a1/openapi.yaml @@ -16,7 +16,7 @@ # ================================================================================== openapi: 3.0.0 info: - version: 1.1.0 + version: 2.0.0 title: RIC A1 paths: '/a1-p/healthcheck': diff --git a/container-tag.yaml b/container-tag.yaml index 2bb40a9..34c7e0c 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -1,4 +1,4 @@ # The Jenkins job uses this string for the tag in the image name # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag --- -tag: 1.0.4 +tag: 2.0.0 diff --git a/docs/developer-guide.rst b/docs/developer-guide.rst index 38722da..e3e47c5 100644 --- 
a/docs/developer-guide.rst +++ b/docs/developer-guide.rst @@ -42,17 +42,22 @@ rmr is a critical dependency of A1. Bumping the rmr version dependency requires 2) ``Dockerfile-Unit-Test`` -3) ``integration_tests/Dockerfile`` +3) ``integration_tests/Dockerfile-test-delay-receiver`` + +4) ``integration_tests/Dockerfile-query-receiver`` + +5) ``rmr-version.yaml`` rmr-python is the python binding to rmr . Installing rmr per the above does not install it. Bumping the rmr python version dependency requires changes in: 1) ``setup.py`` -2) ``integration_tests/Dockerfile`` +2) ``integration_tests/Dockerfile-test-delay-receiver`` -Run the integration tests after attempting this. +3) ``integration_tests/Dockerfile-query-receiver`` +Run the integration tests after attempting this. Unit Testing ------------ @@ -72,94 +77,24 @@ Alternatively, you can run the unit tests in Docker (this is somewhat less nice Integration testing ------------------- -This tests A1’s external API with two test receivers. This depends on helm+k8s, meaning you cannot run this if this is not installed. +This tests A1’s external API with three test receivers. This depends on helm+k8s. -Unlike the unit tests, however, this does not require rmr to be installed on the base system, as everything -runs in Docker, and the Dockerfiles provide/install rmr. +Build all the containers: -First, build the latest A1 you are testing (from the root): :: - docker build --no-cache -t a1:latest . + docker build -t a1:latest .; cd integration_tests/; docker build -t testreceiver:latest . -f Dockerfile-test-delay-receiver; docker build -t queryreceiver:latest . -f Dockerfile-query-receiver; cd .. -Note that this step also runs the unit tests, since running the unit tests are part of the Dockerfile for A1. -If you've never run the integration tests before, build the test receiver, which is referenced in the helm chart: -:: - - cd integration_tests - docker build --no-cache -t testreceiver:latest . 
+Then, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``). -Finally, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``). :: tox -c tox-integration.ini This script: -1. Deploys 3 helm charts into a local kubernetes installation +1. Deploys 2 helm charts (4 containers) into a local kubernetes installation 2. Port forwards a pod ClusterIP to localhost 3. Uses “tavern” to run some tests against the server 4. Barrages the server with apache bench 5. Tears everything down - -Unless you're a core A1 developer, you should probably stop here. The below instructions -are for running A1 locally, without docker, and is much more involved (however useful when developing a1). - -Running locally ---------------- - -1. Before this will work, for the first time on that machine, run ``./install_deps.sh`` - -2. It also requires rmr-python installed. (The dockerfile does this) - -3. Create a ``local.rt`` file and copy it into ``/opt/route/local.rt``. - Note, the example one in ``integration_tests`` will need to be modified for - your scenario and machine. - -4. Copy a ric manifest into ``/opt/ricmanifest.json`` and an rmr mapping - table into ``/opt/rmr_string_int_mapping.txt``. You can use the test - ones packaged if you want: - - :: - - cp tests/fixtures/ricmanifest.json /opt/ricmanifest.json - cp tests/fixtures/rmr_string_int_mapping.txt /opt/rmr_string_int_mapping.txt - -5. Then: - - :: - - sudo pip install -e . - set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x RMR_RCV_RETRY_INTERVAL 500; set -x RMR_RETRY_TIMES 10; - /usr/bin/run.py - - -There are also two test receivers in ``integration_tests`` you can run locally. 
-The first is meant to be used with the ``control_admission`` policy -(that comes in test fixture ric manifest): - -:: - - set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; python receiver.py - -The second can be used against the ``test_policy`` policy to test the -async nature of A1, and to test race conditions. You can start it with -several env variables as follows: - -:: - - set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x TEST_RCV_PORT 4563; set -x TEST_RCV_RETURN_MINT 10001; set -x TEST_RCV_SEC_DELAY 5; set -x TEST_RCV_RETURN_PAYLOAD '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}' ; python receiver.py - -To test the async nature of A1, trigger a call to ``test_policy``, which -will target the delayed receiver, then immediately call -``control_admission``. The ``control_admission`` policy return should be -returned immediately, whereas the ``test_policy`` should return after -about ``TEST_RCV_SEC_DELAY 5``. The ``test_policy`` should not block A1 -while it is sleeping, and both responses should be correct. - -:: - - curl -v -X PUT -H "Content-Type: application/json" -d '{}' localhost:10000/ric/policies/test_policy - curl -v -X PUT -H "Content-Type: application/json" -d '{ "enforce":true, "window_length":10, "blocking_rate":20, "trigger_threshold":10 }' localhost:10000/ric/policies/admission_control_policy - curl -v localhost:10000/ric/policies/admission_control_policy - curl -v localhost:10000/a1-p/healthcheck diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 485f5a8..002be1e 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -14,16 +14,16 @@ and this project adheres to `Semantic Versioning `__. 
:depth: 3 :local: -[1.x.x] - TBD +[x.x.x] - TBD ------------- :: - * Represents a resillent version of 1.0.0 that uses Redis for persistence + * Represents a resilient version of x.x.x that uses Redis for persistence -[x.x.x] - TBD -------------- +[2.0.0] - 12/9/2019 +------------------- :: @@ -31,7 +31,7 @@ and this project adheres to `Semantic Versioning `__. * Eliminates the barrier to deleting an instance when no xapp evdr replied (via timeouts) * Add two new ENV variables that control timeouts * Make unit tests more modular so new workflows can be tested easily - * Changes the API for ../status to return a richer structure + * Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change. * Clean up unused items in the integration tests helm chart * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore * Uses the standard RIC logging library @@ -39,6 +39,7 @@ and this project adheres to `Semantic Versioning `__. * Given the above, policy type ids can be any valid 32bit greater than 0 * Decouple the API between northbound and A1 from A1 with xapps. This is now two seperate OpenAPI files * Update example for AC Xapp + * Upgrade rmr and rmr-python to utilize new features; lots of cleanups because of that [1.0.4] ------- diff --git a/integration_tests/Dockerfile-query-receiver b/integration_tests/Dockerfile-query-receiver new file mode 100644 index 0000000..3c5ac28 --- /dev/null +++ b/integration_tests/Dockerfile-query-receiver @@ -0,0 +1,45 @@ +# ================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== +# install a well known working rmr +FROM python:3.7-alpine +RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git +RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \ + && cd rmr \ + && mkdir build \ + && cd build \ + && cmake .. -DPACK_EXTERNALS=1 \ + && make install + +# stage2 +FROM python:3.7-alpine + +# copies +COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so +COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so +COPY query_tester.py / + +# Install RMr python bindings +RUN pip install --upgrade pip +RUN pip install rmr==2.2.0 + +# rmr setups +RUN mkdir -p /opt/route/ +ENV LD_LIBRARY_PATH /usr/local/lib:/usr/local/lib64 +ENV RMR_SEED_RT /opt/route/local.rt + +WORKDIR / +CMD ["python","-u","query_tester.py"] diff --git a/integration_tests/Dockerfile b/integration_tests/Dockerfile-test-delay-receiver similarity index 94% rename from integration_tests/Dockerfile rename to integration_tests/Dockerfile-test-delay-receiver index 87a8dce..4d09d1b 100644 --- a/integration_tests/Dockerfile +++ b/integration_tests/Dockerfile-test-delay-receiver @@ -17,7 +17,7 @@ # install a well known working rmr FROM python:3.7-alpine RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git -RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \ +RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \ && cd rmr \ && 
mkdir build \ && cd build \ @@ -34,7 +34,7 @@ COPY receiver.py / # Install RMr python bindings RUN pip install --upgrade pip -RUN pip install rmr==1.0.0 +RUN pip install rmr==2.2.0 # rmr setups RUN mkdir -p /opt/route/ diff --git a/integration_tests/a1mediator/Chart.yaml b/integration_tests/a1mediator/Chart.yaml index e59dc91..28d5951 100644 --- a/integration_tests/a1mediator/Chart.yaml +++ b/integration_tests/a1mediator/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: A1 Helm chart for Kubernetes name: a1mediator -version: 1.0.4 +version: 2.0.0 diff --git a/integration_tests/getlogs.sh b/integration_tests/getlogs.sh index 668ab2b..b611b85 100755 --- a/integration_tests/getlogs.sh +++ b/integration_tests/getlogs.sh @@ -1,4 +1,12 @@ #!/bin/sh +echo "\n\n a1" >> log.txt kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^a1-a1mediator-' | xargs kubectl logs > log.txt 2>&1 + +echo "\n\n test receiver" >> log.txt kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X testreceiver >> log.txt 2>&1 + +echo "\n\n delay" >> log.txt kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X delayreceiver >> log.txt 2>&1 + +echo "\n\n query" >> log.txt +kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X queryreceiver >> log.txt 2>&1 diff --git a/integration_tests/install_deps.sh b/integration_tests/install_deps.sh index 01c45b4..4a17c5d 100755 --- a/integration_tests/install_deps.sh +++ b/integration_tests/install_deps.sh @@ -1,5 +1,5 @@ #!/bin/sh -git clone --branch 1.9.0 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \ +git clone --branch 1.10.2 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \ && cd rmr \ && mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \ && cd ../.. 
\ diff --git a/integration_tests/query_tester.py b/integration_tests/query_tester.py new file mode 100644 index 0000000..d8cf3ba --- /dev/null +++ b/integration_tests/query_tester.py @@ -0,0 +1,84 @@ +# ================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== +""" +Test receiver +""" + +import time +import json +from rmr import rmr + +PORT = "4564" + +mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL) +test_type = 1006001 + +while rmr.rmr_ready(mrc) == 0: + time.sleep(1) + print("not yet ready") + +print("listening ON {}".format(PORT)) + +# loop +while True: + + # do query + pay = {"policy_type_id": test_type} + sbuf_send = rmr.rmr_alloc_msg(mrc, 4096, payload=json.dumps(pay).encode("utf-8"), gen_transaction_id=True, mtype=20012) + sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send) + post_send_summary = rmr.message_summary(sbuf_send) + + if not (post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK"): + print("was unable to send query to a1!") + time.sleep(1) + else: + # query worked, wait 2 seconds, then receive everything we have + time.sleep(1) + print("reading messages") + + # this is a hacked up version of rmr_rcvall_msgs in the rmr package + # we need the actual 
messages, not the summaries, to use rts + sbuf_rcv = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status + while True: + sbuf_rcv = rmr.rmr_torcv_msg(mrc, sbuf_rcv, 0) # set the timeout to 0 so this doesn't block!! + + summary = rmr.message_summary(sbuf_rcv) + if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states + print("no more instances received. will try again in 1s") + break + + print("Received: {0}".format(summary)) + + received_payload = json.loads(summary["payload"]) + assert received_payload["policy_type_id"] == test_type + assert summary["message type"] == 20010 + + payload = { + "policy_type_id": received_payload["policy_type_id"], + "policy_instance_id": received_payload["policy_instance_id"], + "handler_id": "query_tester", + "status": "OK", + } + val = json.dumps(payload).encode("utf-8") + rmr.set_payload_and_length(val, sbuf_rcv) # TODO: extend rmr-python to allow rts to accept this param + sbuf_rcv.contents.mtype = 20011 # TODO: extend rmr-python to allow rts to accept this param + print("Pre reply summary: {}".format(rmr.message_summary(sbuf_rcv))) + + # send ack + sbuf_rcv = rmr.rmr_rts_msg(mrc, sbuf_rcv) + post_reply_summary = rmr.message_summary(sbuf_rcv) + print("Post reply summary: {}".format(post_reply_summary)) diff --git a/integration_tests/test_a1.tavern.yaml b/integration_tests/test_a1.tavern.yaml index 4c1dea6..a025643 100644 --- a/integration_tests/test_a1.tavern.yaml +++ b/integration_tests/test_a1.tavern.yaml @@ -464,6 +464,109 @@ stages: --- +test_name: test query + +stages: + - name: type not there yet + request: + url: http://localhost:10000/a1-p/policytypes/1006001 + method: GET + response: + status_code: 404 + + - name: put the type + request: + url: http://localhost:10000/a1-p/policytypes/1006001 + method: PUT + json: + name: query test + description: test + policy_type_id: 1006001 + create_schema: + "$schema": http://json-schema.org/draft-07/schema# + 
type: object + additionalProperties: false + properties: + foo: + type: string + required: + - foo + response: + status_code: 201 + + - name: type there now + request: + url: http://localhost:10000/a1-p/policytypes/1006001 + method: GET + response: + status_code: 200 + + - name: instance list 200 but empty + request: + url: http://localhost:10000/a1-p/policytypes/1006001/policies + method: GET + response: + status_code: 200 + body: [] + + - name: instance 1 + request: + url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1 + method: PUT + json: + foo: "bar1" + headers: + content-type: application/json + response: + status_code: 202 + + - name: instance 2 + request: + url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2 + method: PUT + json: + foo: "bar2" + headers: + content-type: application/json + response: + status_code: 202 + + - name: instance list + request: + url: http://localhost:10000/a1-p/policytypes/1006001/policies + method: GET + response: + status_code: 200 + body: [qt1, qt2] + + # after the query, a1 should send, query receiver should send back, and the policy should be in effect + + - name: test the query status get + max_retries: 3 + delay_before: 6 # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default + request: + url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1/status + method: GET + response: + status_code: 200 + body: + instance_status: "IN EFFECT" + has_been_deleted: False + + - name: test the query status get 2 + max_retries: 3 + delay_before: 6 # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default + request: + url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2/status + method: GET + response: + status_code: 200 + body: + instance_status: "IN EFFECT" + has_been_deleted: False + +--- + test_name: test bad routing file endpoint stages: @@ -554,5 +657,3 @@ stages: type: object response: status_code: 400 - - diff --git 
a/integration_tests/test_local.rt b/integration_tests/test_local.rt deleted file mode 100644 index 6e61fef..0000000 --- a/integration_tests/test_local.rt +++ /dev/null @@ -1,5 +0,0 @@ -newrt|start -mse|20010|6660666|devarchwork:4560 -mse|20010|20001|devarchwork:4563 -rte|20011|devarchwork:4562 -newrt|end diff --git a/integration_tests/testreceiver/templates/config.yaml b/integration_tests/testreceiver/templates/config.yaml index caf4338..e6f4801 100644 --- a/integration_tests/testreceiver/templates/config.yaml +++ b/integration_tests/testreceiver/templates/config.yaml @@ -18,5 +18,19 @@ metadata: data: local.rt: | newrt|start + # we actaully use rts so i dont even think this is used rte|20011|a1rmrservice:4562 newrt|end + +--- + +apiVersion: v1 +kind: ConfigMap +metadata: + name: queryreceiverconf +data: + local.rt: | + newrt|start + # this query is initiated in the query receiver so this is certainly needed + rte|20012|a1rmrservice:4562 + newrt|end diff --git a/integration_tests/testreceiver/templates/deployment.yaml b/integration_tests/testreceiver/templates/deployment.yaml index 29be456..0a08fa3 100644 --- a/integration_tests/testreceiver/templates/deployment.yaml +++ b/integration_tests/testreceiver/templates/deployment.yaml @@ -17,6 +17,22 @@ spec: app.kubernetes.io/instance: {{ .Release.Name }} spec: containers: + #query receiver + - name: queryreceiver + image: queryreceiver:latest + imagePullPolicy: Never + resources: + {{- toYaml .Values.resources | nindent 12 }} + volumeMounts: + - name: queryreceiverconf + mountPath: /opt/route/local.rt + subPath: local.rt + env: + # this sets the source field in messages from a1 to point back to a1s service name, rather than it's random pod name + - name: RMR_SRC_ID + value: {{ .Values.queryrmrservice.name }} + + # test receiver - name: testreceiver image: testreceiver:latest imagePullPolicy: Never @@ -26,6 +42,8 @@ spec: - name: testreceiverconf mountPath: /opt/route/local.rt subPath: local.rt + + # test receiver that 
delays until sending - name: delayreceiver image: testreceiver:latest imagePullPolicy: Never @@ -50,3 +68,6 @@ spec: - name: "delayreceiverconf" configMap: name: "delayreceiverconf" + - name: "queryreceiverconf" + configMap: + name: "queryreceiverconf" diff --git a/integration_tests/testreceiver/templates/service.yaml b/integration_tests/testreceiver/templates/service.yaml index 1df7654..ecb90a0 100644 --- a/integration_tests/testreceiver/templates/service.yaml +++ b/integration_tests/testreceiver/templates/service.yaml @@ -31,3 +31,21 @@ spec: selector: app.kubernetes.io/name: {{ include "testreceiver.name" . }} app.kubernetes.io/instance: {{ .Release.Name }} + +--- + +apiVersion: v1 +kind: Service +metadata: + name: {{ .Values.queryrmrservice.name }} + labels: +{{ include "testreceiver.labels" . | indent 4 }} +spec: + type: {{ .Values.queryrmrservice.type }} + ports: + - port: {{ .Values.queryrmrservice.port }} + targetPort: {{ .Values.queryrmrservice.port }} + protocol: TCP + selector: + app.kubernetes.io/name: {{ include "testreceiver.name" . 
}} + app.kubernetes.io/instance: {{ .Release.Name }} diff --git a/integration_tests/testreceiver/values.yaml b/integration_tests/testreceiver/values.yaml index fcf6536..d53b7c7 100644 --- a/integration_tests/testreceiver/values.yaml +++ b/integration_tests/testreceiver/values.yaml @@ -13,3 +13,8 @@ delayrmrservice: name: delayreceiverrmrservice type: ClusterIP port: 4563 + +queryrmrservice: + name: queryreceiverrmrservice + type: ClusterIP + port: 4564 diff --git a/rmr-version.yaml b/rmr-version.yaml index e79b81c..0d9c301 100644 --- a/rmr-version.yaml +++ b/rmr-version.yaml @@ -1,3 +1,3 @@ # CI script installs RMR from PackageCloud using this version --- -version: 1.8.1 +version: 1.10.2 diff --git a/setup.py b/setup.py index d20a50f..c2dc3f6 100644 --- a/setup.py +++ b/setup.py @@ -18,13 +18,13 @@ from setuptools import setup, find_packages setup( name="a1", - version="1.0.4", + version="2.0.0", packages=find_packages(exclude=["tests.*", "tests"]), author="Tommy Carpenter", description="RIC A1 Mediator for policy/intent changes", url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1", entry_points={"console_scripts": ["run.py=a1.run:main"]}, # we require jsonschema, should be in that list, but connexion already requires a specific version of it - install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=1.0.0", "mdclogpy"], + install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=2.2.0", "mdclogpy"], package_data={"a1": ["openapi.yaml"]}, ) diff --git a/tests/test_controller.py b/tests/test_controller.py index 971ce3f..971d547 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -30,6 +30,7 @@ ADM_CTRL_POLICIES = "/a1-p/policytypes/{0}/policies".format(ADM_CRTL_TID) ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL_IID ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status" ADM_CTRL_TYPE = "/a1-p/policytypes/{0}".format(ADM_CRTL_TID) +ACK_MT = 20011 def 
_fake_dequeue(): @@ -37,8 +38,8 @@ def _fake_dequeue(): pay = json.dumps( {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "OK"} ).encode() - fake_msg = {"payload": pay} - return [fake_msg] + fake_msg = {"payload": pay, "message type": ACK_MT} + return [(fake_msg, None)] def _fake_dequeue_none(): @@ -49,31 +50,37 @@ def _fake_dequeue_none(): def _fake_dequeue_deleted(): """for monkeypatching with a DELETED status""" new_msgs = [] + good_pay = json.dumps( + {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"} + ).encode() - # non existent type + # non existent type id pay = json.dumps( {"policy_type_id": 911, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"} ).encode() - fake_msg = {"payload": pay} - new_msgs.append(fake_msg) + fake_msg = {"payload": pay, "message type": ACK_MT} + new_msgs.append((fake_msg, None)) + # bad instance id pay = json.dumps( {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": "darkness", "handler_id": RCV_ID, "status": "DELETED"} ).encode() - fake_msg = {"payload": pay} - new_msgs.append(fake_msg) + fake_msg = {"payload": pay, "message type": ACK_MT} + new_msgs.append((fake_msg, None)) + + # good body but bad message type + fake_msg = {"payload": good_pay, "message type": ACK_MT * 3} + new_msgs.append((fake_msg, None)) # insert a bad one with a malformed body to make sure we keep going - new_msgs.append({"payload": "asdf"}) + new_msgs.append(({"payload": "asdf", "message type": ACK_MT}, None)) # not even a json - new_msgs.append("asdf") + new_msgs.append(("asdf", None)) - pay = json.dumps( - {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"} - ).encode() - fake_msg = {"payload": pay} - new_msgs.append(fake_msg) + # good + fake_msg = {"payload": good_pay, "message type": ACK_MT} + new_msgs.append((fake_msg, None)) return new_msgs @@ -83,27 
+90,6 @@ def _test_put_patch(monkeypatch): # assert that rmr bad states don't cause problems monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10)) - # we need this because free expects a real sbuf - # TODO: move this into rmr_mocks - def noop(_sbuf): - pass - - monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop) - - # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve - def fake_alloc(_unused1, _unused2, _unused3, _unused4, _unused5): - sbuf = rmr_mocks.Rmr_mbuf_t() - sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002" - return sbuf - - # we also need to repatch set, since in the send function, we alloc, then set a new transid - def fake_set_transactionid(sbuf): - sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002" - - # Note, we could have just patched summary, but this patches at a "lower level" so is a better test - monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc) - monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid) - def _no_ac(client): # no type there yet -- 2.16.6