* Implements new logic around when instances are deleted. See flowcharts in docs/. Basically, timeouts now trigger the actual deletion of instances from A1's database, and these timeouts are configurable.
* Eliminates the barrier to deleting an instance when no xapp ever replied (via timeouts)
* Add two new ENV variables that control timeouts
* Make unit tests more modular so new workflows can be tested easily
* Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change.
* Clean up unused items in the integration tests helm chart
* Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore
* Uses the standard RIC logging library
* Switch the backend routing scheme to using subscription id with constant message types, per request.
* Given the above, policy type ids can be any valid 32-bit integer greater than 0
* Decouple the API between northbound and A1 from A1 with xapps. This is now two separate OpenAPI files
* Update example for AC Xapp
* Upgrade rmr and rmr-python to utilize new features; lots of cleanups because of that
Change-Id: Ie7812607244cbcc484fe14c60fe27371e7e65082
Signed-off-by: Tommy Carpenter <tc677g@att.com>
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
WORKDIR /tmp
# Run the unit tests
-RUN tox -e py37, flake8
+RUN tox -e py37,flake8
from threading import Thread
from rmr import rmr, helpers
from mdclogpy import Logger
-from a1 import data
+from a1 import data, messages
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
mdc_logger = Logger(name=__name__)
self.keep_going = True
self.rcv_func = None
self.last_ran = time.time()
- self.work_queue = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html
+ self.instance_send_queue = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html
        # initialize rmr context
