From: Tommy Carpenter Date: Mon, 2 Dec 2019 14:34:25 +0000 (-0500) Subject: Merge branch 'development' X-Git-Tag: 2.0.0~1^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=refs%2Fchanges%2F01%2F1901%2F1;hp=50487bf719884e6a339b48ee5971d757837287b2;p=ric-plt%2Fa1.git Merge branch 'development' Signed-off-by: Tommy Carpenter Change-Id: I44938167f438f84337cd078d57588bd90ff2d9a3 --- diff --git a/.gitignore b/.gitignore index 5e125ef..fb1708b 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,6 @@ +# misc cruft *.log +integration_tests/log.txt NOTES.txt rmr/* docs_and_diagrams/ @@ -7,6 +9,7 @@ docs_and_diagrams/ .tox docs/_build/ +# standard python ignore template .pytest_cache/ xunit-results.xml .DS_Store diff --git a/Dockerfile-Unit-Test b/Dockerfile-Unit-Test index 6be53d5..b5d524c 100644 --- a/Dockerfile-Unit-Test +++ b/Dockerfile-Unit-Test @@ -44,4 +44,4 @@ COPY setup.py tox.ini /tmp/ WORKDIR /tmp # Run the unit tests -RUN tox +RUN tox -e py37, flake8 diff --git a/a1/__init__.py b/a1/__init__.py index 6212289..818ed1a 100644 --- a/a1/__init__.py +++ b/a1/__init__.py @@ -1,3 +1,6 @@ +""" +contains the app; broken out here for ease of unit testing +""" # ================================================================================== # Copyright (c) 2019 Nokia # Copyright (c) 2018-2019 AT&T Intellectual Property. @@ -14,23 +17,8 @@ # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== -import logging import connexion -def get_module_logger(mod_name): - """ - To use this, do logger = get_module_logger(__name__) - """ - logger = logging.getLogger(mod_name) - handler = logging.StreamHandler() - formatter = logging.Formatter( - '%(asctime)s [%(name)-12s] %(levelname)-8s %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.setLevel(logging.DEBUG) - return logger - - -app = connexion.App(__name__, specification_dir='.') -app.add_api('openapi.yaml', arguments={'title': 'My Title'}) +app = connexion.App(__name__, specification_dir=".") +app.add_api("openapi.yaml", arguments={"title": "My Title"}) diff --git a/a1/a1rmr.py b/a1/a1rmr.py index fd0d87f..ae2bf00 100644 --- a/a1/a1rmr.py +++ b/a1/a1rmr.py @@ -23,14 +23,20 @@ import time import json from threading import Thread from rmr import rmr, helpers -from a1 import get_module_logger +from mdclogpy import Logger from a1 import data from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound -logger = get_module_logger(__name__) +mdc_logger = Logger(name=__name__) -RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4)) +RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4)) + + +A1_POLICY_REQUEST = 20010 +A1_POLICY_RESPONSE = 20011 +A1_POLICY_QUERY = 20012 + # Note; yes, globals are bad, but this is a private (to this module) global # No other module can import/access this (well, python doesn't enforce this, but all linters will complain) @@ -53,7 +59,7 @@ class _RmrLoop: if init_func_override: self.mrc = init_func_override() else: - logger.debug("Waiting for rmr to initialize..") + mdc_logger.debug("Waiting for rmr to initialize..") # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an # internal ring of messages, and receive calls read from that # currently the size is 2048 messages, so this is fine for the foreseeable future @@ -62,7 +68,10 @@ class _RmrLoop: time.sleep(0.5) # set the receive function - self.rcv_func = rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024]) + # TODO: when policy query is implemented, add A1_POLICY_QUERY + self.rcv_func = ( + rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE]) + ) # start the work loop self.thread = Thread(target=self.loop) @@ -76,7 +85,7 @@ class _RmrLoop: - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET) """ # loop forever - logger.debug("Work loop starting") + mdc_logger.debug("Work loop starting") while self.keep_going: # send out all messages waiting for us @@ -86,33 +95,30 @@ class _RmrLoop: pay = work_item["payload"].encode("utf-8") for _ in range(0, RETRY_TIMES): # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490 - sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"]) + sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST) + # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet + sbuf.contents.sub_id = work_item["ptid"] pre_send_summary = rmr.message_summary(sbuf) sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send post_send_summary = rmr.message_summary(sbuf) - logger.debug("Pre-send summary: %s, Post-send summary: %s", pre_send_summary, post_send_summary) + mdc_logger.debug( + "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary) + ) rmr.rmr_free_msg(sbuf) # free if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK": - logger.debug("Message sent successfully!") + mdc_logger.debug("Message sent successfully!") break # read our mailbox and update statuses - updated_instances = set() for msg in self.rcv_func(): try: pay = json.loads(msg["payload"]) pti = pay["policy_type_id"] pii = pay["policy_instance_id"] - data.set_status(pti, pii, pay["handler_id"], pay["status"]) - updated_instances.add((pti, pii)) - except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError): + data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"]) + except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError): # TODO: in the future we may also have to catch SDL errors - logger.debug(("Dropping malformed or non applicable message", msg)) - - # for all updated instances, see if we can trigger a delete - # should be no catch needed here, since the status update would have failed if it was a bad pair - for ut in updated_instances: - data.clean_up_instance(ut[0], ut[1]) + mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg)) # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component self.last_ran = time.time() diff --git a/a1/controller.py b/a1/controller.py index 75d8456..1022320 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -22,11 +22,11 @@ from flask import Response from jsonschema import validate from jsonschema.exceptions import ValidationError import connexion -from a1 import get_module_logger +from mdclogpy import Logger from a1 import a1rmr, exceptions, data -logger = get_module_logger(__name__) +mdc_logger = Logger(name=__name__) def _try_func_return(func): @@ -41,7 +41,7 @@ def _try_func_return(func): return "", 404 except BaseException as exc: # catch all, should never happen... - logger.exception(exc) + mdc_logger.exception(exc) return Response(status=500) @@ -142,14 +142,7 @@ def get_policy_instance_status(policy_type_id, policy_instance_id): 3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted) """ - def get_status_handler(): - vector = data.get_policy_instance_statuses(policy_type_id, policy_instance_id) - for i in vector: - if i == "OK": - return "IN EFFECT", 200 - return "NOT IN EFFECT", 200 - - return _try_func_return(get_status_handler) + return _try_func_return(lambda: data.get_policy_instance_status(policy_type_id, policy_instance_id)) def create_or_replace_policy_instance(policy_type_id, policy_instance_id): @@ -173,7 +166,7 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id): # send rmr (best effort) body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance) - a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id}) + a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id}) return "", 202 @@ -188,12 +181,13 @@ def delete_policy_instance(policy_type_id, policy_instance_id): def delete_instance_handler(): """ here we send out the DELETEs but we don't delete the instance until a GET is called where we check the statuses + We also set the status as deleted which would be reflected in a GET to ../status (before the DELETE completes) """ - data.instance_is_valid(policy_type_id, policy_instance_id) + data.delete_policy_instance(policy_type_id, policy_instance_id) # send rmr (best effort) body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id) - a1rmr.queue_work({"payload": json.dumps(body), "msg type": policy_type_id}) + a1rmr.queue_work({"payload": json.dumps(body), "ptid": policy_type_id}) return "", 202 diff --git a/a1/data.py b/a1/data.py index 074f326..a98be5c 100644 --- a/a1/data.py +++ b/a1/data.py @@ -1,3 +1,11 @@ +""" +Represents A1s database and database access functions. +In the future, this may change to use a different backend, possibly dramatically. +Hopefully, the access functions are a good api so nothing else has to change when this happens + +For now, the database is in memory. +We use dict data structures (KV) with the expectation of having to move this into Redis +""" # ================================================================================== # Copyright (c) 2019 Nokia # Copyright (c) 2018-2019 AT&T Intellectual Property. @@ -14,20 +22,18 @@ # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== - -""" -Represents A1s database and database access functions. -In the future, this may change to use a different backend, possibly dramatically. -Hopefully, the access functions are a good api so nothing else has to change when this happens - -For now, the database is in memory. -We use dict data structures (KV) with the expectation of having to move this into Redis -""" +import os +import time +from threading import Thread import msgpack +from mdclogpy import Logger from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType -from a1 import get_module_logger -logger = get_module_logger(__name__) +mdc_logger = Logger(name=__name__) + + +INSTANCE_DELETE_NO_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_NO_RESP_TTL", 5)) +INSTANCE_DELETE_RESP_TTL = int(os.environ.get("INSTANCE_DELETE_RESP_TTL", 5)) class SDLWrapper: @@ -65,6 +71,7 @@ SDL = SDLWrapper() TYPE_PREFIX = "a1.policy_type." INSTANCE_PREFIX = "a1.policy_instance." +METADATA_PREFIX = "a1.policy_inst_metadata." HANDLER_PREFIX = "a1.policy_handler." @@ -85,6 +92,13 @@ def _generate_instance_key(policy_type_id, policy_instance_id): return "{0}{1}.{2}".format(INSTANCE_PREFIX, policy_type_id, policy_instance_id) +def _generate_instance_metadata_key(policy_type_id, policy_instance_id): + """ + generate a key for a policy instance metadata + """ + return "{0}{1}.{2}".format(METADATA_PREFIX, policy_type_id, policy_instance_id) + + def _generate_handler_prefix(policy_type_id, policy_instance_id): """ generate the prefix to a handler key @@ -99,11 +113,28 @@ def _generate_handler_key(policy_type_id, policy_instance_id, handler_id): return "{0}{1}".format(_generate_handler_prefix(policy_type_id, policy_instance_id), handler_id) +def _type_is_valid(policy_type_id): + """ + check that a type is valid + """ + if SDL.get(_generate_type_key(policy_type_id)) is None: + raise PolicyTypeNotFound() + + +def _instance_is_valid(policy_type_id, policy_instance_id): + """ + check that an instance is valid + """ + _type_is_valid(policy_type_id) + if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None: + raise PolicyInstanceNotFound + + def _get_statuses(policy_type_id, policy_instance_id): """ shared helper to get statuses for an instance """ - instance_is_valid(policy_type_id, policy_instance_id) + _instance_is_valid(policy_type_id, policy_instance_id) prefixes_for_handler = "{0}{1}.{2}.".format(HANDLER_PREFIX, policy_type_id, policy_instance_id) return list(SDL.find_and_get(prefixes_for_handler).values()) @@ -112,7 +143,7 @@ def _get_instance_list(policy_type_id): """ shared helper to get instance list for a type """ - type_is_valid(policy_type_id) + _type_is_valid(policy_type_id) prefixes_for_type = "{0}{1}.".format(INSTANCE_PREFIX, policy_type_id) instancekeys = SDL.find_and_get(prefixes_for_type).keys() return [k.split(prefixes_for_type)[1] for k in instancekeys] @@ -128,6 +159,31 @@ def _clear_handlers(policy_type_id, policy_instance_id): SDL.delete(k) +def _get_metadata(policy_type_id, policy_instance_id): + """ + get instance metadata + """ + _instance_is_valid(policy_type_id, policy_instance_id) + metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id) + return SDL.get(metadata_key) + + +def _delete_after(policy_type_id, policy_instance_id, ttl): + """ + this is a blocking function, must call this in a thread to not block! + waits ttl seconds, then deletes the instance + """ + _instance_is_valid(policy_type_id, policy_instance_id) + + time.sleep(ttl) + + # ready to delete + _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers + SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id)) # delete instance + SDL.delete(_generate_instance_metadata_key(policy_type_id, policy_instance_id)) # delete instance metadata + mdc_logger.debug("type {0} instance {1} deleted".format(policy_type_id, policy_instance_id)) + + # Types @@ -140,14 +196,6 @@ def get_type_list(): return [int(k.split(TYPE_PREFIX)[1]) for k in typekeys] -def type_is_valid(policy_type_id): - """ - check that a type is valid - """ - if SDL.get(_generate_type_key(policy_type_id)) is None: - raise PolicyTypeNotFound() - - def store_policy_type(policy_type_id, body): """ store a policy type if it doesn't already exist @@ -173,98 +221,97 @@ def get_policy_type(policy_type_id): """ retrieve a type """ - type_is_valid(policy_type_id) + _type_is_valid(policy_type_id) return SDL.get(_generate_type_key(policy_type_id)) # Instances -def instance_is_valid(policy_type_id, policy_instance_id): - """ - check that an instance is valid - """ - type_is_valid(policy_type_id) - if SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) is None: - raise PolicyInstanceNotFound - - def store_policy_instance(policy_type_id, policy_instance_id, instance): """ Store a policy instance """ - type_is_valid(policy_type_id) + _type_is_valid(policy_type_id) + creation_timestamp = time.time() + + # store the instance key = _generate_instance_key(policy_type_id, policy_instance_id) if SDL.get(key) is not None: # Reset the statuses because this is a new policy instance, even if it was overwritten _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers SDL.set(key, instance) + metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id) + SDL.set(metadata_key, {"created_at": creation_timestamp, "has_been_deleted": False}) + def get_policy_instance(policy_type_id, policy_instance_id): """ Retrieve a policy instance """ - instance_is_valid(policy_type_id, policy_instance_id) + _instance_is_valid(policy_type_id, policy_instance_id) return SDL.get(_generate_instance_key(policy_type_id, policy_instance_id)) -def get_policy_instance_statuses(policy_type_id, policy_instance_id): +def get_instance_list(policy_type_id): """ - Retrieve the status vector for a policy instance + retrieve all instance ids for a type """ - return _get_statuses(policy_type_id, policy_instance_id) + return _get_instance_list(policy_type_id) -def get_instance_list(policy_type_id): +def delete_policy_instance(policy_type_id, policy_instance_id): """ - retrieve all instance ids for a type + initially sets has_been_deleted + then launches a thread that waits until the relevent timer expires, and finally deletes the instance """ - return _get_instance_list(policy_type_id) + _instance_is_valid(policy_type_id, policy_instance_id) + + # set the metadata first + deleted_timestamp = time.time() + metadata_key = _generate_instance_metadata_key(policy_type_id, policy_instance_id) + existing_metadata = _get_metadata(policy_type_id, policy_instance_id) + SDL.set( + metadata_key, + {"created_at": existing_metadata["created_at"], "has_been_deleted": True, "deleted_at": deleted_timestamp}, + ) + + # wait, then delete + vector = _get_statuses(policy_type_id, policy_instance_id) + if vector == []: + # handler is empty; we wait for t1 to expire then goodnight + clos = lambda: _delete_after(policy_type_id, policy_instance_id, INSTANCE_DELETE_NO_RESP_TTL) + else: + # handler is not empty, we wait max t1,t2 to expire then goodnight + clos = lambda: _delete_after( + policy_type_id, policy_instance_id, max(INSTANCE_DELETE_RESP_TTL, INSTANCE_DELETE_NO_RESP_TTL) + ) + Thread(target=clos).start() # Statuses -def set_status(policy_type_id, policy_instance_id, handler_id, status): +def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status): """ update the database status for a handler called from a1's rmr thread """ - type_is_valid(policy_type_id) - instance_is_valid(policy_type_id, policy_instance_id) + _type_is_valid(policy_type_id) + _instance_is_valid(policy_type_id, policy_instance_id) SDL.set(_generate_handler_key(policy_type_id, policy_instance_id, handler_id), status) -def clean_up_instance(policy_type_id, policy_instance_id): - """ - see if we can delete an instance based on it's status - """ - type_is_valid(policy_type_id) - instance_is_valid(policy_type_id, policy_instance_id) - +def get_policy_instance_status(policy_type_id, policy_instance_id): """ - TODO: not being able to delete if the list is [] is prolematic. - There are cases, such as a bad routing file, where this type will never be able to be deleted because it never went to any xapps - However, A1 cannot distinguish between the case where [] was never going to work, and the case where it hasn't worked *yet* - - However, removing this constraint also leads to problems. - Deleting the instance when the vector is empty, for example doing so “shortly after” the PUT, can lead to a worse race condition where the xapps get the policy after that, implement it, but because the DELETE triggered “too soon”, you can never get the status or do the delete on it again, so the xapps are all implementing the instance roguely. - - This requires some thought to address. - For now we stick with the "less bad problem". + Gets the status of an instance """ - - vector = _get_statuses(policy_type_id, policy_instance_id) - if vector != []: - all_deleted = True - for i in vector: - if i != "DELETED": - all_deleted = False - break # have at least one not DELETED, do nothing - - # blow away from a1 db - if all_deleted: - _clear_handlers(policy_type_id, policy_instance_id) # delete all the handlers - SDL.delete(_generate_instance_key(policy_type_id, policy_instance_id)) # delete instance - logger.debug("type %s instance %s deleted", policy_type_id, policy_instance_id) + _instance_is_valid(policy_type_id, policy_instance_id) + metadata = _get_metadata(policy_type_id, policy_instance_id) + metadata["instance_status"] = "NOT IN EFFECT" + for i in _get_statuses(policy_type_id, policy_instance_id): + if i == "OK": + metadata["instance_status"] = "IN EFFECT" + break + return metadata diff --git a/a1/openapi.yaml b/a1/openapi.yaml index fed4b77..c8c0bd9 100644 --- a/a1/openapi.yaml +++ b/a1/openapi.yaml @@ -16,7 +16,7 @@ # ================================================================================== openapi: 3.0.0 info: - version: 1.0.0 + version: 1.1.0 title: RIC A1 paths: '/a1-p/healthcheck': @@ -273,12 +273,21 @@ paths: description: > successfully retrieved the status content: - text/plain: + application/json: schema: - type: string - enum: - - IN EFFECT - - NOT IN EFFECT + type: object + properties: + instance_status: + type: string + enum: + - IN EFFECT + - NOT IN EFFECT + has_been_deleted: + type: boolean + created_at: + type: string + format: date-time + '404': description: > there is no policy instance with this policy_instance_id or there is no policy type with this policy_type_id @@ -312,76 +321,11 @@ components: description: > represents a policy type identifier. Currently this is restricted to an integer range. type: integer - minimum: 20000 - maximum: 21023 + minimum: 1 + maximum: 2147483647 policy_instance_id: description: > represents a policy instance identifier. UUIDs are advisable but can be any string type: string example: "3d2157af-6a8f-4a7c-810f-38c2f824bf12" - - downstream_message_schema: - type: object - required: - - operation - - policy_type_id - - policy_instance_id - - payload - additionalProperties: false - properties: - operation: - description: the operation being performed - type: string - enum: - - CREATE - - DELETE - - UPDATE - - READ - policy_type_id: - "$ref": "#/components/schemas/policy_type_id" - policy_instance_id: - "$ref": "#/components/schemas/policy_instance_id" - payload: - description: payload for this operation - type: object - example: - operation: CREATE - policy_type_id: 12345678 - policy_instance_id: 3d2157af-6a8f-4a7c-810f-38c2f824bf12 - payload: - enforce: true - window_length: 10 - blocking_rate: 20 - trigger_threshold: 10 - - downstream_notification_schema: - type: object - required: - - policy_type_id - - policy_instance_id - - handler_id - - status - additionalProperties: false - properties: - policy_type_id: - "$ref": "#/components/schemas/policy_type_id" - policy_instance_id: - "$ref": "#/components/schemas/policy_instance_id" - handler_id: - description: > - id of the policy handler - type: string - status: - description: > - the status of this policy instance in this handler - type: string - enum: - - OK - - ERROR - - DELETED - example: - policy_type_id: 12345678 - policy_instance_id: 3d2157af-6a8f-4a7c-810f-38c2f824bf12 - handler_id: 1234-5678 - status: OK diff --git a/a1/run.py b/a1/run.py index c15b644..feaada3 100644 --- a/a1/run.py +++ b/a1/run.py @@ -18,20 +18,21 @@ A1 entrypoint # limitations under the License. # ================================================================================== from gevent.pywsgi import WSGIServer -from a1 import get_module_logger, app +from mdclogpy import Logger +from a1 import app from a1 import a1rmr -logger = get_module_logger(__name__) +mdc_logger = Logger(name=__name__) def main(): """Entrypoint""" # start rmr thread - logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.") + mdc_logger.debug("Initializing rmr thread. A1s webserver will not start until rmr initialization is complete.") a1rmr.start_rmr_thread() # start webserver - logger.debug("Starting gevent server") + mdc_logger.debug("Starting gevent server") http_server = WSGIServer(("", 10000), app) http_server.serve_forever() diff --git a/docs/a1_xapp_contract_openapi.yaml b/docs/a1_xapp_contract_openapi.yaml new file mode 100644 index 0000000..3a0427f --- /dev/null +++ b/docs/a1_xapp_contract_openapi.yaml @@ -0,0 +1,84 @@ +openapi: 3.0.0 +info: + version: 1.0.0 + title: Contract between A1 and RIC Xapps + +components: + schemas: + + policy_type_id: + description: > + represents a policy type identifier. Currently this is restricted to an integer range. + type: integer + minimum: 1 + maximum: 2147483647 + + policy_instance_id: + description: > + represents a policy instance identifier. UUIDs are advisable but can be any string + type: string + example: "3d2157af-6a8f-4a7c-810f-38c2f824bf12" + + downstream_message_schema: + type: object + required: + - operation + - policy_type_id + - policy_instance_id + - payload + additionalProperties: false + properties: + operation: + description: the operation being performed + type: string + enum: + - CREATE + - DELETE + - UPDATE + policy_type_id: + "$ref": "#/components/schemas/policy_type_id" + policy_instance_id: + "$ref": "#/components/schemas/policy_instance_id" + payload: + description: payload for this operation + type: object + example: + operation: CREATE + policy_type_id: 12345678 + policy_instance_id: 3d2157af-6a8f-4a7c-810f-38c2f824bf12 + payload: + enforce: true + window_length: 10 + blocking_rate: 20 + trigger_threshold: 10 + + downstream_notification_schema: + type: object + required: + - policy_type_id + - policy_instance_id + - handler_id + - status + additionalProperties: false + properties: + policy_type_id: + "$ref": "#/components/schemas/policy_type_id" + policy_instance_id: + "$ref": "#/components/schemas/policy_instance_id" + handler_id: + description: > + id of the policy handler + type: string + status: + description: > + the status of this policy instance in this handler + type: string + enum: + - OK + - ERROR + - DELETED + example: + policy_type_id: 12345678 + policy_instance_id: 3d2157af-6a8f-4a7c-810f-38c2f824bf12 + handler_id: 1234-5678 + status: OK diff --git a/docs/deleted flowchart.pdf b/docs/deleted flowchart.pdf new file mode 100644 index 0000000..0089b29 Binary files /dev/null and b/docs/deleted flowchart.pdf differ diff --git a/docs/installation-guide.rst b/docs/installation-guide.rst index 5ddad48..babd4fb 100644 --- a/docs/installation-guide.rst +++ b/docs/installation-guide.rst @@ -1,5 +1,6 @@ .. This work is licensed under a Creative Commons Attribution 4.0 International License. .. http://creativecommons.org/licenses/by/4.0 +.. Copyright (C) 2019 AT&T Intellectual Property Installation Guide ================== @@ -13,9 +14,11 @@ Optional ENV Variables You can set the following ENVs to change the A1 behavior: -1. ``RMR_RETRY_TIMES`` the number of times failed rmr operations such as -timeouts and send failures should be retried before A1 gives up and -returns a 503. The default is ``4``. +1. ``A1_RMR_RETRY_TIMES``: the number of times failed rmr operations such as timeouts and send failures should be retried before A1 gives up and returns a 503. The default is ``4``. + +2. ``INSTANCE_DELETE_NO_RESP_TTL``: Please refer to the delete flowchart in docs/; this is ``T1`` there. The default is 5 (seconds). Basically, the number of seconds that a1 waits to remove an instance from the database after a delete is called in the case that no downstream apps responded. + +3. ``INSTANCE_DELETE_RESP_TTL``: Please refer to the delete flowchart in docs/; this is ``T2`` there. The default is 5 (seconds). Basically, the number of seconds that a1 waits to remove an instance from the database after a delete is called in the case that downstream apps responded. K8S --- diff --git a/docs/overview.rst b/docs/overview.rst index c015097..183f655 100644 --- a/docs/overview.rst +++ b/docs/overview.rst @@ -39,7 +39,7 @@ Integrating Xapps with A1 A1 to Xapps ~~~~~~~~~~~ -When A1 sends a message to xapps, the schema for messages from A1 to the xapp is defined here: https://gerrit.o-ran-sc.org/r/gitweb?p=ric-plt/a1.git;a=blob;f=a1/openapi.yaml;h=fed4b77546264cc8a390504dae725ca15060d81a;hb=97f5cc3e3d42e1525af61560d01c4a824b0b2ad9#l324 +When A1 sends a message to xapps, the schema for messages from A1 to the xapp is defined by ``downstream_message_schema`` in ``docs/a1_xapp_contract_openapi.yaml`` All policy instance requests get sent from A1 using message type 20010 @@ -47,6 +47,6 @@ Xapps to A1 ~~~~~~~~~~~ There are three scenarios in which Xapps are to send a message to A1: -1. When an xapp receives a CREATE or UPDATE message for a policy instance. Xapps must respond to these requests by sending a message of type 20011 to A1. The schema for that message is here: https://gerrit.o-ran-sc.org/r/gitweb?p=ric-plt/a1.git;a=blob;f=a1/openapi.yaml;h=fed4b77546264cc8a390504dae725ca15060d81a;hb=97f5cc3e3d42e1525af61560d01c4a824b0b2ad9#l358 +1. When an xapp receives a CREATE or UPDATE message for a policy instance. Xapps must respond to these requests by sending a message of type 20011 to A1. The schema for that message is defined by ``downstream_notification_schema`` in ``docs/a1_xapp_contract_openapi.yaml`` 2. Since policy instances can "deprecate" other instances, there are times when xapps need to asyncronously tell A1 that a policy is no longer active. Same message type and schema. The only difference between case 1 and 2 is that case 1 is a "reply" and case 2 is "unsolicited". 3. Xapps can request A1 to re-send all instances of a type using a query, message 20012. When A1 receives this (TBD HERE, STILL BE WORKED OUT) diff --git a/docs/policy instance state diagram.pdf b/docs/policy instance state diagram.pdf new file mode 100644 index 0000000..d25d339 Binary files /dev/null and b/docs/policy instance state diagram.pdf differ diff --git a/docs/release-notes.rst b/docs/release-notes.rst index 49c3a18..485f5a8 100644 --- a/docs/release-notes.rst +++ b/docs/release-notes.rst @@ -1,5 +1,6 @@ .. This work is licensed under a Creative Commons Attribution 4.0 International License. .. http://creativecommons.org/licenses/by/4.0 +.. Copyright (C) 2019 AT&T Intellectual Property Release Notes =============== @@ -13,7 +14,6 @@ and this project adheres to `Semantic Versioning `__. :depth: 3 :local: - [1.x.x] - TBD ------------- @@ -21,8 +21,27 @@ and this project adheres to `Semantic Versioning `__. * Represents a resillent version of 1.0.0 that uses Redis for persistence -[1.0.4] - 10/24/2019 --------------------- + +[x.x.x] - TBD +------------- + +:: + + * Implements new logic around when instances are deleted. See flowcharts in docs/. Basically timeouts now trigger to actually delete instances from a1s database, and these timeouts are configurable. + * Eliminates the barrier to deleting an instance when no xapp evdr replied (via timeouts) + * Add two new ENV variables that control timeouts + * Make unit tests more modular so new workflows can be tested easily + * Changes the API for ../status to return a richer structure + * Clean up unused items in the integration tests helm chart + * Removed "RMR_RCV_RETRY_INTERVAL" leftovers since this isn't used anymore + * Uses the standard RIC logging library + * Switch the backend routing scheme to using subscription id with constant message types, per request. + * Given the above, policy type ids can be any valid 32bit greater than 0 + * Decouple the API between northbound and A1 from A1 with xapps. This is now two seperate OpenAPI files + * Update example for AC Xapp + +[1.0.4] +------- :: diff --git a/integration_tests/a1mediator/files/ricmanifest.json b/integration_tests/a1mediator/files/ricmanifest.json deleted file mode 100644 index 7883c8a..0000000 --- a/integration_tests/a1mediator/files/ricmanifest.json +++ /dev/null @@ -1,88 +0,0 @@ -{ - "controls":[ - { - "name":20000, - "description":"various parameters to control admission of dual connection", - "control_state_request_rmr_type":"DC_ADM_GET_POLICY", - "control_state_request_reply_rmr_type":"DC_ADM_GET_POLICY_ACK", - "message_receives_rmr_type":"DC_ADM_INT_CONTROL", - "message_receives_payload_schema":{ - "$schema":"http://json-schema.org/draft-07/schema#", - "type":"object", - "properties":{ - "enforce":{ - "type":"boolean", - "default":true - }, - "window_length":{ - "type":"integer", - "default":1, - "minimum":1, - "maximum":60, - "description":"Sliding window length (in minutes)" - }, - "blocking_rate":{ - "type":"number", - "default":10, - "minimum":1, - "maximum":100, - "description":"% Connections to block" - }, - "trigger_threshold":{ - "type":"integer", - "default":10, - "minimum":1, - "description":"Minimum number of events in window to trigger blocking" - } - }, - "required":[ - "enforce", - "blocking_rate", - "trigger_threshold", - "window_length" - ], - "additionalProperties":false - }, - "message_sends_rmr_type":"DC_ADM_INT_CONTROL_ACK", - "message_sends_payload_schema":{ - "$schema":"http://json-schema.org/draft-07/schema#", - "type":"object", - "properties":{ - "status":{ - "type":"string", - "enum":[ - "SUCCESS", - "FAIL" - ] - }, - "message":{ - "type":"string" - } - }, - "required":[ - "status" - ], - "additionalProperties":false - } - }, - { - "name":20001, - "description":"for the purposes of testing", - "message_receives_rmr_type":"TEST_REQ", - "message_sends_rmr_type":"TEST_ACK", - "message_sends_payload_schema":{ - "$schema":"http://json-schema.org/draft-07/schema#", - "type":"object", - "properties":{ - "status":{ - "type":"string", - "enum":[ - "SUCCESS", - "FAIL" - ] - } - } - } - } - ] -} diff --git a/integration_tests/a1mediator/files/rmr_string_int_mapping.txt b/integration_tests/a1mediator/files/rmr_string_int_mapping.txt deleted file mode 100644 index cd8db8d..0000000 --- a/integration_tests/a1mediator/files/rmr_string_int_mapping.txt +++ /dev/null @@ -1,6 +0,0 @@ -DC_ADM_INT_CONTROL:20000 -DC_ADM_INT_CONTROL_ACK:20001 -DC_ADM_GET_POLICY: 20002 -DC_ADM_GET_POLICY_ACK: 20003 -TEST_REQ:10000 -TEST_ACK:10001 diff --git a/integration_tests/a1mediator/templates/config.yaml b/integration_tests/a1mediator/templates/config.yaml index 1bb639c..811c118 100644 --- a/integration_tests/a1mediator/templates/config.yaml +++ b/integration_tests/a1mediator/templates/config.yaml @@ -5,8 +5,8 @@ metadata: data: local.rt: | newrt|start - rte|20000|testreceiverrmrservice:4560 - rte|20001|delayreceiverrmrservice:4563 + mse|20010|6660666|testreceiverrmrservice:4560 + mse|20010|20001|delayreceiverrmrservice:4563 # purposefully bad route to make sure rmr doesn't block on non listening receivers: - rte|20002|testreceiverrmrservice:4563 + rte|20010|testreceiverrmrservice:4563 newrt|end diff --git a/integration_tests/a1mediator/templates/deployment.yaml b/integration_tests/a1mediator/templates/deployment.yaml index e94045d..a6e0786 100644 --- a/integration_tests/a1mediator/templates/deployment.yaml +++ b/integration_tests/a1mediator/templates/deployment.yaml @@ -30,9 +30,13 @@ spec: value: {{ .Values.rmrservice.name }} - name: PYTHONUNBUFFERED value: "1" - - name: RMR_RETRY_TIMES + - name: A1_RMR_RETRY_TIMES value: "{{ .Values.rmr_timeout_config.rcv_retry_times }}" - image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + - name: INSTANCE_DELETE_NO_RESP_TTL + value: "5" + - name: INSTANCE_DELETE_RESP_TTL + value: "10" + image: "a1:latest" imagePullPolicy: {{ .Values.image.pullPolicy }} ports: - name: http diff --git a/integration_tests/a1mediator/templates/secrets.yaml b/integration_tests/a1mediator/templates/secrets.yaml deleted file mode 100644 index 4afe4ec..0000000 --- a/integration_tests/a1mediator/templates/secrets.yaml +++ /dev/null @@ -1,7 +0,0 @@ -apiVersion: v1 -kind: Secret -metadata: - name: lfhelper -type: kubernetes.io/dockerconfigjson -data: - .dockerconfigjson: {{ template "imagePullSecret" . }} diff --git a/integration_tests/a1mediator/values.yaml b/integration_tests/a1mediator/values.yaml index 5924f1d..e8bd0d2 100644 --- a/integration_tests/a1mediator/values.yaml +++ b/integration_tests/a1mediator/values.yaml @@ -8,13 +8,6 @@ image: # name of the secret that allows for privagte registry docker pulls. # if the value is "lfhelper", there is a helper function included in this chart, and it uses imageCredentials . # imageCredentials is referenced in secrets.yaml, and uses a helper function to formulate the docker reg username and password into a valid dockerconfig.json. -# Note, if the value of lf_docker_reg_secret is changed, these image credentials are ignored and not used. -lf_docker_reg_secret: lfhelper -imageCredentials: - registry: nexus3.o-ran-sc.org:10004/ric-plt-a1 - username: - password: - # This is the service for A1's external facing HTTP API httpservice: name: a1httpservice diff --git a/integration_tests/bombard.py b/integration_tests/bombard.py deleted file mode 100644 index f0a501b..0000000 --- a/integration_tests/bombard.py +++ /dev/null @@ -1,47 +0,0 @@ -# ================================================================================== -# Copyright (c) 2019 Nokia -# Copyright (c) 2018-2019 AT&T Intellectual Property. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# ================================================================================== -import time -import random -import string -import os -import signal -import sys -from rmr import rmr - - -DELAY_MS = int(os.environ.get("BOMBARD_DELAY_MS", 100)) - -# Init rmr -mrc = rmr.rmr_init(b"4565", rmr.RMR_MAX_RCV_BYTES, 0x00) -while rmr.rmr_ready(mrc) == 0: - time.sleep(1) - print("not yet ready") -rmr.rmr_set_stimeout(mrc, 2) -sbuf = rmr.rmr_alloc_msg(mrc, 256) - - -while True: - # generate a random value between 1 and 256 bytes, then gen some random bytes with several nulls thrown in - val = "BOMBS AWAY".encode("utf8") - rmr.set_payload_and_length(val, sbuf) - rmr.generate_and_set_transaction_id(sbuf) - sbuf.contents.state = 0 - sbuf.contents.mtype = random.choice([20001, 10001]) - print("Pre send summary: {}".format(rmr.message_summary(sbuf))) - sbuf = rmr.rmr_send_msg(mrc, sbuf) - print("Post send summary: {}".format(rmr.message_summary(sbuf))) - time.sleep(0.001 * DELAY_MS) diff --git a/integration_tests/getlogs.sh b/integration_tests/getlogs.sh index b952047..668ab2b 100755 --- a/integration_tests/getlogs.sh +++ b/integration_tests/getlogs.sh @@ -1,5 +1,4 @@ #!/bin/sh -kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^a1-a1mediator-' | xargs kubectl logs -kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X testreceiver -kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X delayreceiver - +kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^a1-a1mediator-' | xargs kubectl logs > log.txt 2>&1 +kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X testreceiver >> log.txt 2>&1 +kubectl get pods --namespace=default | awk '{ print $1 }' | egrep '^testreceiver-' | xargs -I X kubectl logs X delayreceiver >> log.txt 2>&1 diff --git a/install_deps.sh b/integration_tests/install_deps.sh similarity index 100% rename from install_deps.sh rename to integration_tests/install_deps.sh diff --git a/integration_tests/putdata b/integration_tests/putdata deleted file mode 100644 index a2f8d11..0000000 --- a/integration_tests/putdata +++ /dev/null @@ -1,6 +0,0 @@ -{ - "enforce":true, - "window_length":10, - "blocking_rate":20, - "trigger_threshold":10 -} diff --git a/integration_tests/receiver.py b/integration_tests/receiver.py index fedc690..5c3ceaf 100644 --- a/integration_tests/receiver.py +++ b/integration_tests/receiver.py @@ -62,7 +62,7 @@ while True: val = json.dumps(payload).encode("utf-8") rmr.set_payload_and_length(val, sbuf) - sbuf.contents.mtype = 21024 + sbuf.contents.mtype = 20011 print("Pre reply summary: {}".format(rmr.message_summary(sbuf))) time.sleep(DELAY) diff --git a/integration_tests/test_a1.tavern.yaml b/integration_tests/test_a1.tavern.yaml index b4ccd62..4c1dea6 100644 --- a/integration_tests/test_a1.tavern.yaml +++ b/integration_tests/test_a1.tavern.yaml @@ -17,7 +17,7 @@ test_name: test admission control stages: - name: type not there yet request: - url: http://localhost:10000/a1-p/policytypes/20000 + url: http://localhost:10000/a1-p/policytypes/6660666 method: GET response: status_code: 404 @@ -32,55 +32,56 @@ stages: - name: instance list 404 request: - url: http://localhost:10000/a1-p/policytypes/20000/policies + url: http://localhost:10000/a1-p/policytypes/6660666/policies method: GET response: status_code: 404 - name: put the type request: - url: http://localhost:10000/a1-p/policytypes/20000 + url: http://localhost:10000/a1-p/policytypes/6660666 method: PUT json: name: Admission Control description: various parameters to control admission of dual connection - policy_type_id: 20000 + policy_type_id: 6660666 create_schema: "$schema": http://json-schema.org/draft-07/schema# type: object + additionalProperties: false properties: + class: + type: integer + minimum: 1 + maximum: 256 + description: integer id representing class to which we are applying policy enforce: type: boolean - default: true + description: Whether to enable or disable enforcement of policy on this class window_length: type: integer - default: 1 + minimum: 15 + maximum: 300 + description: Sliding window length in seconds + trigger_threshold: + type: integer minimum: 1 - maximum: 60 - description: Sliding window length (in minutes) blocking_rate: type: number - default: 10 - minimum: 1 + minimum: 0 maximum: 100 - description: "% Connections to block" - trigger_threshold: - type: integer - default: 10 - minimum: 1 - description: Minimum number of events in window to trigger blocking required: + - class - enforce - - blocking_rate - - trigger_threshold - window_length - additionalProperties: false + - trigger_threshold + - blocking_rate response: status_code: 201 - name: type there now request: - url: http://localhost:10000/a1-p/policytypes/20000 + url: http://localhost:10000/a1-p/policytypes/6660666 method: GET response: status_code: 200 @@ -91,11 +92,11 @@ stages: method: GET response: status_code: 200 - body: [20000] + body: [6660666] - name: instance list 200 but empty request: - url: http://localhost:10000/a1-p/policytypes/20000/policies + url: http://localhost:10000/a1-p/policytypes/6660666/policies method: GET response: status_code: 200 @@ -103,21 +104,21 @@ stages: - name: test the admission control policy get not there yet request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy method: GET response: status_code: 404 - name: test the admission control policy status get not there yet request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy/status method: GET response: status_code: 404 - name: bad body for admission control policy request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy method: PUT json: not: "expected" @@ -128,7 +129,7 @@ stages: - name: not a json request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy method: PUT data: "asdf" response: @@ -137,11 +138,12 @@ stages: # put it properly - name: put the admission control policy instance request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy method: PUT json: + class: 12 enforce: true - window_length: 10 + window_length: 20 blocking_rate: 20 trigger_threshold: 10 headers: @@ -152,35 +154,39 @@ stages: - name: cant delete type with instances delay_before: 3 # wait for the type acks to come back first request: - url: http://localhost:10000/a1-p/policytypes/20000 + url: http://localhost:10000/a1-p/policytypes/6660666 method: DELETE response: status_code: 400 - name: test the admission control policy get request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy method: GET response: status_code: 200 body: + class: 12 enforce: true - window_length: 10 + window_length: 20 blocking_rate: 20 trigger_threshold: 10 + - name: test the admission control policy status get delay_before: 3 # give it a few seconds for rmr request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy/status method: GET response: status_code: 200 - # tavern doesn't yet let you check string statuses!!! + body: + instance_status: "IN EFFECT" + has_been_deleted: False - name: instance list 200 and contains the instance request: - url: http://localhost:10000/a1-p/policytypes/20000/policies + url: http://localhost:10000/a1-p/policytypes/6660666/policies method: GET response: status_code: 200 @@ -189,16 +195,28 @@ stages: # DELETE the instance and make sure subsequent GETs return properly - name: delete the instance - delay_after: 3 # give it a few seconds for rmr + delay_after: 4 request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy method: DELETE response: status_code: 202 + - name: status should now be not in effect but still there + delay_before: 3 # give it a few seconds for rmr + delay_after: 8 # 3 + 11 > 10; that is, wait until t2 expires + request: + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy/status + method: GET + response: + status_code: 200 + body: + instance_status: "NOT IN EFFECT" + has_been_deleted: True + - name: instance list 200 but no instance request: - url: http://localhost:10000/a1-p/policytypes/20000/policies + url: http://localhost:10000/a1-p/policytypes/6660666/policies method: GET response: status_code: 200 @@ -206,35 +224,35 @@ stages: - name: cant get instance status request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy/status method: GET response: status_code: 404 - name: cant get instance request: - url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + url: http://localhost:10000/a1-p/policytypes/6660666/policies/admission_control_policy method: GET response: status_code: 404 - name: delete ac type request: - url: http://localhost:10000/a1-p/policytypes/20000 + url: http://localhost:10000/a1-p/policytypes/6660666 method: DELETE response: status_code: 204 - name: cant delete again request: - url: http://localhost:10000/a1-p/policytypes/20000 + url: http://localhost:10000/a1-p/policytypes/6660666 method: DELETE response: status_code: 404 - name: cant get request: - url: http://localhost:10000/a1-p/policytypes/20000 + url: http://localhost:10000/a1-p/policytypes/6660666 method: DELETE response: status_code: 404 @@ -367,6 +385,16 @@ stages: response: status_code: 202 + - name: test the delay status get, not in effect yet + request: + url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest/status + method: GET + response: + status_code: 200 + body: + instance_status: "NOT IN EFFECT" + has_been_deleted: False + - name: test the delay policy get request: url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest @@ -376,6 +404,15 @@ stages: body: test: foo + - name: instance list 200 and there + request: + url: http://localhost:10000/a1-p/policytypes/20001/policies + method: GET + response: + status_code: 200 + body: + - delaytest + - name: test the delay status get max_retries: 3 delay_before: 6 # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default @@ -384,16 +421,46 @@ stages: method: GET response: status_code: 200 - # tavern doesn't let you check non json yet! + body: + instance_status: "IN EFFECT" + has_been_deleted: False - - name: instance list 200 and there + # DELETE the instance and make sure subsequent GETs return properly + - name: delete the instance request: - url: http://localhost:10000/a1-p/policytypes/20001/policies + url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest + method: DELETE + response: + status_code: 202 + + - name: test the delay status get immediately + request: + url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest/status method: GET response: status_code: 200 body: - - delaytest + instance_status: "IN EFFECT" + has_been_deleted: True + + - name: test the delay status get after delay but before timers + delay_before: 7 + request: + url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest/status + method: GET + response: + status_code: 200 + body: + instance_status: "NOT IN EFFECT" + has_been_deleted: True + + - name: test the delay status get after delay and after the timers + delay_before: 7 + request: + url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest/status + method: GET + response: + status_code: 404 --- @@ -462,12 +529,12 @@ stages: - name: bad int range 1 request: - url: http://localhost:10000/a1-p/policytypes/19999 + url: http://localhost:10000/a1-p/policytypes/0 method: PUT json: name: test policy description: just for testing - policy_type_id: 19999 + policy_type_id: 0 create_schema: "$schema": http://json-schema.org/draft-07/schema# type: object @@ -476,12 +543,12 @@ stages: - name: bad int range 2 request: - url: http://localhost:10000/a1-p/policytypes/21024 + url: http://localhost:10000/a1-p/policytypes/2147483648 method: PUT json: name: test policy description: just for testing - policy_type_id: 21024 + policy_type_id: 2147483648 create_schema: "$schema": http://json-schema.org/draft-07/schema# type: object diff --git a/integration_tests/test_local.rt b/integration_tests/test_local.rt index 6055639..6e61fef 100644 --- a/integration_tests/test_local.rt +++ b/integration_tests/test_local.rt @@ -1,5 +1,5 @@ newrt|start -rte|20000|devarchwork:4560 -rte|20001|devarchwork:4563 -rte|21024|devarchwork:4562 +mse|20010|6660666|devarchwork:4560 +mse|20010|20001|devarchwork:4563 +rte|20011|devarchwork:4562 newrt|end diff --git a/integration_tests/testreceiver/templates/config.yaml b/integration_tests/testreceiver/templates/config.yaml index 78a9f60..caf4338 100644 --- a/integration_tests/testreceiver/templates/config.yaml +++ b/integration_tests/testreceiver/templates/config.yaml @@ -5,7 +5,8 @@ metadata: data: local.rt: | newrt|start - rte|21024|a1rmrservice:4562 + # we actaully use rts so i dont even think this is used + rte|20011|a1rmrservice:4562 newrt|end --- @@ -17,5 +18,5 @@ metadata: data: local.rt: | newrt|start - rte|21024|a1rmrservice:4562 + rte|20011|a1rmrservice:4562 newrt|end diff --git a/setup.py b/setup.py index 2263cc4..d20a50f 100644 --- a/setup.py +++ b/setup.py @@ -25,6 +25,6 @@ setup( url="https://gerrit.o-ran-sc.org/r/admin/repos/ric-plt/a1", entry_points={"console_scripts": ["run.py=a1.run:main"]}, # we require jsonschema, should be in that list, but connexion already requires a specific version of it - install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=1.0.0"], + install_requires=["requests", "Flask", "connexion[swagger-ui]", "gevent", "msgpack", "rmr>=1.0.0", "mdclogpy"], package_data={"a1": ["openapi.yaml"]}, ) diff --git a/tests/conftest.py b/tests/conftest.py index 73573d0..c5b3a57 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -45,37 +45,34 @@ def adm_type_good(): represents a good put for adm control type """ return { - "name": "Admission Control", - "description": "various parameters to control admission of dual connection", - "policy_type_id": 20000, + "name": "Policy for Rate Control", + "policy_type_id": 6660666, + "description": "This policy is associated with rate control. An instance of the policy specifies the traffic class to which it applies and parameters to use to control how much it must be throttled in case of an overload. Each instance of the policy that is created MUST be associated with a unique class ID (identifyed by the key 'class', which is used by the xAPP to differentiate traffic. If an agent tries to create a policy with the SAME class id, it will be rejected by the xAPP, even if it has a unique policy instance id. ", "create_schema": { "$schema": "http://json-schema.org/draft-07/schema#", "type": "object", + "additionalProperties": False, "properties": { - "enforce": {"type": "boolean", "default": True}, - "window_length": { + "class": { "type": "integer", - "default": 1, "minimum": 1, - "maximum": 60, - "description": "Sliding window length (in minutes)", + "maximum": 256, + "description": "integer id representing class to which we are applying policy", }, - "blocking_rate": { - "type": "number", - "default": 10, - "minimum": 1, - "maximum": 100, - "description": "% Connections to block", + "enforce": { + "type": "boolean", + "description": "Whether to enable or disable enforcement of policy on this class", }, - "trigger_threshold": { + "window_length": { "type": "integer", - "default": 10, - "minimum": 1, - "description": "Minimum number of events in window to trigger blocking", + "minimum": 15, + "maximum": 300, + "description": "Sliding window length in seconds", }, + "trigger_threshold": {"type": "integer", "minimum": 1}, + "blocking_rate": {"type": "number", "minimum": 0, "maximum": 100}, }, - "required": ["enforce", "blocking_rate", "trigger_threshold", "window_length"], - "additionalProperties": False, + "required": ["class", "enforce", "blocking_rate", "trigger_threshold", "window_length"], }, } @@ -85,4 +82,4 @@ def adm_instance_good(): """ represents a good put for adm control instance """ - return {"enforce": True, "window_length": 10, "blocking_rate": 20, "trigger_threshold": 10} + return {"class": 12, "enforce": True, "window_length": 20, "blocking_rate": 20, "trigger_threshold": 10} diff --git a/tests/test_controller.py b/tests/test_controller.py index 756a0d0..971ce3f 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -1,3 +1,6 @@ +""" +tests for controller +""" # ================================================================================== # Copyright (c) 2019 Nokia # Copyright (c) 2018-2019 AT&T Intellectual Property. @@ -16,21 +19,24 @@ # ================================================================================== import time +import json from rmr.rmr_mocks import rmr_mocks from a1 import a1rmr - -ADM_CTRL = "admission_control_policy" -ADM_CTRL_POLICIES = "/a1-p/policytypes/20000/policies" -ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL +RCV_ID = "test_receiver" +ADM_CRTL_TID = 6660666 +ADM_CTRL_IID = "admission_control_policy" +ADM_CTRL_POLICIES = "/a1-p/policytypes/{0}/policies".format(ADM_CRTL_TID) +ADM_CTRL_INSTANCE = ADM_CTRL_POLICIES + "/" + ADM_CTRL_IID ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status" -ADM_CTRL_TYPE = "/a1-p/policytypes/20000" -TEST_TYPE = "/a1-p/policytypes/20001" +ADM_CTRL_TYPE = "/a1-p/policytypes/{0}".format(ADM_CRTL_TID) def _fake_dequeue(): """for monkeypatching with a good status""" - pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}' + pay = json.dumps( + {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "OK"} + ).encode() fake_msg = {"payload": pay} return [fake_msg] @@ -44,20 +50,28 @@ def _fake_dequeue_deleted(): """for monkeypatching with a DELETED status""" new_msgs = [] - # insert some that don't exist to make sure nothing blows up - pay = b'{"policy_type_id": 20666, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}' + # non existent type + pay = json.dumps( + {"policy_type_id": 911, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"} + ).encode() fake_msg = {"payload": pay} new_msgs.append(fake_msg) - pay = b'{"policy_type_id": 20000, "policy_instance_id": "darkness", "handler_id": "test_receiver", "status": "DELETED"}' + pay = json.dumps( + {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": "darkness", "handler_id": RCV_ID, "status": "DELETED"} + ).encode() fake_msg = {"payload": pay} new_msgs.append(fake_msg) # insert a bad one with a malformed body to make sure we keep going - fake_msg = {"payload": "asdf"} - new_msgs.append(fake_msg) + new_msgs.append({"payload": "asdf"}) - pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}' + # not even a json + new_msgs.append("asdf") + + pay = json.dumps( + {"policy_type_id": ADM_CRTL_TID, "policy_instance_id": ADM_CTRL_IID, "handler_id": RCV_ID, "status": "DELETED"} + ).encode() fake_msg = {"payload": pay} new_msgs.append(fake_msg) @@ -91,24 +105,7 @@ def _test_put_patch(monkeypatch): monkeypatch.setattr("rmr.rmr.generate_and_set_transaction_id", fake_set_transactionid) -# Module level Hack - - -def setup_module(): - """module level setup""" - - def noop(): - pass - - # launch the thread with a fake init func and a patched rcv func; we will "repatch" later - a1rmr.start_rmr_thread(init_func_override=noop, rcv_func_override=_fake_dequeue_none) - - -# Actual Tests - - -def test_workflow_nothing_there_yet(client): - """ test policy put good""" +def _no_ac(client): # no type there yet res = client.get(ADM_CTRL_TYPE) assert res.status_code == 404 @@ -123,31 +120,49 @@ def test_workflow_nothing_there_yet(client): assert res.status_code == 404 -def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): - """ - test a full A1 workflow - """ +def _put_ac_type(client, typedef): + _no_ac(client) + # put the type - res = client.put(ADM_CTRL_TYPE, json=adm_type_good) + res = client.put(ADM_CTRL_TYPE, json=typedef) assert res.status_code == 201 # cant replace types - res = client.put(ADM_CTRL_TYPE, json=adm_type_good) + res = client.put(ADM_CTRL_TYPE, json=typedef) assert res.status_code == 400 # type there now res = client.get(ADM_CTRL_TYPE) assert res.status_code == 200 - assert res.json == adm_type_good + assert res.json == typedef + + # type in type list res = client.get("/a1-p/policytypes") assert res.status_code == 200 - assert res.json == [20000] + assert res.json == [ADM_CRTL_TID] # instance 200 but empty list res = client.get(ADM_CTRL_POLICIES) assert res.status_code == 200 assert res.json == [] + +def _delete_ac_type(client): + res = client.delete(ADM_CTRL_TYPE) + assert res.status_code == 204 + + # cant get + res = client.get(ADM_CTRL_TYPE) + assert res.status_code == 404 + + # cant invoke delete on it again + res = client.delete(ADM_CTRL_TYPE) + assert res.status_code == 404 + + _no_ac(client) + + +def _put_ac_instance(client, monkeypatch, instancedef): # no instance there yet res = client.get(ADM_CTRL_INSTANCE) assert res.status_code == 404 @@ -156,38 +171,20 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): # create a good instance _test_put_patch(monkeypatch) - res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good) + res = client.put(ADM_CTRL_INSTANCE, json=instancedef) assert res.status_code == 202 # replace is allowed on instances - res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good) + res = client.put(ADM_CTRL_INSTANCE, json=instancedef) assert res.status_code == 202 # instance 200 and in list res = client.get(ADM_CTRL_POLICIES) assert res.status_code == 200 - assert res.json == [ADM_CTRL] - - def get_instance_good(expected): - # get the instance - res = client.get(ADM_CTRL_INSTANCE) - assert res.status_code == 200 - assert res.json == adm_instance_good - - # get the instance status - res = client.get(ADM_CTRL_INSTANCE_STATUS) - assert res.status_code == 200 - assert res.get_data(as_text=True) == expected + assert res.json == [ADM_CTRL_IID] - # try a status get but we didn't get any ACKs yet to test NOT IN EFFECT - time.sleep(1) # wait for the rmr thread - get_instance_good("NOT IN EFFECT") - - # now pretend we did get a good ACK - a1rmr.replace_rcv_func(_fake_dequeue) - time.sleep(1) # wait for the rmr thread - get_instance_good("IN EFFECT") +def _delete_instance(client): # cant delete type until there are no instances res = client.delete(ADM_CTRL_TYPE) assert res.status_code == 400 @@ -195,34 +192,140 @@ def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): # delete it res = client.delete(ADM_CTRL_INSTANCE) assert res.status_code == 202 - res = client.delete(ADM_CTRL_INSTANCE) # should be able to do multiple deletes + + # should be able to do multiple deletes until it's actually gone + res = client.delete(ADM_CTRL_INSTANCE) assert res.status_code == 202 - # status after a delete, but there are no messages yet, should still return - time.sleep(1) # wait for the rmr thread - get_instance_good("IN EFFECT") - # now pretend we deleted successfully - a1rmr.replace_rcv_func(_fake_dequeue_deleted) - time.sleep(1) # wait for the rmr thread +def _instance_is_gone(client, seconds_to_try=10): + for _ in range(seconds_to_try): + # idea here is that we have to wait for the seperate thread to process the event + try: + res = client.get(ADM_CTRL_INSTANCE_STATUS) + assert res.status_code == 404 + except AssertionError: + time.sleep(1) + + res = client.get(ADM_CTRL_INSTANCE_STATUS) + assert res.status_code == 404 + # list still 200 but no instance res = client.get(ADM_CTRL_POLICIES) assert res.status_code == 200 assert res.json == [] - res = client.get(ADM_CTRL_INSTANCE_STATUS) # cant get status - assert res.status_code == 404 - res = client.get(ADM_CTRL_INSTANCE) # cant get instance + + # cant get instance + res = client.get(ADM_CTRL_INSTANCE) assert res.status_code == 404 + +def _verify_instance_and_status(client, expected_instance, expected_status, expected_deleted, seconds_to_try=5): + # get the instance + res = client.get(ADM_CTRL_INSTANCE) + assert res.status_code == 200 + assert res.json == expected_instance + + for _ in range(seconds_to_try): + # idea here is that we have to wait for the seperate thread to process the event + res = client.get(ADM_CTRL_INSTANCE_STATUS) + assert res.status_code == 200 + assert res.json["has_been_deleted"] == expected_deleted + try: + assert res.json["instance_status"] == expected_status + return + except AssertionError: + time.sleep(1) + assert res.json["instance_status"] == expected_status + + +# Module level Hack + + +def setup_module(): + """module level setup""" + + def noop(): + pass + + # launch the thread with a fake init func and a patched rcv func; we will "repatch" later + a1rmr.start_rmr_thread(init_func_override=noop, rcv_func_override=_fake_dequeue_none) + + +# Actual Tests + + +def test_workflow(client, monkeypatch, adm_type_good, adm_instance_good): + """ + test a full A1 workflow + """ + _put_ac_type(client, adm_type_good) + _put_ac_instance(client, monkeypatch, adm_instance_good) + + """ + we test the state transition diagram of all 5 states here; + 1. not in effect, not deleted + 2. in effect, not deleted + 3. in effect, deleted + 4. not in effect, deleted + 5. gone (timeout expires) + """ + + # try a status get but we didn't get any ACKs yet to test NOT IN EFFECT + _verify_instance_and_status(client, adm_instance_good, "NOT IN EFFECT", False) + + # now pretend we did get a good ACK + a1rmr.replace_rcv_func(_fake_dequeue) + _verify_instance_and_status(client, adm_instance_good, "IN EFFECT", False) + + # delete the instance + _delete_instance(client) + + # status after a delete, but there are no messages yet, should still return + _verify_instance_and_status(client, adm_instance_good, "IN EFFECT", True) + + # now pretend we deleted successfully + a1rmr.replace_rcv_func(_fake_dequeue_deleted) + + # status should be reflected first (before delete triggers) + _verify_instance_and_status(client, adm_instance_good, "NOT IN EFFECT", True) + + # instance should be totally gone after a few seconds + _instance_is_gone(client) + # delete the type - res = client.delete(ADM_CTRL_TYPE) - assert res.status_code == 204 + _delete_ac_type(client) - # cant touch this - res = client.get(ADM_CTRL_TYPE) - assert res.status_code == 404 - res = client.delete(ADM_CTRL_TYPE) - assert res.status_code == 404 + +def test_cleanup_via_t1(client, monkeypatch, adm_type_good, adm_instance_good): + """ + create a type, create an instance, but no acks ever come in, delete instance + """ + _put_ac_type(client, adm_type_good) + + a1rmr.replace_rcv_func(_fake_dequeue_none) + + _put_ac_instance(client, monkeypatch, adm_instance_good) + + """ + here we test the state transition diagram when it never goes into effect: + 1. not in effect, not deleted + 2. not in effect, deleted + 3. gone (timeout expires) + """ + + _verify_instance_and_status(client, adm_instance_good, "NOT IN EFFECT", False) + + # delete the instance + _delete_instance(client) + + _verify_instance_and_status(client, adm_instance_good, "NOT IN EFFECT", True) + + # instance should be totally gone after a few seconds + _instance_is_gone(client) + + # delete the type + _delete_ac_type(client) def test_bad_instances(client, monkeypatch, adm_type_good): @@ -248,7 +351,6 @@ def test_bad_instances(client, monkeypatch, adm_type_good): # get a non existent instance a1rmr.replace_rcv_func(_fake_dequeue) - time.sleep(1) res = client.get(ADM_CTRL_INSTANCE + "DARKNESS") assert res.status_code == 404 @@ -261,9 +363,9 @@ def test_illegal_types(client, adm_type_good): """ Test illegal types """ - res = client.put("/a1-p/policytypes/19999", json=adm_type_good) + res = client.put("/a1-p/policytypes/0", json=adm_type_good) assert res.status_code == 400 - res = client.put("/a1-p/policytypes/21024", json=adm_type_good) + res = client.put("/a1-p/policytypes/2147483648", json=adm_type_good) assert res.status_code == 400 diff --git a/tox-integration.ini b/tox-integration.ini index e39a8cb..81f3bb7 100644 --- a/tox-integration.ini +++ b/tox-integration.ini @@ -48,10 +48,10 @@ commands= pytest --tavern-beta-new-traceback echo "running ab" # run apache bench - ab -n 100 -c 10 -u putdata -T application/json http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy + ab -n 100 -c 10 -v 4 http://localhost:10000/a1-p/healthcheck commands_post= -# echo "log collection" -# integration_tests/getlogs.sh + echo "log collection" + integration_tests/getlogs.sh echo "teardown" helm delete testreceiver helm del --purge testreceiver diff --git a/tox.ini b/tox.ini index f6d1800..9606882 100644 --- a/tox.ini +++ b/tox.ini @@ -25,8 +25,9 @@ deps= pytest-cov setenv = LD_LIBRARY_PATH = /usr/local/lib/:/usr/local/lib64 - RMR_RCV_RETRY_INTERVAL = 1 - RMR_RETRY_TIMES = 2 + A1_RMR_RETRY_TIMES = 2 + INSTANCE_DELETE_NO_RESP_TTL = 3 + INSTANCE_DELETE_RESP_TTL = 3 # Note, before this will work, for the first time on that machine, run ./install_deps.sh commands = @@ -41,7 +42,7 @@ deps = flake8 commands = flake8 setup.py a1 tests [flake8] -extend-ignore = E501,E741 +extend-ignore = E501,E741,E731 # verbatim as asked for by the docs instructions: https://wiki.o-ran-sc.org/display/DOC/Configure+Repo+for+Documentation [testenv:docs]