From 7cec82d95d4b9d18c42b103eb14d39f94fcd0776 Mon Sep 17 00:00:00 2001 From: Tommy Carpenter Date: Mon, 14 Oct 2019 16:18:21 -0400 Subject: [PATCH] Towards Resiliency. - Moves the "database" access calls to mimic the SDL API, in preparation for moving to SDL. - Does not yet actually use SDL or Redis, but the transition to those will be much shorter after this change. Change-Id: I596d06bcf85b6e9d5184d478802da574058c69f9 Signed-off-by: Tommy Carpenter --- Dockerfile-Unit-Test | 6 +- a1/controller.py | 2 +- a1/data.py | 151 ++++++++++++++++++++++++-------- container-tag.yaml | 2 +- docs/release-notes.rst | 9 +- integration_tests/a1mediator/Chart.yaml | 2 +- setup.py | 4 +- tests/test_controller.py | 32 +++++-- tests/test_data.py | 46 ++++++++++ 9 files changed, 203 insertions(+), 51 deletions(-) create mode 100644 tests/test_data.py diff --git a/Dockerfile-Unit-Test b/Dockerfile-Unit-Test index 49df469..074b44a 100644 --- a/Dockerfile-Unit-Test +++ b/Dockerfile-Unit-Test @@ -29,11 +29,11 @@ FROM python:3.7-alpine # dir that rmr routing file temp goes into RUN mkdir -p /opt/route/ -# Gevent needs gcc; TODO: this will get fixed +# Gevent needs gcc RUN apk add gcc musl-dev -# Upgrade pip, install tox -RUN pip install --upgrade pip && pip install tox +# Upgrade pip, install tox (gevent is installed as a speed hack in local dev where tox is run many times) +RUN pip install --upgrade pip && pip install tox gevent # copies COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so diff --git a/a1/controller.py b/a1/controller.py index b9db142..b0ae31a 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -65,7 +65,7 @@ def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload= def get_healthcheck(): """ Handles healthcheck GET - Currently, this basically checks the server is alive.a1rmr + Currently, this basically checks the server is alive """ return "", 200 diff --git a/a1/data.py b/a1/data.py index d320046..631d2c6 100644 --- a/a1/data.py +++ b/a1/data.py @@ -24,29 +24,90 @@ For now, the database is in memory. We use dict data structures (KV) with the expectation of having to move this into Redis """ import json +import msgpack from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType from a1 import get_module_logger from a1 import a1rmr logger = get_module_logger(__name__) -# This is essentially mockouts for future KV -# Note that the D subkey won't be needed when in redis, since you can store data at x anx x_y -POLICY_DATA = {} -I = "instances" -H = "handlers" -D = "data" + +class SDLWrapper: + """ + This is a wrapper around the expected SDL Python interface. + The usage of POLICY_DATA will be replaced with SDL when SDL for python is available. + The eventual SDL API is expected to be very close to what is here. + + We use msgpack for binary (de)serialization: https://msgpack.org/index.html + """ + + def __init__(self): + self.POLICY_DATA = {} + + def set(self, key, value): + """set a key""" + self.POLICY_DATA[key] = msgpack.packb(value, use_bin_type=True) + + def get(self, key): + """get a key""" + if key in self.POLICY_DATA: + return msgpack.unpackb(self.POLICY_DATA[key], raw=False) + return None + + def find_and_get(self, prefix): + """get all k v pairs that start with prefix""" + return {k: msgpack.unpackb(v, raw=False) for k, v in self.POLICY_DATA.items() if k.startswith(prefix)} + + def delete(self, key): + """ delete a key""" + del self.POLICY_DATA[key] + + +SDL = SDLWrapper() + +TYPE_PREFIX = "a1.policy_type." +INSTANCE_PREFIX = "a1.policy_instance." +HANDLER_PREFIX = "a1.policy_handler." # Internal helpers +def _generate_type_key(policy_type_id): + """ + generate a key for a policy type + """ + return "{0}{1}".format(TYPE_PREFIX, policy_type_id) + + +def _generate_instance_key(policy_type_id, policy_instance_id): + """ + generate a key for a policy instance + """ + return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id) + + +def _generate_handler_prefix(policy_type_id, policy_instance_id): + """ + generate the prefix to a handler key + """ + return "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id) + + +def _generate_handler_key(policy_type_id, policy_instance_id, handler_id): + """ + generate a key for a policy handler + """ + return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id) + + def _get_statuses(policy_type_id, policy_instance_id): """ shared helper to get statuses for an instance """ instance_is_valid(policy_type_id, policy_instance_id) - return [v for _, v in POLICY_DATA[policy_type_id][I][policy_instance_id][H].items()] + prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id) + return list(SDL.find_and_get(prefixes_for_handler).values()) def _get_instance_list(policy_type_id): @@ -54,7 +115,19 @@ def _get_instance_list(policy_type_id): shared helper to get instance list for a type """ type_is_valid(policy_type_id) - return list(POLICY_DATA[policy_type_id][I].keys()) + prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id) + instancekeys = SDL.find_and_get(prefixes_for_type).keys() + return [k.split(prefixes_for_type)[1] for k in instancekeys] + + +def _clear_handlers(policy_type_id, policy_instance_id): + """ + delete all the handlers for a policy instance + """ + all_handlers_pref = _generate_handler_prefix(policy_type_id, policy_instance_id) + keys = SDL.find_and_get(all_handlers_pref) + for k in keys: + SDL.delete(k) def _clean_up_type(policy_type_id): @@ -67,19 +140,24 @@ def _clean_up_type(policy_type_id): # try to parse the messages as responses. Drop those that are malformed pay = json.loads(msg["payload"]) if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay: - """ - NOTE: can't raise an exception here e.g.: - instance_is_valid(pti, pii) - because this is called on many functions; just drop bad status messages. - We def don't want bad messages that happen to hit a1s mailbox to blow up anything - - NOTE2: we don't use the parameters "policy_type_id, policy_instance" from above here, + # We don't use the parameters "policy_type_id, policy_instance" from above here, # because we are popping the whole mailbox, which might include other statuses - """ pti = pay["policy_type_id"] pii = pay["policy_instance_id"] - if pti in POLICY_DATA and pii in POLICY_DATA[pti][I]: # manual check per comment above - POLICY_DATA[pti][I][pii][H][pay["handler_id"]] = pay["status"] + + try: + """ + can't raise an exception here e.g.: + because this is called on many functions; just drop bad status messages. + We def don't want bad messages that happen to hit a1s mailbox to blow up anything + + """ + type_is_valid(pti) + instance_is_valid(pti, pii) + SDL.set(_generate_handler_key(pti, pii, pay["handler_id"]), pay["status"]) + except (PolicyTypeNotFound, PolicyInstanceNotFound): + pass + else: logger.debug("Dropping message") logger.debug(pay) @@ -108,7 +186,8 @@ def _clean_up_type(policy_type_id): # blow away from a1 db if all_deleted: - del POLICY_DATA[policy_type_id][I][policy_instance_id] + _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers + SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id)) # delete instance # Types @@ -118,15 +197,16 @@ def get_type_list(): """ retrieve all type ids """ - return list(POLICY_DATA.keys()) + typekeys = SDL.find_and_get(TYPE_PREFIX).keys() + # policy types are ints but they get butchered to strings in the KV + return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys] def type_is_valid(policy_type_id): """ check that a type is valid """ - if policy_type_id not in POLICY_DATA: - logger.error("%s not found", policy_type_id) + if SDL.get(_generate_type_key(policy_type_id)) is None: raise PolicyTypeNotFound() @@ -134,12 +214,10 @@ def store_policy_type(policy_type_id, body): """ store a policy type if it doesn't already exist """ - if policy_type_id in POLICY_DATA: + key = _generate_type_key(policy_type_id) + if SDL.get(key) is not None: raise PolicyTypeAlreadyExists() - - POLICY_DATA[policy_type_id] = {} - POLICY_DATA[policy_type_id][D] = body - POLICY_DATA[policy_type_id][I] = {} + SDL.set(key, body) def delete_policy_type(policy_type_id): @@ -148,7 +226,7 @@ def delete_policy_type(policy_type_id): """ pil = get_instance_list(policy_type_id) if pil == []: # empty, can delete - del POLICY_DATA[policy_type_id] + SDL.delete(_generate_type_key(policy_type_id)) else: raise CantDeleteNonEmptyType() @@ -158,7 +236,7 @@ def get_policy_type(policy_type_id): retrieve a type """ type_is_valid(policy_type_id) - return POLICY_DATA[policy_type_id][D] + return SDL.get(_generate_type_key(policy_type_id)) # Instances @@ -169,7 +247,7 @@ def instance_is_valid(policy_type_id, policy_instance_id): check that an instance is valid """ type_is_valid(policy_type_id) - if policy_instance_id not in POLICY_DATA[policy_type_id][I]: + if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None: raise PolicyInstanceNotFound @@ -178,12 +256,11 @@ def store_policy_instance(policy_type_id, policy_instance_id, instance): Store a policy instance """ type_is_valid(policy_type_id) - - # store the instance - # Reset the statuses because this is a new policy instance, even if it was overwritten - POLICY_DATA[policy_type_id][I][policy_instance_id] = {} - POLICY_DATA[policy_type_id][I][policy_instance_id][D] = instance - POLICY_DATA[policy_type_id][I][policy_instance_id][H] = {} + key = _generate_instance_key(policy_type_id, policy_instance_id) + if SDL.get(key) is not None: + # Reset the statuses because this is a new policy instance, even if it was overwritten + _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers + SDL.set(key, instance) def get_policy_instance(policy_type_id, policy_instance_id): @@ -192,7 +269,7 @@ def get_policy_instance(policy_type_id, policy_instance_id): """ _clean_up_type(policy_type_id) instance_is_valid(policy_type_id, policy_instance_id) - return POLICY_DATA[policy_type_id][I][policy_instance_id][D] + return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) def get_policy_instance_statuses(policy_type_id, policy_instance_id): diff --git a/container-tag.yaml b/container-tag.yaml index 163ffaa..4072098 100644 --- a/container-tag.yaml +++ b/container-tag.yaml @@ -1,4 +1,4 @@ # The Jenkins job uses this string for the tag in the image name # for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag --- -tag: 1.0.0 +tag: 1.0.1 diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 7a537d9..2060a2a 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -23,12 +23,19 @@ All notable changes to this project will be documented in this file. The format is based on `Keep a Changelog `__ and this project adheres to `Semantic Versioning `__. -[1.1.0] - TBD +[1.x.x] - TBD :: * Represents a resillent version of 1.0.0 that uses Redis for persistence +[1.0.1] + +:: + + * Moves the "database" access calls to mimick the SDL API, in preparation for moving to SDL + * Does not yet actually use SDL or Redis, but the transition to those will be much shorter after this change. + [1.0.0] - 10/7/2019 diff --git a/integration_tests/a1mediator/Chart.yaml b/integration_tests/a1mediator/Chart.yaml index be6cc65..792818d 100644 --- a/integration_tests/a1mediator/Chart.yaml +++ b/integration_tests/a1mediator/Chart.yaml @@ -1,4 +1,4 @@ apiVersion: v1 description: A1 Helm chart for Kubernetes name: a1mediator -version: 1.0.0 +version: 1.0.1 diff --git a/setup.py b/setup.py index 7ce5d53..9c242be 100644 --- a/setup.py +++ b/setup.py @@ -18,13 +18,13 @@ from setuptools import setup, find_packages setup( name="a1", - version="1.0.0", + version="1.0.1", packages=find_packages(exclude=["tests.*", "tests"]), author="Tommy Carpenter", description="RIC A1 Mediator for policy/intent changes", url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1", entry_points={"console_scripts": ["run.py=a1.run:main"]}, # we require jsonschema, should be in that list, but connexion already requires a specific version of it - install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "rmr>=0.13.2"], + install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=0.13.2"], package_data={"a1": ["openapi.yaml"]}, ) diff --git a/tests/test_controller.py b/tests/test_controller.py index 33f1447..f4e0ae2 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -65,11 +65,22 @@ def _fake_dequeue_deleted(_filter_type): """ for monkeypatching a1rmnr.dequeue_all_messages with a DELETED status """ - fake_msg = {} + new_msgs = [] + + # insert some that don't exist to make sure nothing blows up + pay = b'{"policy_type_id": 20666, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}' + fake_msg = {"payload": pay} + new_msgs.append(fake_msg) + + pay = b'{"policy_type_id": 20000, "policy_instance_id": "darkness", "handler_id": "test_receiver", "status": "DELETED"}' + fake_msg = {"payload": pay} + new_msgs.append(fake_msg) + pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}' - fake_msg["payload"] = pay - new_messages = [fake_msg] - return new_messages + fake_msg = {"payload": pay} + new_msgs.append(fake_msg) + + return new_msgs def _test_put_patch(monkeypatch): @@ -101,7 +112,7 @@ def _test_put_patch(monkeypatch): # Actual Tests -def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): +def test_workflow_nothing_there_yet(client, monkeypatch, adm_type_good, adm_instance_good): """ test policy put good""" # no type there yet @@ -118,10 +129,16 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): res = client.get(ADM_CTRL_POLICIES) assert res.status_code == 404 + +def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): # put the type res = client.put(ADM_CTRL_TYPE, json=adm_type_good) assert res.status_code == 201 + # cant replace types + res = client.put(ADM_CTRL_TYPE, json=adm_type_good) + assert res.status_code == 400 + # type there now res = client.get(ADM_CTRL_TYPE) assert res.status_code == 200 @@ -131,6 +148,7 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): assert res.json == [20000] # instance 200 but empty list + monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none) res = client.get(ADM_CTRL_POLICIES) assert res.status_code == 200 assert res.json == [] @@ -148,6 +166,10 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good) assert res.status_code == 202 + # replace is allowed on instances + res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good) + assert res.status_code == 202 + # instance 200 and in list res = client.get(ADM_CTRL_POLICIES) assert res.status_code == 200 diff --git a/tests/test_data.py b/tests/test_data.py new file mode 100644 index 0000000..45c2eac --- /dev/null +++ b/tests/test_data.py @@ -0,0 +1,46 @@ +# ================================================================================== +# Copyright (c) 2019 Nokia +# Copyright (c) 2018-2019 AT&T Intellectual Property. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ================================================================================== +from a1 import data + + +def test_sdl_raw(): + """ + test raw sdl functions + """ + data.SDL = data.SDLWrapper() + data.SDL.set("as.df1", "data") + data.SDL.set("as.df2", "data2") + assert data.SDL.get("as.df1") == "data" + assert data.SDL.get("as.df2") == "data2" + assert data.SDL.find_and_get("as.df1") == {"as.df1": "data"} + assert data.SDL.find_and_get("as.df2") == {"as.df2": "data2"} + assert data.SDL.find_and_get("as.df") == {"as.df1": "data", "as.df2": "data2"} + assert data.SDL.find_and_get("as.d") == {"as.df1": "data", "as.df2": "data2"} + assert data.SDL.find_and_get("as.") == {"as.df1": "data", "as.df2": "data2"} + assert data.SDL.find_and_get("asd") == {} + + # delete 1 + data.SDL.delete("as.df1") + assert data.SDL.get("as.df1") is None + assert data.SDL.get("as.df2") == "data2" + + # delete 2 + data.SDL.delete("as.df2") + assert data.SDL.get("as.df2") is None + + assert data.SDL.find_and_get("as.df") == {} + assert data.SDL.find_and_get("") == {} -- 2.16.6