if init_func_override:
time.sleep(0.5)
# set the receive function
- # TODO: when policy query is implemented, add A1_POLICY_QUERY
self.rcv_func = (
- rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+ rcv_func_override
+ if rcv_func_override
+ else lambda: helpers.rmr_rcvall_msgs_raw(self.mrc, [A1_POLICY_RESPONSE, A1_POLICY_QUERY])
)
# start the work loop
self.thread = Thread(target=self.loop)
self.thread.start()
+    def _assert_good_send(self, sbuf, pre_send_summary):
+        """
+        Common helper for _send_msg and _rts_msg.
+
+        Returns True when the post-send summary of sbuf reports success
+        (message state 0 and status "RMR_OK"); otherwise logs both the
+        pre-send and post-send summaries for debugging and returns False.
+        """
+        post_send_summary = rmr.message_summary(sbuf)
+        if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
+            return True
+        mdc_logger.debug("Message NOT sent!")
+        mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary))
+        return False
+
+    def _send_msg(self, pay, mtype, subid):
+        """
+        Allocate an rmr message carrying payload ``pay`` with message type
+        ``mtype`` and subscription id ``subid``, and try to send it, retrying
+        up to RETRY_TIMES times. Best effort: the sbuf is freed and the loop
+        exits only after a successful send.
+
+        NOTE(review): if every retry fails, the last allocated sbuf is never
+        freed here -- possible buffer leak; confirm intended behavior.
+        """
+        for _ in range(0, RETRY_TIMES):
+            sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid)
+            # NOTE(review): sub_id is already passed to rmr_alloc_msg above, so
+            # this reassignment looks redundant -- confirm against the
+            # rmr-python version in use
+            sbuf.contents.sub_id = subid
+            pre_send_summary = rmr.message_summary(sbuf)
+            sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send
+            if self._assert_good_send(sbuf, pre_send_summary):
+                rmr.rmr_free_msg(sbuf) # free
+                break
+
+    def _rts_msg(self, pay, sbuf_rts, mtype):
+        """
+        Reply to ``sbuf_rts`` via rmr "return to sender" with payload ``pay``
+        and message type ``mtype``, retrying up to RETRY_TIMES times.
+
+        We do not call free here because the caller may rts the same buffer
+        many times; the caller frees it after its rts loop.
+
+        Returns the sbuf, which may be a new object since rmr_rts_msg can
+        return a different buffer than the one passed in.
+        """
+        for _ in range(0, RETRY_TIMES):
+            pre_send_summary = rmr.message_summary(sbuf_rts)
+            sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype)
+            if self._assert_good_send(sbuf_rts, pre_send_summary):
+                break
+        return sbuf_rts  # in some cases rts may return a new sbuf
+
def loop(self):
"""
This loop runs forever, and has 3 jobs:
while self.keep_going:
# send out all messages waiting for us
- while not self.work_queue.empty():
- work_item = self.work_queue.get(block=False, timeout=None)
-
- pay = work_item["payload"].encode("utf-8")
- for _ in range(0, RETRY_TIMES):
- # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
- sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
- # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
- sbuf.contents.sub_id = work_item["ptid"]
- pre_send_summary = rmr.message_summary(sbuf)
- sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send
- post_send_summary = rmr.message_summary(sbuf)
- mdc_logger.debug(
- "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
- )
- rmr.rmr_free_msg(sbuf) # free
- if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
- mdc_logger.debug("Message sent successfully!")
- break
-
- # read our mailbox and update statuses
- for msg in self.rcv_func():
+ while not self.instance_send_queue.empty():
+ work_item = self.instance_send_queue.get(block=False, timeout=None)
+ payload = json.dumps(messages.a1_to_handler(*work_item)).encode("utf-8")
+ self._send_msg(payload, A1_POLICY_REQUEST, work_item[1])
+
+ # read our mailbox
+ for (msg, sbuf) in self.rcv_func():
+ # TODO: in the future we may also have to catch SDL errors
try:
- pay = json.loads(msg["payload"])
- pti = pay["policy_type_id"]
- pii = pay["policy_instance_id"]
- data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
- except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
- # TODO: in the future we may also have to catch SDL errors
- mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
-
- # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
+ mtype = msg["message type"]
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg))
+
+ if mtype == A1_POLICY_RESPONSE:
+ try:
+ # got a policy response, update status
+ pay = json.loads(msg["payload"])
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ mdc_logger.debug("Successfully received status update: {0}".format(pay))
+ except (PolicyTypeNotFound, PolicyInstanceNotFound):
+ mdc_logger.debug("Received a response for a non-existent instance")
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg))
+
+ elif mtype == A1_POLICY_QUERY:
+ try:
+ # got a query, do a lookup and send out all instances
+ pti = json.loads(msg["payload"])["policy_type_id"]
+ mdc_logger.debug("Received query for: {0}".format(pti))
+ for pii in data.get_instance_list(pti):
+ instance = data.get_policy_instance(pti, pii)
+ payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8")
+ sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST)
+ except (PolicyTypeNotFound, PolicyInstanceNotFound):
+ mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg))
+ except (KeyError, TypeError, json.decoder.JSONDecodeError):
+ mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg))
+
+ else:
+ mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype))
+
+ # we must free each sbuf
+ rmr.rmr_free_msg(sbuf)
+
self.last_ran = time.time()
time.sleep(1)
__RMR_LOOP__.keep_going = False
-def queue_work(item):
+def queue_instance_send(item):
"""
push an item into the work queue
currently the only type of work is to send out messages
"""
- __RMR_LOOP__.work_queue.put(item)
+ __RMR_LOOP__.instance_send_queue.put(item)
def healthcheck_rmr_thread(seconds=30):
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
-import json
from flask import Response
from jsonschema import validate
from jsonschema.exceptions import ValidationError
return Response(status=500)
-def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
- """
- used to create the payloads that get sent to downstream policy handlers
- """
- return {
- "operation": operation,
- "policy_type_id": policy_type_id,
- "policy_instance_id": policy_instance_id,
- "payload": payload,
- }
-
-
# Healthcheck
# store the instance
data.store_policy_instance(policy_type_id, policy_instance_id, instance)
- # send rmr (best effort)
- body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
- a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id})
+ # queue rmr send (best effort)
+ a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance))
return "", 202
"""
data.delete_policy_instance(policy_type_id, policy_instance_id)
- # send rmr (best effort)
- body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
- a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id})
+ # queue rmr send (best effort)
+ a1rmr.queue_instance_send(("DELETE", policy_type_id, policy_instance_id, ""))
return "", 202
--- /dev/null
+"""
+rmr messages
+"""
+# ==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+
+
+def a1_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
+    """
+    Build the message body A1 sends to downstream policy handlers (xapps).
+
+    operation: the action being conveyed, e.g. "CREATE" or "DELETE"
+    policy_type_id: id of the policy type the instance belongs to
+    policy_instance_id: id of the policy instance
+    payload: the policy instance body, if any (None when not applicable)
+    """
+    return {
+        "operation": operation,
+        "policy_type_id": policy_type_id,
+        "policy_instance_id": policy_instance_id,
+        "payload": payload,
+    }
# ==================================================================================
openapi: 3.0.0
info:
- version: 1.1.0
+ version: 2.0.0
title: RIC A1
paths:
'/a1-p/healthcheck':
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 1.0.4
+tag: 2.0.0
2) ``Dockerfile-Unit-Test``
-3) ``integration_tests/Dockerfile``
+3) ``integration_tests/Dockerfile-test-delay-receiver``
+
+4) ``integration_tests/Dockerfile-query-receiver``
+
+5) ``rmr-version.yaml``
rmr-python is the python binding to rmr . Installing rmr per the above does not install it.
Bumping the rmr python version dependency requires changes in:
1) ``setup.py``
-2) ``integration_tests/Dockerfile``
+2) ``integration_tests/Dockerfile-test-delay-receiver``
-Run the integration tests after attempting this.
+3) ``integration_tests/Dockerfile-query-receiver``
+Run the integration tests after attempting this.
Unit Testing
------------
Integration testing
-------------------
-This tests A1’s external API with two test receivers. This depends on helm+k8s, meaning you cannot run this if this is not installed.
+This tests A1’s external API with three test receivers. This depends on helm+k8s.
-Unlike the unit tests, however, this does not require rmr to be installed on the base system, as everything
-runs in Docker, and the Dockerfiles provide/install rmr.
+Build all the containers:
-First, build the latest A1 you are testing (from the root):
::
- docker build --no-cache -t a1:latest .
+ docker build -t a1:latest .; cd integration_tests/; docker build -t testreceiver:latest . -f Dockerfile-test-delay-receiver; docker build -t queryreceiver:latest . -f Dockerfile-query-receiver; cd ..
-Note that this step also runs the unit tests, since running the unit tests are part of the Dockerfile for A1.
-If you've never run the integration tests before, build the test receiver, which is referenced in the helm chart:
-::
-
- cd integration_tests
- docker build --no-cache -t testreceiver:latest .
+Then, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``).
-Finally, run all the tests from the root (this requires the python packages ``tox``, ``pytest``, and ``tavern``).
::
tox -c tox-integration.ini
This script:
-1. Deploys 3 helm charts into a local kubernetes installation
+1. Deploys 2 helm charts (4 containers) into a local kubernetes installation
2. Port forwards a pod ClusterIP to localhost
3. Uses “tavern” to run some tests against the server
4. Barrages the server with apache bench
5. Tears everything down
-
-Unless you're a core A1 developer, you should probably stop here. The below instructions
-are for running A1 locally, without docker, and is much more involved (however useful when developing a1).
-
-Running locally
----------------
-
-1. Before this will work, for the first time on that machine, run ``./install_deps.sh``
-
-2. It also requires rmr-python installed. (The dockerfile does this)
-
-3. Create a ``local.rt`` file and copy it into ``/opt/route/local.rt``.
- Note, the example one in ``integration_tests`` will need to be modified for
- your scenario and machine.
-
-4. Copy a ric manifest into ``/opt/ricmanifest.json`` and an rmr mapping
- table into ``/opt/rmr_string_int_mapping.txt``. You can use the test
- ones packaged if you want:
-
- ::
-
- cp tests/fixtures/ricmanifest.json /opt/ricmanifest.json
- cp tests/fixtures/rmr_string_int_mapping.txt /opt/rmr_string_int_mapping.txt
-
-5. Then:
-
- ::
-
- sudo pip install -e .
- set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x RMR_RCV_RETRY_INTERVAL 500; set -x RMR_RETRY_TIMES 10;
- /usr/bin/run.py
-
-
-There are also two test receivers in ``integration_tests`` you can run locally.
-The first is meant to be used with the ``control_admission`` policy
-(that comes in test fixture ric manifest):
-
-::
-
- set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; python receiver.py
-
-The second can be used against the ``test_policy`` policy to test the
-async nature of A1, and to test race conditions. You can start it with
-several env variables as follows:
-
-::
-
- set -x LD_LIBRARY_PATH /usr/local/lib/; set -x RMR_SEED_RT /opt/route/local.rt ; set -x TEST_RCV_PORT 4563; set -x TEST_RCV_RETURN_MINT 10001; set -x TEST_RCV_SEC_DELAY 5; set -x TEST_RCV_RETURN_PAYLOAD '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}' ; python receiver.py
-
-To test the async nature of A1, trigger a call to ``test_policy``, which
-will target the delayed receiver, then immediately call
-``control_admission``. The ``control_admission`` policy return should be
-returned immediately, whereas the ``test_policy`` should return after
-about ``TEST_RCV_SEC_DELAY 5``. The ``test_policy`` should not block A1
-while it is sleeping, and both responses should be correct.
-
-::
-
- curl -v -X PUT -H "Content-Type: application/json" -d '{}' localhost:10000/ric/policies/test_policy
- curl -v -X PUT -H "Content-Type: application/json" -d '{ "enforce":true, "window_length":10, "blocking_rate":20, "trigger_threshold":10 }' localhost:10000/ric/policies/admission_control_policy
- curl -v localhost:10000/ric/policies/admission_control_policy
- curl -v localhost:10000/a1-p/healthcheck
:depth: 3
:local:
-[1.x.x] - TBD
+[x.x.x] - TBD
-------------
::
- * Represents a resillent version of 1.0.0 that uses Redis for persistence
+ * Represents a resilient version of x.x.x that uses Redis for persistence
-[x.x.x] - TBD
--------------
+[2.0.0] - 12/9/2019
+-------------------
::
 * Eliminates the barrier to deleting an instance when no xapp ever replied (via timeouts)
* Add two new ENV variables that control timeouts
* Make unit tests more modular so new workflows can be tested easily
- * Changes the API for ../status to return a richer structure
+ * Fixes the API for ../status to return a richer structure. This is an (albeit tiny) API change.
* Clean up unused items in the integration tests helm chart
* Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore
* Uses the standard RIC logging library
 * Given the above, policy type ids can be any valid 32-bit integer greater than 0
 * Decouple the API between northbound and A1 from A1 with xapps. This is now two separate OpenAPI files
 * Update example for AC Xapp
+ * Upgrade rmr and rmr-python to utilize new features; lots of cleanups because of that
[1.0.4]
-------
--- /dev/null
+# ==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+# install a well known working rmr
+FROM python:3.7-alpine
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+ && cd rmr \
+ && mkdir build \
+ && cd build \
+ && cmake .. -DPACK_EXTERNALS=1 \
+ && make install
+
+# stage2
+FROM python:3.7-alpine
+
+# copies
+COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
+COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
+COPY query_tester.py /
+
+# Install RMr python bindings
+RUN pip install --upgrade pip
+RUN pip install rmr==2.2.0
+
+# rmr setups
+RUN mkdir -p /opt/route/
+ENV LD_LIBRARY_PATH /usr/local/lib:/usr/local/lib64
+ENV RMR_SEED_RT /opt/route/local.rt
+
+WORKDIR /
+CMD ["python","-u","query_tester.py"]
# install a well known working rmr
FROM python:3.7-alpine
RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
-RUN git clone --branch 1.10.1 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+RUN git clone --branch 1.10.2 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir build \
&& cd build \
# Install RMr python bindings
RUN pip install --upgrade pip
-RUN pip install rmr==1.0.0
+RUN pip install rmr==2.2.0
# rmr setups
RUN mkdir -p /opt/route/
apiVersion: v1
description: A1 Helm chart for Kubernetes
name: a1mediator
-version: 1.0.4
+version: 2.0.0
#!/bin/sh
+echo "\n\n a1" >> log.txt
kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^a1-a1mediator-' | xargs kubectl logs > log.txt 2>&1
+
+echo "\n\n test receiver" >> log.txt
kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X testreceiver >> log.txt 2>&1
+
+echo "\n\n delay" >> log.txt
kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X delayreceiver >> log.txt 2>&1
+
+echo "\n\n query" >> log.txt
+kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X queryreceiver >> log.txt 2>&1
#!/bin/sh
-git clone --branch 1.9.0 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
+git clone --branch 1.10.2 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
&& cd ../.. \
--- /dev/null
+# ==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+"""
+Test receiver
+"""
+
+import time
+import json
+from rmr import rmr
+
+PORT = "4564"
+
+mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
+test_type = 1006001
+
+while rmr.rmr_ready(mrc) == 0:
+ time.sleep(1)
+ print("not yet ready")
+
+print("listening ON {}".format(PORT))
+
+# loop
+while True:
+
+ # do query
+ pay = {"policy_type_id": test_type}
+ sbuf_send = rmr.rmr_alloc_msg(mrc, 4096, payload=json.dumps(pay).encode("utf-8"), gen_transaction_id=True, mtype=20012)
+ sbuf_send = rmr.rmr_send_msg(mrc, sbuf_send)
+ post_send_summary = rmr.message_summary(sbuf_send)
+
+ if not (post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK"):
+ print("was unable to send query to a1!")
+ time.sleep(1)
+ else:
+ # query worked, wait 2 seconds, then receive everything we have
+ time.sleep(1)
+ print("reading messages")
+
+ # this is a hacked up version of rmr_rcvall_msgs in the rmr package
+ # we need the actual messages, not the summaries, to use rts
+ sbuf_rcv = rmr.rmr_alloc_msg(mrc, 4096) # allocate buffer to have something for a return status
+ while True:
+ sbuf_rcv = rmr.rmr_torcv_msg(mrc, sbuf_rcv, 0) # set the timeout to 0 so this doesn't block!!
+
+ summary = rmr.message_summary(sbuf_rcv)
+ if summary["message status"] != "RMR_OK": # ok indicates msg received, stop on all other states
+ print("no more instances received. will try again in 1s")
+ break
+
+ print("Received: {0}".format(summary))
+
+ received_payload = json.loads(summary["payload"])
+ assert received_payload["policy_type_id"] == test_type
+ assert summary["message type"] == 20010
+
+ payload = {
+ "policy_type_id": received_payload["policy_type_id"],
+ "policy_instance_id": received_payload["policy_instance_id"],
+ "handler_id": "query_tester",
+ "status": "OK",
+ }
+ val = json.dumps(payload).encode("utf-8")
+ rmr.set_payload_and_length(val, sbuf_rcv) # TODO: extend rmr-python to allow rts to accept this param
+ sbuf_rcv.contents.mtype = 20011 # TODO: extend rmr-python to allow rts to accept this param
+ print("Pre reply summary: {}".format(rmr.message_summary(sbuf_rcv)))
+
+ # send ack
+ sbuf_rcv = rmr.rmr_rts_msg(mrc, sbuf_rcv)
+ post_reply_summary = rmr.message_summary(sbuf_rcv)
+ print("Post reply summary: {}".format(post_reply_summary))
---
+test_name: test query
+
+stages:
+ - name: type not there yet
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001
+ method: GET
+ response:
+ status_code: 404
+
+ - name: put the type
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001
+ method: PUT
+ json:
+ name: query test
+ description: test
+ policy_type_id: 1006001
+ create_schema:
+ "$schema": http://json-schema.org/draft-07/schema#
+ type: object
+ additionalProperties: false
+ properties:
+ foo:
+ type: string
+ required:
+ - foo
+ response:
+ status_code: 201
+
+ - name: type there now
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001
+ method: GET
+ response:
+ status_code: 200
+
+ - name: instance list 200 but empty
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001/policies
+ method: GET
+ response:
+ status_code: 200
+ body: []
+
+ - name: instance 1
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1
+ method: PUT
+ json:
+ foo: "bar1"
+ headers:
+ content-type: application/json
+ response:
+ status_code: 202
+
+ - name: instance 2
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2
+ method: PUT
+ json:
+ foo: "bar2"
+ headers:
+ content-type: application/json
+ response:
+ status_code: 202
+
+ - name: instance list
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001/policies
+ method: GET
+ response:
+ status_code: 200
+ body: [qt1, qt2]
+
+ # after the query, a1 should send, query receiver should send back, and the policy should be in effect
+
+ - name: test the query status get
+ max_retries: 3
+    delay_before: 6 # give it a few seconds for rmr ; delay receiver sleeps for 5 seconds by default
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt1/status
+ method: GET
+ response:
+ status_code: 200
+ body:
+ instance_status: "IN EFFECT"
+ has_been_deleted: False
+
+ - name: test the query status get 2
+ max_retries: 3
+    delay_before: 6 # give it a few seconds for rmr ; delay receiver sleeps for 5 seconds by default
+ request:
+ url: http://localhost:10000/a1-p/policytypes/1006001/policies/qt2/status
+ method: GET
+ response:
+ status_code: 200
+ body:
+ instance_status: "IN EFFECT"
+ has_been_deleted: False
+
+---
+
test_name: test bad routing file endpoint
stages:
type: object
response:
status_code: 400
-
-
+++ /dev/null
-newrt|start
-mse|20010|6660666|devarchwork:4560
-mse|20010|20001|devarchwork:4563
-rte|20011|devarchwork:4562
-newrt|end
data:
local.rt: |
newrt|start
+    # we actually use rts, so I don't even think this is used
rte|20011|a1rmrservice:4562
newrt|end
+
+---
+
+apiVersion: v1
+kind: ConfigMap
+metadata:
+ name: queryreceiverconf
+data:
+ local.rt: |
+ newrt|start
+ # this query is initiated in the query receiver so this is certainly needed
+ rte|20012|a1rmrservice:4562
+ newrt|end
app.kubernetes.io/instance: {{ .Release.Name }}
spec:
containers:
+ #query receiver
+ - name: queryreceiver
+ image: queryreceiver:latest
+ imagePullPolicy: Never
+ resources:
+ {{- toYaml .Values.resources | nindent 12 }}
+ volumeMounts:
+ - name: queryreceiverconf
+ mountPath: /opt/route/local.rt
+ subPath: local.rt
+ env:
+          # this sets the source field in messages from a1 to point back to A1's service name, rather than its random pod name
+ - name: RMR_SRC_ID
+ value: {{ .Values.queryrmrservice.name }}
+
+ # test receiver
- name: testreceiver
image: testreceiver:latest
imagePullPolicy: Never
- name: testreceiverconf
mountPath: /opt/route/local.rt
subPath: local.rt
+
+ # test receiver that delays until sending
- name: delayreceiver
image: testreceiver:latest
imagePullPolicy: Never
- name: "delayreceiverconf"
configMap:
name: "delayreceiverconf"
+ - name: "queryreceiverconf"
+ configMap:
+ name: "queryreceiverconf"
selector:
app.kubernetes.io/name: {{ include "testreceiver.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
+
+---
+
+apiVersion: v1
+kind: Service
+metadata:
+ name: {{ .Values.queryrmrservice.name }}
+ labels:
+{{ include "testreceiver.labels" . | indent 4 }}
+spec:
+ type: {{ .Values.queryrmrservice.type }}
+ ports:
+ - port: {{ .Values.queryrmrservice.port }}
+ targetPort: {{ .Values.queryrmrservice.port }}
+ protocol: TCP
+ selector:
+ app.kubernetes.io/name: {{ include "testreceiver.name" . }}
+ app.kubernetes.io/instance: {{ .Release.Name }}
name: delayreceiverrmrservice
type: ClusterIP
port: 4563
+
+queryrmrservice:
+ name: queryreceiverrmrservice
+ type: ClusterIP
+ port: 4564
# CI script installs RMR from PackageCloud using this version
---
-version: 1.8.1
+version: 1.10.2
setup(
name="a1",
- version="1.0.4",
+ version="2.0.0",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="RIC A1 Mediator for policy/intent changes",
url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1",
entry_points={"console_scripts": ["run.py=a1.run:main"]},
# we require jsonschema, should be in that list, but connexion already requires a specific version of it
- install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=1.0.0", "mdclogpy"],
+ install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=2.2.0", "mdclogpy"],
package_data={"a1": ["openapi.yaml"]},
)
ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL_IID
ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
ADM_CTRL_TYPE = "/a1-p/policytypes/{0}".format(ADM_CRTL_TID)
+ACK_MT = 20011
def _fake_dequeue():
pay = json.dumps(
{"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "OK"}
).encode()
- fake_msg = {"payload": pay}
- return [fake_msg]
+ fake_msg = {"payload": pay, "message type": ACK_MT}
+ return [(fake_msg, None)]
def _fake_dequeue_none():
def _fake_dequeue_deleted():
"""for monkeypatching with a DELETED status"""
new_msgs = []
+ good_pay = json.dumps(
+ {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
+ ).encode()
- # non existent type
+ # non existent type id
pay = json.dumps(
{"policy_type_id": 911, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
).encode()
- fake_msg = {"payload": pay}
- new_msgs.append(fake_msg)
+ fake_msg = {"payload": pay, "message type": ACK_MT}
+ new_msgs.append((fake_msg, None))
+ # bad instance id
pay = json.dumps(
{"policy_type_id": ADM_CRTL_TID, "policy_instance_id": "darkness", "handler_id": RCV_ID, "status": "DELETED"}
).encode()
- fake_msg = {"payload": pay}
- new_msgs.append(fake_msg)
+ fake_msg = {"payload": pay, "message type": ACK_MT}
+ new_msgs.append((fake_msg, None))
+
+ # good body but bad message type
+ fake_msg = {"payload": good_pay, "message type": ACK_MT * 3}
+ new_msgs.append((fake_msg, None))
# insert a bad one with a malformed body to make sure we keep going
- new_msgs.append({"payload": "asdf"})
+ new_msgs.append(({"payload": "asdf", "message type": ACK_MT}, None))
# not even a json
- new_msgs.append("asdf")
+ new_msgs.append(("asdf", None))
- pay = json.dumps(
- {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"}
- ).encode()
- fake_msg = {"payload": pay}
- new_msgs.append(fake_msg)
+ # good
+ fake_msg = {"payload": good_pay, "message type": ACK_MT}
+ new_msgs.append((fake_msg, None))
return new_msgs
# assert that rmr bad states don't cause problems
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
- # we need this because free expects a real sbuf
- # TODO: move this into rmr_mocks
- def noop(_sbuf):
- pass
-
- monkeypatch.setattr("rmr.rmr.rmr_free_msg", noop)
-
- # we need to repatch alloc (already patched in patch_rmr) to fix the transactionid, alloc is called in send and recieve
- def fake_alloc(_unused1, _unused2, _unused3, _unused4, _unused5):
- sbuf = rmr_mocks.Rmr_mbuf_t()
- sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
- return sbuf
-
- # we also need to repatch set, since in the send function, we alloc, then set a new transid
- def fake_set_transactionid(sbuf):
- sbuf.contents.xaction = b"d49b53e478b711e9a1130242ac110002"
-
- # Note, we could have just patched summary, but this patches at a "lower level" so is a better test
- monkeypatch.setattr("rmr.rmr.rmr_alloc_msg", fake_alloc)
- monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid)
-
def _no_ac(client):
# no type there yet