# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+# install a well known working rmr
+FROM python:3.7-alpine
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.3.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+ && cd rmr \
+ && mkdir build \
+ && cd build \
+ && cmake .. -DPACK_EXTERNALS=1 \
+ && make install
+
+# a1 stage 2
FROM python:3.7-alpine
-# copy NNG and rmr out of the CI builder nng
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3:3-a3.9 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3:3-a3.9 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
-
+# copies
+COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
+COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
COPY a1/ /tmp/a1
COPY tests/ /tmp/tests
COPY setup.py tox.ini /tmp/
# dir that rmr routing file temp goes into
RUN mkdir -p /opt/route/
-# Gevent needs gcc; TODO: this will get fixed
-RUN apk add gcc musl-dev
+# Gevent needs gcc
+RUN apk update && apk add bash gcc musl-dev
# do the actual install; this writes into /usr/local, need root
-RUN pip install .
+RUN pip install --upgrade pip && pip install .
# Switch to a non-root user for security reasons.
# a1 does not currently write into any dirs so no chowns are needed at this time.
ENV LD_LIBRARY_PATH /usr/local/lib/:/usr/local/lib64
ENV RMR_SEED_RT /opt/route/local.rt
+# dont buffer logging
+ENV PYTHONUNBUFFERED 1
+
CMD run.py
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+# install a well known working rmr
FROM python:3.7-alpine
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.3.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+ && cd rmr \
+ && mkdir build \
+ && cd build \
+ && cmake .. -DPACK_EXTERNALS=1 \
+ && make install
-# copy NNG and rmr out of the CI builder nng
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3:3-a3.9 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3:3-a3.9 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
+FROM python:3.7-alpine
+# copies
+COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
+COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
COPY a1/ /tmp/a1
COPY tests/ /tmp/tests
COPY setup.py tox.ini /tmp/
import gevent
from rmr import rmr
from a1 import get_module_logger
-from a1.exceptions import MessageSendFailure, ExpectedAckNotReceived
logger = get_module_logger(__name__)
-RMR_RCV_RETRY_INTERVAL = int(os.environ.get("RMR_RCV_RETRY_INTERVAL", 1000))
RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
MRC = None
-RECEIVED_MESSAGES = [] # used to store messages we need but havent been procedded yet
-WAITING_TRANSIDS = {} # used to store transactionids we are waiting for, so we can filter other stuff out
-
-
-def _dequeue_all_waiting_messages():
- """
- dequeue all waiting rmr messages from rmr, put them into RECEIVED_MESSAGES
- """
- new_messages = []
- sbuf = rmr.rmr_alloc_msg(MRC, 4096)
- while True:
- sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0) # set the timeout to 0 so this doesn't block!!
- summary = rmr.message_summary(sbuf)
- if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
- break
- elif summary["transaction id"] in WAITING_TRANSIDS: # message is relevent
- new_messages.append(summary)
- else:
- logger.debug("A message was received by a1, but a1 was not expecting it! It's being dropped: %s", summary)
- # do nothing with message, effectively dropped
- return new_messages
-
-
-def _check_if_ack_received(target_transid, target_type):
- """
- Try to recieve the latest messages, then search the current queue for the target ACK
- TODO: probably a slightly more efficient data structure than list. Maybe a dict by message type
- However, in the near term, where there are not many xapps under A1, this is fine. Revisit later.
- TODO: do we need to deal with duplicate ACKs for the same transaction id?
- Is it possible if the downstream xapp uses rmr_rts? Might be harmless to sit in queue.. might slow things
-
- """
- new_messages = _dequeue_all_waiting_messages() # dequeue all waiting messages
- global RECEIVED_MESSAGES # this is ugly, but fine.. we just need an in memory list across the async calls
- RECEIVED_MESSAGES += new_messages
- for index, summary in enumerate(RECEIVED_MESSAGES): # Search the queue for the target message
- if (
- summary["message state"] == 0
- and summary["message status"] == "RMR_OK"
- and summary["message type"] == target_type
- and summary["transaction id"] == target_transid
- ): # Found; delete it from queue
- del RECEIVED_MESSAGES[index]
- return summary
- return None
-
-
def init_rmr():
"""
called from run; not called for unit tests
def send(payload, message_type=0):
"""
- sends a message up to RETRY_TIMES
+ Sends a message up to RETRY_TIMES
If the message is sent successfully, it returns the transactionid
- Raises an exception (MessageSendFailure) otherwise
+ Does nothing otherwise
"""
# we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
sbuf = rmr.rmr_alloc_msg(MRC, 4096)
payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
# retry RETRY_TIMES to send the message
- tried = 0
- while True:
+ for _ in range(0, RETRY_TIMES):
# setup the send message
rmr.set_payload_and_length(payload, sbuf)
rmr.generate_and_set_transaction_id(sbuf)
# check success or failure
if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
- return transaction_id # we are good
- if post_send_summary["message state"] == 10 and post_send_summary["message status"] == "RMR_ERR_RETRY":
- # in this state, we should retry
- if tried == RETRY_TIMES:
- # we have tried RETRY_TIMES and we are still not getting a good state, raise an exception and let the caller deal with it
- raise MessageSendFailure(str(post_send_summary))
- else:
- tried += 1
- else:
- # we hit a state where we should not even retry
- raise MessageSendFailure(str(post_send_summary))
-
+ # we are good
+ logger.debug("Message sent successfully!")
+ return transaction_id
-def send_ack_retry(payload, expected_ack_message_type, message_type=0):
- """
- send a message and check for an ACK.
- If no ACK is recieved, defer execution for RMR_RCV_RETRY_INTERVAL ms, then check again.
- If no ack is received before the timeout (set by _rmr_init), send again and try again up to RETRY_TIMES
+ # we failed all RETRY_TIMES
+ logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
- It is critical here to set the RMR_TIMEOUT to 0 in the rmr_rcv_to function, which causes that function NOT to block.
- Instead, if the message isn't there, we give up execution for the interval, which allows the gevent server to process other requests in the meantime.
- Amazing props to https://sdiehl.github.io/gevent-tutorial/
- (which also runs this whole server)
+def dequeue_all_waiting_messages(filter_type=None):
"""
-
- # try to send the msg to the downstream policy handler
- expected_transaction_id = send(payload, message_type)
- WAITING_TRANSIDS[expected_transaction_id] = 1
-
- gevent.sleep(0.01) # wait 10ms before we try the first recieve
- for _ in range(0, RETRY_TIMES):
- logger.debug("Seeing if return message is fufilled")
- summary = _check_if_ack_received(expected_transaction_id, expected_ack_message_type)
- if summary:
- logger.debug("Target ack Message received!: %s", summary)
- logger.debug("current queue size is %d", len(RECEIVED_MESSAGES))
- del WAITING_TRANSIDS[expected_transaction_id]
- return summary["payload"]
+ dequeue all waiting rmr messages from rmr
+ We only add messages of type 21024; we drop other "spam";
+ see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
+ """
+ new_messages = []
+ sbuf = rmr.rmr_alloc_msg(MRC, 4096)
+ while True:
+ sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0) # set the timeout to 0 so this doesn't block!!
+ summary = rmr.message_summary(sbuf)
+ if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
+ # no new messages
+ break
else:
- logger.debug("Deffering execution for %s seconds", str(RMR_RCV_RETRY_INTERVAL / 1000))
- gevent.sleep(RMR_RCV_RETRY_INTERVAL / 1000)
+ if (not filter_type) or (summary["message type"] == filter_type):
+ # message is relevent
+ new_messages.append(summary)
+ else:
+ # "spam", do nothing with message, effectively dropped
+ logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)
- # we still didn't get the ACK we want
- raise ExpectedAckNotReceived()
+ return new_messages
import json
from jsonschema.exceptions import ValidationError
from a1 import get_module_logger
-from a1 import a1rmr, exceptions, utils
+from a1 import a1rmr, exceptions, utils, data
logger = get_module_logger(__name__)
-def _get_policy_definition(policyname):
+def _get_policy_definition(policy_type_id):
# Currently we read the manifest on each call, which would seem to allow updating A1 in place. Revisit this?
manifest = utils.get_ric_manifest()
for m in manifest["controls"]:
- if m["name"] == policyname:
+ if m["name"] == policy_type_id:
return m
- raise exceptions.PolicyNotFound()
-
-def _get_needed_policy_info(policyname):
- """
- Get the needed info for a policy
- """
- m = _get_policy_definition(policyname)
- return (
- utils.rmr_string_to_int(m["message_receives_rmr_type"]),
- m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None,
- utils.rmr_string_to_int(m["message_sends_rmr_type"]),
- )
+ raise exceptions.PolicyTypeNotFound()
-def _get_needed_policy_fetch_info(policyname):
+def _get_policy_schema(policy_type_id):
"""
- Get the needed info for fetching a policy state
+ Get the needed info for a policy
"""
- m = _get_policy_definition(policyname)
- req_k = "control_state_request_rmr_type"
- ack_k = "control_state_request_reply_rmr_type"
- return (
- utils.rmr_string_to_int(m[req_k]) if req_k in m else None,
- utils.rmr_string_to_int(m[ack_k]) if ack_k in m else None,
- )
+ m = _get_policy_definition(policy_type_id)
+ return m["message_receives_payload_schema"] if "message_receives_payload_schema" in m else None
def _try_func_return(func):
except ValidationError as exc:
logger.exception(exc)
return "", 400
- except exceptions.PolicyNotFound as exc:
+ except exceptions.PolicyTypeNotFound as exc:
+ logger.exception(exc)
+ return "", 404
+ except exceptions.PolicyInstanceNotFound as exc:
logger.exception(exc)
return "", 404
except exceptions.MissingManifest as exc:
except exceptions.MissingRmrString as exc:
logger.exception(exc)
return "A1 does not have a mapping for the desired rmr string. report this!", 500
- except exceptions.MessageSendFailure as exc:
- logger.exception(exc)
- return "A1 was unable to send a needed message to a downstream subscriber", 504
- except exceptions.ExpectedAckNotReceived as exc:
- logger.exception(exc)
- return "A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK", 504
except BaseException as exc:
# catch all, should never happen...
logger.exception(exc)
return Response(status=500)
-def _put_handler(policyname, data):
+def _put_handler(policy_type_id, policy_instance_id, instance):
"""
Handles policy put
- """
- mtype_send, schema, mtype_return = _get_needed_policy_info(policyname)
+ For now, policy_type_id is used as the message type
+ """
+ # check for 404
+ data.type_is_valid(policy_type_id)
# validate the PUT against the schema, or if there is no shema, make sure the pUT is empty
+ schema = _get_policy_schema(policy_type_id)
if schema:
- utils.validate_json(data, schema)
- elif data != {}:
+ utils.validate_json(instance, schema)
+ elif instance != {}:
return "BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY", 400
- # send rmr, wait for ACK
- return_payload = a1rmr.send_ack_retry(json.dumps(data), message_type=mtype_send, expected_ack_message_type=mtype_return)
+ # store the instance
+ data.store_policy_instance(policy_type_id, policy_instance_id, instance)
- # right now it is assumed that xapps respond with JSON payloads
- # it is further assumed that they include a field "status" and that the value "SUCCESS" indicates a good policy change
- try:
- rpj = json.loads(return_payload)
- return (rpj, 200) if rpj["status"] == "SUCCESS" else ({"reason": "BAD STATUS", "return_payload": rpj}, 502)
- except json.decoder.JSONDecodeError:
- return {"reason": "NOT JSON", "return_payload": return_payload}, 502
- except KeyError:
- return {"reason": "NO STATUS", "return_payload": rpj}, 502
+ body = {
+ "operation": "CREATE",
+ "policy_type_id": policy_type_id,
+ "policy_instance_id": policy_instance_id,
+ "payload": instance,
+ }
+
+ # send rmr (best effort)
+ a1rmr.send(json.dumps(body), message_type=policy_type_id)
+
+ return "", 201
-def _get_handler(policyname):
+def _get_status_handler(policy_type_id, policy_instance_id):
"""
- Handles policy GET
+ Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
+
+ NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
+ THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
+ because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
"""
- mtype_send, mtype_return = _get_needed_policy_fetch_info(policyname)
+ # check validity to 404 first:
+ data.type_is_valid(policy_type_id)
+ data.instance_is_valid(policy_type_id, policy_instance_id)
- if not (mtype_send and mtype_return):
- return "POLICY DOES NOT SUPPORT FETCHING", 400
+ # pop a1s mailbox, looking for policy notifications
+ new_messages = a1rmr.dequeue_all_waiting_messages(21024)
- # send rmr, wait for ACK
- return_payload = a1rmr.send_ack_retry("", message_type=mtype_send, expected_ack_message_type=mtype_return)
+ # try to parse the messages as responses. Drop those that are malformed
+ for msg in new_messages:
+ # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
+ # because we are popping the whole mailbox, which might include other statuses
+ pay = json.loads(msg["payload"])
+ if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
+ data.set_policy_instance_status(
+ pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+ )
+ else:
+ logger.debug("Dropping message")
+ logger.debug(pay)
- # right now it is assumed that xapps respond with JSON payloads
- try:
- return (json.loads(return_payload), 200)
- except json.decoder.JSONDecodeError:
- return {"reason": "NOT JSON", "return_payload": return_payload}, 502
+ # return the status vector
+ return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
-# Public
+# Healthcheck
-def put_handler(policyname):
+def get_healthcheck():
"""
- Handles policy replacement
+ Handles healthcheck GET
+ Currently, this basically checks the server is alive.a1rmr
"""
- data = connexion.request.json
- return _try_func_return(lambda: _put_handler(policyname, data))
+ return "", 200
+
+
+# Policy types
-def get_handler(policyname):
+def get_all_policy_types():
"""
- Handles policy GET
+ Handles GET /a1-p/policytypes
"""
- return _try_func_return(lambda: _get_handler(policyname))
+ return "", 501
-def healthcheck_handler():
+def create_policy_type(policy_type_id):
"""
- Handles healthcheck GET
- Currently, this basically checks the server is alive.a1rmr
+ Handles PUT /a1-p/policytypes/policy_type_id
"""
- return "", 200
+ return "", 501
+
+
+def get_policy_type(policy_type_id):
+ """
+ Handles GET /a1-p/policytypes/policy_type_id
+ """
+ return "", 501
+
+
+def delete_policy_type(policy_type_id):
+ """
+ Handles DELETE /a1-p/policytypes/policy_type_id
+ """
+ return "", 501
+
+
+# Policy instances
+
+
+def get_all_instances_for_type(policy_type_id):
+ """
+ Handles GET /a1-p/policytypes/policy_type_id/policies
+ """
+ return "", 501
+
+
+def get_policy_instance(policy_type_id, policy_instance_id):
+ """
+ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
+ """
+ return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
+
+
+def get_policy_instance_status(policy_type_id, policy_instance_id):
+ """
+ Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
+ """
+ return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
+
+
+def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
+ """
+ Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
+ """
+ instance = connexion.request.json
+ return _try_func_return(lambda: _put_handler(policy_type_id, policy_instance_id, instance))
+
+
+def delete_policy_instance(policy_type_id, policy_instance_id):
+ """
+ Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
+ """
+ return "", 501
--- /dev/null
+# ==================================================================================
+# Copyright (c) 2019 Nokia
+# Copyright (c) 2018-2019 AT&T Intellectual Property.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ==================================================================================
+
+"""
+Represents A1s database and database access functions.
+In the future, this may change to use a different backend, possibly dramatically.
+Hopefully, the access functions are a good api so nothing else has to change when this happens
+
+For now, the database is in memory.
+We use dict data structures (KV) with the expectation of having to move this into Redis
+"""
+from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
+from a1 import get_module_logger
+
+logger = get_module_logger(__name__)
+
+# This is essentially mockouts for future KV
+# Note that the D subkey won't be needed when in redis, since you can store data at x anx x_y
+POLICY_DATA = {}
+I = "instances"
+H = "handlers"
+D = "data"
+
+
+# TODO: REMOVE THIS!! (should be done via PUT)
+POLICY_DATA[20000] = {}
+POLICY_DATA[20000][D] = {"foo": "bar"}
+POLICY_DATA[20000][I] = {}
+POLICY_DATA[20001] = {}
+POLICY_DATA[20001][D] = {"foo": "bar"}
+POLICY_DATA[20001][I] = {}
+
+
+def type_is_valid(policy_type_id):
+ """
+ check that a type is valid
+ """
+ if policy_type_id not in POLICY_DATA:
+ logger.error("%s not found", policy_type_id)
+ raise PolicyTypeNotFound()
+
+
+def instance_is_valid(policy_type_id, policy_instance_id):
+ """
+ check that an instance is valid
+ """
+ type_is_valid(policy_type_id)
+ if policy_instance_id not in POLICY_DATA[policy_type_id][I]:
+ raise PolicyInstanceNotFound
+
+
+def store_policy_instance(policy_type_id, policy_instance_id, instance):
+ """
+ Store a policy instance
+ """
+ type_is_valid(policy_type_id)
+
+ # store the instance
+ # Reset the statuses because this is a new policy instance, even if it was overwritten
+ POLICY_DATA[policy_type_id][I][policy_instance_id] = {}
+ POLICY_DATA[policy_type_id][I][policy_instance_id][D] = instance
+ POLICY_DATA[policy_type_id][I][policy_instance_id][H] = {}
+
+
+def get_policy_instance(policy_type_id, policy_instance_id):
+ """
+ Retrieve a policy instance
+ """
+ type_is_valid(policy_type_id)
+ instance_is_valid(policy_type_id, policy_instance_id)
+ return POLICY_DATA[policy_type_id][I][policy_instance_id][D]
+
+
+def get_policy_instance_statuses(policy_type_id, policy_instance_id):
+ """
+ Retrieve the status vector for a policy instance
+ """
+ type_is_valid(policy_type_id)
+ instance_is_valid(policy_type_id, policy_instance_id)
+
+ return [{"handler_id": k, "status": v} for k, v in POLICY_DATA[policy_type_id][I][policy_instance_id][H].items()]
+
+
+def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status):
+ """
+ Update the status of a handler id of a policy instance
+ """
+ type_is_valid(policy_type_id)
+ instance_is_valid(policy_type_id, policy_instance_id)
+
+ POLICY_DATA[policy_type_id][I][policy_instance_id][H][handler_id] = status
"""
-class MessageSendFailure(BaseException):
- pass
-
+class PolicyInstanceNotFound(BaseException):
+ """a policy instance cannot be found"""
-class ExpectedAckNotReceived(BaseException):
- pass
-
-class PolicyNotFound(BaseException):
- pass
+class PolicyTypeNotFound(BaseException):
+ """a policy type instance cannot be found"""
class MissingRmrString(BaseException):
# ==================================================================================
openapi: 3.0.0
info:
- version: 0.10.3
+ version: 1.0.0
title: RIC A1
paths:
'/a1-p/healthcheck':
get:
description: >
- perform a healthcheck on a1
+ Perform a healthcheck on a1
tags:
- A1 Mediator
- operationId: a1.controller.healthcheck_handler
+ operationId: a1.controller.get_healthcheck
responses:
200:
description: >
- a1 is healthy. Anything other than a 200 should be considered a1 as failing
+ A1 is healthy.
+ Anything other than a 200 should be considered a1 as failing
- '/a1-p/policies/{policyname}':
+ '/a1-p/policytypes/':
+ get:
+ description: "Get a list of all registered policy type ids"
+ tags:
+ - A1 Mediator
+ operationId: a1.controller.get_all_policy_types
+ responses:
+ 200:
+ description: "list of all registered policy type ids"
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ "$ref": "#/components/schemas/policy_type_id"
+ example: [20000, 20020]
+
+ '/a1-p/policytypes/{policy_type_id}':
parameters:
- - name: policyname
+ - name: policy_type_id
in: path
- description: the name of the policy to retrieve or replace
required: true
schema:
- type: string
+ "$ref": "#/components/schemas/policy_type_id"
+ get:
+ description: >
+ Get this policy type
+ tags:
+ - A1 Mediator
+ operationId: a1.controller.get_policy_type
+ responses:
+ '200':
+ description: "policy type successfully found"
+ content:
+ application/json:
+ schema:
+ "$ref": "#/components/schemas/policy_type_schema"
+ '404':
+ description: >
+ policy type not found
+ delete:
+ description: >
+ Delete this policy type. Can only be performed if there are no instances of this type
+ tags:
+ - A1 Mediator
+ operationId: a1.controller.delete_policy_type
+ responses:
+ '204':
+ description: >
+ policy type successfully deleted
+ '400':
+ description: >
+ Policy type cannot be deleted because there are instances
+ All instances must be removed before a policy type can be deleted
+ '404':
+ description: >
+ policy type not found
put:
description: >
- Replace the current operation of policyname with the new parameters (replaces the current policy with the new one specified here).
-
- Until there are standard policy definitions that are defined OUTSIDE of the scope of xapps, this API will be *very underspecified*.
- This is a known gap, do not despair.
- The PUT body is specified, *currently* in the xapp manifest that implements this policy; the caller should refer to the message_receives_payload_schema field to make this request.
- The return content is also specified as above (in the xapp manifest) in the message_sends_payload_schema field.
+ Create a new policy type .
+ Replace is not currently allowed; to replace, for now do a DELETE and then a PUT again.
- Eventually, we need concrete policy defintions that are decoupled from xapp, and then this API description will become more fully specified.
tags:
- A1 Mediator
- operationId: a1.controller.put_handler
+ operationId: a1.controller.create_policy_type
requestBody:
content:
application/json:
schema:
- type: object
+ "$ref": "#/components/schemas/policy_type_schema"
+ example:
+ name: admission_control_policy
+ description: various parameters to control admission of dual connection
+ policy_type_id: 20000
+ create_schema:
+ $schema: 'http://json-schema.org/draft-07/schema#'
+ type: object
+ properties:
+ enforce:
+ type: boolean
+ default: true
+ window_length:
+ type: integer
+ default: 1
+ minimum: 1
+ maximum: 60
+ description: Sliding window length (in minutes)
+ blocking_rate:
+ type: number
+ default: 10
+ minimum: 1
+ maximum: 100
+ description: '% Connections to block'
+ trigger_threshold:
+ type: integer
+ default: 10
+ minimum: 1
+ description: Minimum number of events in window to trigger blocking
+ additionalProperties: false
+
+ responses:
+ '201':
+ description: "policy type successfully created"
+ '400':
+ description: "illegal ID, or object already existed"
+ '/a1-p/policytypes/{policy_type_id}/policies':
+ parameters:
+ - name: policy_type_id
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/policy_type_id"
+ get:
+ description: "get a list of all policy instance ids for this policy type id"
+ tags:
+ - A1 Mediator
+ operationId: a1.controller.get_all_instances_for_type
+ responses:
+ 200:
+ description: "list of all policy instance ids for this policy type id"
+ content:
+ application/json:
+ schema:
+ type: array
+ items:
+ "$ref": "#/components/schemas/policy_instance_id"
+ example: ["3d2157af-6a8f-4a7c-810f-38c2f824bf12", "06911bfc-c127-444a-8eb1-1bffad27cc3d"]
+
+
+ '/a1-p/policytypes/{policy_type_id}/policies/{policy_instance_id}':
+ parameters:
+ - name: policy_type_id
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/policy_type_id"
+
+ - name: policy_instance_id
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/policy_instance_id"
+
+ get:
+ description: >
+ Retrieve the policy instance
+ tags:
+ - A1 Mediator
+ operationId: a1.controller.get_policy_instance
responses:
'200':
description: >
- The downstream component responsible for implementing this policy replied with a good response. Check the manifest for response details.
- '400':
- description: >
- Bad PUT body for this policyname
- '404':
- description: >
- there is no policy with this name
- '502':
- description: >
- The xapp that implements this policy replied, but the reply was a "failure", OR there was no status indicating success or failure.
- This returns an object containing the reason, and the return payload.
+ The policy instance.
+ the schema of this object is defined by the create_schema field of the policy type
content:
application/json:
schema:
type: object
- properties:
- reason:
- type: string
- enum: [
- "NO STATUS",
- "BAD STATUS",
- "NOT JSON"
- ]
- return_payload:
- type: object
+ '404':
+ description: >
+ there is no policy instance with this policy_instance_id or there is no policy type with this policy_type_id
+
+ delete:
+ description: >
+ Delete this policy instance
- '504':
+ tags:
+ - A1 Mediator
+ operationId: a1.controller.delete_policy_instance
+ responses:
+ '204':
+ description: >
+ policy instance successfully deleted
+ '404':
+ description: >
+ there is no policy instance with this policy_instance_id
+ or there is no policy type with this policy_type_id
+
+ put:
+ description: >
+ Create or replace a policy instance of type policy_type_id.
+ The schema of the PUT body is defined by the create_schema field of the policy type.
+
+ tags:
+ - A1 Mediator
+ operationId: a1.controller.create_or_replace_policy_instance
+ requestBody:
+ content:
+ application/json:
+ schema:
+ type: object
+ description: >
+ the schema of this object is defined by the create_schema field of the policy type
+ example:
+ enforce: true
+ window_length: 10
+ blocking_rate: 20
+ trigger_threshold: 10
+
+ responses:
+ '201':
description: >
- the downstream component responsible for handling this policy did not respond (in time)
+ Policy instance created
+ '400':
+ description: >
+ Bad PUT body for this policy instance
+ '404':
+ description: >
+ There is no policy type with this policy_type_id
+
+ '/a1-p/policytypes/{policy_type_id}/policies/{policy_instance_id}/status':
+ parameters:
+ - name: policy_type_id
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/policy_type_id"
+
+ - name: policy_instance_id
+ in: path
+ required: true
+ schema:
+ "$ref": "#/components/schemas/policy_instance_id"
get:
- description: Get the current state/value of policyname
+ description: >
+ Retrieve the policy instance status across all handlers of the policy
+
tags:
- A1 Mediator
- operationId: a1.controller.get_handler
+ operationId: a1.controller.get_policy_instance_status
responses:
'200':
description: >
- The downstream component responsible for implementing this policy replied with a good response. Check the manifest for response details.
+ The policy instance status.
+ Returns a vector of statuses, where each contains a handler_id (opaque id of a RIC component that implements this policy) and the policy status as returned by that handler
content:
application/json:
schema:
- type: object
- '400':
- description: >
- The downstream component for implementing this policy does not support policy fetching.
+ type: array
+ items:
+ type: object
+ properties:
+ handler_id:
+ type: string
+ status:
+ type: string
+ example:
+ [{"handler_id": "1234-5678", "status" : "OK"}, {"handler_id": "abc-def", "status" : "NOT IMPLEMENTED"}]
'404':
description: >
- there is no policy with this name
- '504':
+ there is no policy instance with this policy_instance_id or there is no policy type with this policy_type_id
+
+
+components:
+ schemas:
+ policy_type_schema:
+ type: object
+ required:
+ - name
+ - description
+ - policy_type_id
+ - create_schema
+ additionalProperties: false
+ properties:
+ name:
+ type: string
+ description: name of the policy type
+ description:
+ type: string
+ description: description of the policy type
+ policy_type_id:
+ description: the integer of the policy type
+ type: integer
+ create_schema:
+ type: object
description: >
- the downstream component responsible for handling this policy did not respond (in time)
+ jsonschema (following http://json-schema.org/draft-07/schema) of the CREATE payload to be sent to handlers of this policy
+ policy_type_id:
+ description: >
+ represents a policy type identifier. Currently this is restricted to an integer range.
+ type: integer
+ minimum: 20000
+ maximum: 21024
+ policy_instance_id:
+ description: >
+ represents a policy instance identifier. UUIDs are advisable but can be any string
+ type: string
+ example: "3d2157af-6a8f-4a7c-810f-38c2f824bf12"
+
+ downstream_message_schema:
+ type: object
+ required:
+ - operation
+ - policy_type_id
+ - policy_instance_id
+ - payload
+ additionalProperties: false
+ properties:
+ operation:
+ description: the operation being performed
+ type: string
+ enum:
+ - CREATE
+ - DELETE
+ - UPDATE
+ - READ
+ policy_type_id:
+ "$ref": "#/components/schemas/policy_type_id"
+ policy_instance_id:
+ "$ref": "#/components/schemas/policy_instance_id"
+ payload:
+ description: payload for this operation
+ type: object
+ example:
+ operation: CREATE
+ policy_type_id: 12345678
+ policy_instance_id: 3d2157af-6a8f-4a7c-810f-38c2f824bf12
+ payload:
+ enforce: true
+ window_length: 10
+ blocking_rate: 20
+ trigger_threshold: 10
+
+ downstream_notification_schema:
+ type: object
+ required:
+ - policy_type_id
+ - policy_instance_id
+ - handler_id
+ - status
+ additionalProperties: false
+ properties:
+ policy_type_id:
+ "$ref": "#/components/schemas/policy_type_id"
+ policy_instance_id:
+ "$ref": "#/components/schemas/policy_instance_id"
+ handler_id:
+ description: >
+ id of the policy handler
+ type: string
+ status:
+ description: >
+ the status of this policy instance in this handler
+ type: string
+ example:
+ policy_type_id: 12345678
+ policy_instance_id: 3d2157af-6a8f-4a7c-810f-38c2f824bf12
+ handler_id: 1234-5678
+ status: OK
logger = get_module_logger(__name__)
-def _get_rmr_mapping_table(cache={}):
- """
- Get the rmr mapping file
- Broken out for ease of unit testing
- """
- try:
- return open("/opt/rmr_string_int_mapping.txt", "r").readlines()
- except FileNotFoundError:
- logger.error("Missing RMR Mapping!")
- raise exceptions.MissingRmrMapping
-
-
# Public
except FileNotFoundError:
logger.error("Missing A1 Manifest!")
raise exceptions.MissingManifest
-
-
-def rmr_string_to_int(rmrs, cache={}):
- """
- map an rmr string to an int.
- TODO: should we allow for dynamic updates to this file? If so, we shouldn't cache
- """
- if cache == {}: # fill the cache if not populated
- lines = _get_rmr_mapping_table()
- for l in lines:
- items = l.split(":")
- cache[items[0]] = int(items[1])
-
- if rmrs in cache:
- return cache[rmrs]
- raise exceptions.MissingRmrString(rmrs)
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 0.10.3
+tag: 0.11.0-NOT_FOR_USE_YET
Optional ENV Variables
----------------------
-You can set the following ENVs to change the A1 behavior: 1)
-``RMR_RCV_RETRY_INTERVAL`` the number of milliseconds that execution
-will defer (back to the server loop to handle http request if
-applicable) when an expected ack is not received by rmr call. The
-default is ``1000`` (1s). The time for the full HTTP request to
-``PUT /policies`` will be > this if an ACK is not recieved within 10ms,
-which is an initial delay until the first rcv is tried. 2)
-``RMR_RETRY_TIMES`` the number of times failed rmr operations such as
+You can set the following ENVs to change the A1 behavior:
+
+1. ``RMR_RETRY_TIMES`` the number of times failed rmr operations such as
timeouts and send failures should be retried before A1 gives up and
returns a 503. The default is ``4``.
The format is based on `Keep a Changelog <http://keepachangelog.com/>`__
and this project adheres to `Semantic Versioning <http://semver.org/>`__.
+[1.0.0] - TBD
+
+::
+
+ * Release 1.0.0 will be the Release A version of A1
+
+
+[0.11.0] - 9/17/2019
+
+::
+
+ * This is on the road to release 1.0.0. It is not meant to be tested (E2E) as it's own release
+ * Implement the Release A spec in the openapi.yaml
+ * Rework A1 to follow that spec
+ * Remove rmr_mapping now that we use policyid as the mtype to send and a well known mtype for the ACKs
+ * Add the delay receiver test to the tavern integration tests
+ * Remove unneeded ENV variables from helm charts
+ * Switch away from builder images to avoid quicksand; upgrade rmr at our own pace
+
+
[0.10.3] - 8/20/2019
::
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
-# TODO: switch to alpine once rmr apk available
+# install a well known working rmr
FROM python:3.7-alpine
+RUN apk update && apk add autoconf automake build-base cmake libtool ninja pkgconfig git
+RUN git clone --branch 1.3.0 https://gerrit.o-ran-sc.org/r/ric-plt/lib/rmr \
+ && cd rmr \
+ && mkdir build \
+ && cd build \
+ && cmake .. -DPACK_EXTERNALS=1 \
+ && make install
-COPY receiver.py /
+# stage2
+FROM python:3.7-alpine
-# copy NNG and rmr out of the CI builder nng
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3:3-a3.9 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-alpine3:3-a3.9 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
+# copies
+COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
+COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
+COPY receiver.py /
# Install RMr python bindings
RUN pip install --upgrade pip
+++ /dev/null
-# ==================================================================================
-# Copyright (c) 2019 Nokia
-# Copyright (c) 2018-2019 AT&T Intellectual Property.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-# ==================================================================================
-# TODO: switch to alpine once rmr apk available
-FROM python3.7
-
-COPY bombard.py /
-
-# copy NNG out of the CI builder nng
-COPY --from=nexus3.o-ran-sc.org:10004/bldr-debian-python-nng:2-py3.7-nng1.1.1 /usr/local/lib/libnng.so /usr/local/lib/libnng.so
-
-# Installs RMr using debian package hosted at packagecloud.io
-RUN wget --content-disposition https://packagecloud.io/o-ran-sc/master/packages/debian/stretch/rmr_1.0.36_amd64.deb/download.deb
-RUN dpkg -i rmr_1.0.36_amd64.deb
-
-# Install RMr python bindings
-RUN pip install --upgrade pip
-RUN pip install rmr==0.10.1
-
-# rmr setups
-RUN mkdir -p /opt/route/
-ENV LD_LIBRARY_PATH /usr/local/lib
-ENV RMR_SEED_RT /opt/route/local.rt
-
-WORKDIR /
-CMD ["python","-u","bombard.py"]
apiVersion: v1
description: A1 Helm chart for Kubernetes
name: a1mediator
-version: 0.10.3
+version: 0.11.0
{
"controls":[
{
- "name":"admission_control_policy",
+ "name":20000,
"description":"various parameters to control admission of dual connection",
"control_state_request_rmr_type":"DC_ADM_GET_POLICY",
"control_state_request_reply_rmr_type":"DC_ADM_GET_POLICY_ACK",
}
},
{
- "name":"test_policy",
+ "name":20001,
"description":"for the purposes of testing",
"message_receives_rmr_type":"TEST_REQ",
"message_sends_rmr_type":"TEST_ACK",
local.rt: |
newrt|start
rte|20000|testreceiverrmrservice:4560
- rte|20001|{{ .Values.rmrservice.name }}:{{ .Values.rmrservice.port }}
- rte|20002|testreceiverrmrservice:4560
- rte|20003|{{ .Values.rmrservice.name }}:{{ .Values.rmrservice.port }}
- rte|10000|delayreceiverrmrservice:4563
- rte|10001|{{ .Values.rmrservice.name }}:{{ .Values.rmrservice.port }}
+ rte|20001|delayreceiverrmrservice:4563
+ rte|21024|{{ .Values.rmrservice.name }}:{{ .Values.rmrservice.port }}
newrt|end
- rmr_string_int_mapping.txt: {{ tpl (.Files.Get "files/rmr_string_int_mapping.txt") . | quote }}
- ricmanifest.json: {{ tpl (.Files.Get "files/ricmanifest.json") . | quote }}
+ ricmanifest.json: {{tpl (.Files.Get "files/ricmanifest.json") . | quote}}
- name: a1conf
mountPath: /opt/ricmanifest.json
subPath: ricmanifest.json
- - name: a1conf
- mountPath: /opt/rmr_string_int_mapping.txt
- subPath: rmr_string_int_mapping.txt
- name: a1conf
mountPath: /opt/route/local.rt
subPath: local.rt
env:
- name: PYTHONUNBUFFERED
- value: "0"
- - name: RMR_RCV_RETRY_INTERVAL
- value: "{{ .Values.rmr_timeout_config.rcv_retry_interval_ms }}"
+ value: "1"
- name: RMR_RETRY_TIMES
value: "{{ .Values.rmr_timeout_config.rcv_retry_times }}"
image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}"
livenessProbe:
httpGet:
- path: /healthcheck
+ path: /a1-p/healthcheck
port: http
readinessProbe:
httpGet:
- path: /healthcheck
+ path: /a1-p/healthcheck
port: http
resources:
{{- toYaml .Values.resources | nindent 12 }}
+++ /dev/null
-{{- if .Values.ingress.enabled -}}
-{{- $fullName := include "a1mediator.fullname" . -}}
-apiVersion: extensions/v1beta1
-kind: Ingress
-metadata:
- name: {{ $fullName }}
- labels:
-{{ include "a1mediator.labels" . | indent 4 }}
- {{- with .Values.ingress.annotations }}
- annotations:
- {{- toYaml . | nindent 4 }}
- {{- end }}
-spec:
-{{- if .Values.ingress.tls }}
- tls:
- {{- range .Values.ingress.tls }}
- - hosts:
- {{- range .hosts }}
- - {{ . | quote }}
- {{- end }}
- secretName: {{ .secretName }}
- {{- end }}
-{{- end }}
- rules:
- {{- range .Values.ingress.hosts }}
- - host: {{ .host | quote }}
- http:
- paths:
- {{- range .paths }}
- - path: {{ . }}
- backend:
- serviceName: {{ $fullName }}
- servicePort: http
- {{- end }}
- {{- end }}
-{{- end }}
# these are ENV variables that A1 takes; see docs
rmr_timeout_config:
- rcv_retry_interval_ms: 500
rcv_retry_times: 20
-
-# These were generated by the helm template generator and have not been altered.
-ingress:
- enabled: false
- annotations: {}
- # kubernetes.io/ingress.class: nginx
- # kubernetes.io/tls-acme: "true"
- hosts:
- - host: chart-example.local
- paths: []
-
- tls: []
- # - secretName: chart-example-tls
- # hosts:
- # - chart-example.local
from rmr import rmr
PORT = os.environ.get("TEST_RCV_PORT", "4560")
-RETURN_MINT = int(os.environ.get("TEST_RCV_RETURN_MINT", 20001))
-RETURN_MINT_FETCH = int(os.environ.get("TEST_RCV_RETURN_MINT", 20003))
DELAY = int(os.environ.get("TEST_RCV_SEC_DELAY", 0))
-PAYLOAD_RETURNED = json.loads(
- os.environ.get("TEST_RCV_RETURN_PAYLOAD", '{"ACK_FROM": "ADMISSION_CONTROL", "status": "SUCCESS"}')
-)
+HANDLER_ID = os.environ.get("HANDLER_ID", "test_receiver")
# TODO: should these be made constants?
mrc = rmr.rmr_init(PORT.encode("utf-8"), rmr.RMR_MAX_RCV_BYTES, 0x00)
time.sleep(1)
print("not yet ready")
-print("listening")
+print("listening ON {}".format(PORT))
sbuf = None
while True:
sbuf = rmr.rmr_torcv_msg(mrc, sbuf, 1000)
else:
print("Message received!: {}".format(summary))
- # if this was a policy fetch (request int =20002), override the payload and return int
- if summary["message type"] == 20002:
- PAYLOAD_RETURNED = {"mock return from FETCH": "pretend policy is here"}
- RETURN_MINT = 20003
+ received_payload = json.loads(summary["payload"])
- val = json.dumps(PAYLOAD_RETURNED).encode("utf-8")
+ payload = {
+ "policy_type_id": received_payload["policy_type_id"],
+ "policy_instance_id": received_payload["policy_instance_id"],
+ "handler_id": HANDLER_ID,
+ "status": "OK",
+ }
+
+ val = json.dumps(payload).encode("utf-8")
rmr.set_payload_and_length(val, sbuf)
- sbuf.contents.mtype = RETURN_MINT
+ sbuf.contents.mtype = 21024
print("Pre reply summary: {}".format(rmr.message_summary(sbuf)))
time.sleep(DELAY)
sbuf = rmr.rmr_rts_msg(mrc, sbuf)
response:
status_code: 200
-
---
-test_name: test delayed policy
+test_name: test admission control
stages:
- - name: test the delayed policy
+ - name: test the admission control policy get not there yet
request:
- url: http://localhost:10000/a1-p/policies/test_policy
- method: PUT
- json:
- {}
- headers:
- content-type: application/json
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
+ method: GET
response:
- status_code: 200
- body:
- ACK_FROM: DELAYED_TEST
- status: SUCCESS
-
-
----
+ status_code: 404
-test_name: test admission control
+ - name: test the admission control policy status get not there yet
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status
+ method: GET
+ response:
+ status_code: 404
-stages:
- name: test the admission control policy
request:
- url: http://localhost:10000/a1-p/policies/admission_control_policy
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
method: PUT
json:
enforce: true
trigger_threshold: 10
headers:
content-type: application/json
+ response:
+ status_code: 201
+
+ - name: test the admission control policy get
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
+ method: GET
response:
status_code: 200
body:
- ACK_FROM: ADMISSION_CONTROL
- status: SUCCESS
+ enforce: true
+ window_length: 10
+ blocking_rate: 20
+ trigger_threshold: 10
- - name: test the admission control policy get
+ - name: test the admission control policy status get
+ delay_before: 3 # give it a few seconds for rmr
request:
- url: http://localhost:10000/a1-p/policies/admission_control_policy
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status
method: GET
response:
status_code: 200
body:
- mock return from FETCH: pretend policy is here
+ - handler_id: test_receiver
+ status: OK
---
-test_name: bad_requests
+test_name: test the delay receiver
stages:
- - name: does not exist
+ - name: test the delay policy get not there yet
request:
- url: http://localhost:10000/a1-p/policies/darkness
+ url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest
+ method: GET
+ response:
+ status_code: 404
+
+ - name: test the delay policy status get not there yet
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest/status
+ method: GET
+ response:
+ status_code: 404
+
+ - name: test the delay policy
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest
method: PUT
- json:
- {}
+ json: {}
headers:
content-type: application/json
response:
- status_code: 404
+ status_code: 201
+
+ - name: test the delay policy get
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest
+ method: GET
+ response:
+ status_code: 200
+ body: {}
+
+ - name: test the admission control policy status get
+ delay_before: 8 # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20001/policies/delaytest/status
+ method: GET
+ response:
+ status_code: 200
+ body:
+ - handler_id: delay_receiver
+ status: OK
+
+
+
+
+---
+
+test_name: bad_requests
+
+stages:
- name: bad body for admission control policy
request:
- url: http://localhost:10000/a1-p/policies/admission_control_policy
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
method: PUT
json:
not: "expected"
- name: not a json
request:
- url: http://localhost:10000/a1-p/policies/admission_control_policy
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
method: PUT
data: "asdf"
response:
- name: bad body for test policy
request:
- url: http://localhost:10000/a1-p/policies/test_policy
+ url: http://localhost:10000/a1-p/policytypes/20001/policies/test_policy
method: PUT
json:
not: "welcome"
response:
status_code: 400
-
- - name: test policy doesnt support fetch
- request:
- url: http://localhost:10000/a1-p/policies/test_policy
- method: GET
- response:
- status_code: 400
newrt|start
-rte|10000|devarchwork:4563
-rte|10001|devarchwork:4562
rte|20000|devarchwork:4560
-rte|20002|devarchwork:4560
-rte|20001|devarchwork:4562
-rte|20003|devarchwork:4562
+rte|20001|devarchwork:4563
+rte|21024|devarchwork:4562
newrt|end
local.rt: |
newrt|start
rte|20000|{{ .Values.testrmrservice.name }}:{{ .Values.testrmrservice.port }}
- rte|20001|a1rmrservice:4562
- rte|20002|{{ .Values.testrmrservice.name }}:{{ .Values.testrmrservice.port }}
- rte|20003|a1rmrservice:4562
+ rte|21024|a1rmrservice:4562
newrt|end
---
data:
local.rt: |
newrt|start
- rte|10000|{{ .Values.delayrmrservice.name }}:{{ .Values.delayrmrservice.port }}
- rte|10001|a1rmrservice:4562
+ rte|20001|{{ .Values.delayrmrservice.name }}:{{ .Values.delayrmrservice.port }}
+ rte|21024|a1rmrservice:4562
newrt|end
mountPath: /opt/route/local.rt
subPath: local.rt
env:
- - name: RMR_RCV_RETRY_INTERVAL
- value: "500"
- - name: RMR_RETRY_TIMES
- value: "10"
- name: TEST_RCV_PORT
value: "{{ .Values.delayrmrservice.port }}"
- - name: TEST_RCV_RETURN_MINT
- value: "10001"
- name: TEST_RCV_SEC_DELAY
value: "5"
- - name: TEST_RCV_RETURN_PAYLOAD
- value: '{"ACK_FROM": "DELAYED_TEST", "status": "SUCCESS"}'
+ - name: HANDLER_ID
+ value: "delay_receiver"
volumes:
- name: "testreceiverconf"
name: delayreceiverrmrservice
type: ClusterIP
port: 4563
-
-
-
-ingress:
- enabled: false
- annotations: {}
- # kubernetes.io/ingress.class: nginx
- # kubernetes.io/tls-acme: "true"
- hosts:
- - host: chart-example.local
- paths: []
-
- tls: []
- # - secretName: chart-example-tls
- # hosts:
- # - chart-example.local
-
-resources: {}
- # We usually recommend not to specify default resources and to leave this as a conscious
- # choice for the user. This also increases chances charts run on environments with little
- # resources, such as Minikube. If you do want to specify resources, uncomment the following
- # lines, adjust them as necessary, and remove the curly braces after 'resources:'.
- # limits:
- # cpu: 100m
- # memory: 128Mi
- # requests:
- # cpu: 100m
- # memory: 128Mi
-
-nodeSelector: {}
-
-tolerations: []
-
-affinity: {}
setup(
name="a1",
- version="0.10.3",
+ version="0.11.0",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="RIC A1 Mediator for policy/intent changes",
{
"controls":[
{
- "name":"admission_control_policy",
+ "name":20000,
"description":"various parameters to control admission of dual connection",
"control_state_request_rmr_type":"DC_ADM_GET_POLICY",
"control_state_request_reply_rmr_type":"DC_ADM_GET_POLICY_ACK",
}
},
{
- "name":"test_policy",
+ "name":20001,
"description":"for the purposes of testing",
"message_receives_rmr_type":"TEST_REQ",
"message_sends_rmr_type":"TEST_ACK",
+++ /dev/null
-DC_ADM_INT_CONTROL:20000
-DC_ADM_INT_CONTROL_ACK:20001
-DC_ADM_GET_POLICY: 20002
-DC_ADM_GET_POLICY_ACK: 20003
-TEST_REQ:10000
-TEST_ACK:10001
# ==================================================================================
import tempfile
import os
+
from rmr.rmr_mocks import rmr_mocks
from a1 import app
-from a1 import exceptions
-from rmr import rmr
import testing_helpers
import pytest
+
+ADM_CTRL = "admission_control_policy"
+ADM_CTRL_INSTANCE = "/a1-p/policytypes/20000/policies/" + ADM_CTRL
+ADM_CTRL_INSTANCE_STATUS = ADM_CTRL_INSTANCE + "/status"
+
+
# http://flask.pocoo.org/docs/1.0/testing/
@pytest.fixture
def client():
os.unlink(app.app.config["DATABASE"])
-def _fake_dequeue(
- monkeypatch,
- msg_payload={"status": "SUCCESS", "foo": "bar"},
- msg_type=20001,
- msg_state=0,
- jsonb=True,
- unexpected_first=True,
-):
+def _fake_dequeue(_filter_type):
"""
- generates a mock rmr message response (returns a function that does; uses closures to set params)
+ for monkeypatching a1rmnr.dequeue_all_messages
"""
- new_messages = []
- # stick a message we don't want at the front of the queue, then stick the message we want
- if unexpected_first:
- monkeypatch.setattr("rmr.rmr.rmr_torcv_msg", rmr_mocks.rcv_mock_generator(msg_payload, -1, msg_state, jsonb))
- sbuf = rmr.rmr_alloc_msg(None, None)
- sbuf = rmr.rmr_torcv_msg(None, sbuf, None)
- summary = rmr.message_summary(sbuf)
- new_messages.append(summary)
-
- monkeypatch.setattr("rmr.rmr.rmr_torcv_msg", rmr_mocks.rcv_mock_generator(msg_payload, msg_type, msg_state, jsonb))
- sbuf = rmr.rmr_alloc_msg(None, None)
- sbuf = rmr.rmr_torcv_msg(None, sbuf, None)
- summary = rmr.message_summary(sbuf)
- new_messages.append(summary)
-
- def f():
- return new_messages
-
- return f
+ fake_msg = {}
+ pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
+ fake_msg["payload"] = pay
+ new_messages = [fake_msg]
+ return new_messages
def _test_put_patch(monkeypatch):
# Actual Tests
-def test_policy_get(client, monkeypatch):
- """
- test policy GET
- """
- _test_put_patch(monkeypatch)
- monkeypatch.setattr(
- "a1.a1rmr._dequeue_all_waiting_messages",
- _fake_dequeue(monkeypatch, msg_payload={"GET ack": "pretend policy is here"}, msg_type=20003),
- )
- res = client.get("/a1-p/policies/admission_control_policy")
- assert res.status_code == 200
- assert res.json == {"GET ack": "pretend policy is here"}
-
-
-def test_policy_get_unsupported(client, monkeypatch):
- """
- test policy GET
- """
- testing_helpers.patch_all(monkeypatch, nofetch=True)
- res = client.get("/a1-p/policies/admission_control_policy")
- assert res.status_code == 400
- assert res.data == b'"POLICY DOES NOT SUPPORT FETCHING"\n'
-
-
+# def test_policy_get(client, monkeypatch):
+# """
+# test policy GET
+# """
+# _test_put_patch(monkeypatch)
+# monkeypatch.setattr(
+# "a1.a1rmr.dequeue_all_waiting_messages",
+# _fake_dequeue(monkeypatch, msg_payload={"GET ack": "pretend policy is here"}, msg_type=20003),
+# )
+# res = client.get("/a1-p/policies/admission_control_policy")
+# assert res.status_code == 200
+# assert res.json == {"GET ack": "pretend policy is here"}
+#
+#
+# def test_policy_get_unsupported(client, monkeypatch):
+# """
+# test policy GET
+# """
+# testing_helpers.patch_all(monkeypatch, nofetch=True)
+# res = client.get("/a1-p/policies/admission_control_policy")
+# assert res.status_code == 400
+# assert res.data == b'"POLICY DOES NOT SUPPORT FETCHING"\n'
+#
+#
def test_xapp_put_good(client, monkeypatch):
""" test policy put good"""
- _test_put_patch(monkeypatch)
- monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch))
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 200
- assert res.json == {"status": "SUCCESS", "foo": "bar"}
+ # nothing there yet
+ res = client.get(ADM_CTRL_INSTANCE)
+ assert res.status_code == 404
+ res = client.get(ADM_CTRL_INSTANCE_STATUS)
+ assert res.status_code == 404
-def test_xapp_put_bad(client, monkeypatch):
- """Test policy put fails"""
+ # create a good instance
_test_put_patch(monkeypatch)
- # return from policy handler has a status indicating FAIL
- monkeypatch.setattr(
- "a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload={"status": "FAIL", "foo": "bar"})
- )
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 502
- assert res.json["reason"] == "BAD STATUS"
- assert res.json["return_payload"] == {"status": "FAIL", "foo": "bar"}
-
- # return from policy handler has no status field
- monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload={"foo": "bar"}))
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 502
- assert res.json["reason"] == "NO STATUS"
- assert res.json["return_payload"] == {"foo": "bar"}
-
- # return from policy handler not a json
- monkeypatch.setattr(
- "a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload="booger", jsonb=False)
- )
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 502
- assert res.json["reason"] == "NOT JSON"
- assert res.json["return_payload"] == "booger"
-
- # bad type
- monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_type=666))
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 504
- assert res.data == b"\"A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK\"\n"
-
- # bad state
- monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_state=666))
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 504
- assert res.data == b"\"A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK\"\n"
-
-
-def test_xapp_put_bad_send(client, monkeypatch):
- """
- Test bad send failures
- """
- testing_helpers.patch_all(monkeypatch)
+ res = client.put(ADM_CTRL_INSTANCE, json=testing_helpers.good_payload())
+ assert res.status_code == 201
- monkeypatch.setattr("a1.a1rmr._dequeue_all_waiting_messages", _fake_dequeue(monkeypatch))
- res = client.put("/a1-p/policies/admission_control_policy", json={"not": "expected"})
- assert res.status_code == 400
+ # get the instance
+ res = client.get(ADM_CTRL_INSTANCE)
+ assert res.status_code == 200
+ assert res.json == testing_helpers.good_payload()
+
+ # get the instance status
+ monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+ res = client.get(ADM_CTRL_INSTANCE_STATUS)
+ assert res.status_code == 200
+ assert res.json == [{"handler_id": "test_receiver", "status": "OK"}]
+ # assert that rmr bad states don't cause problems
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 504
- assert res.data == b'"A1 was unable to send a needed message to a downstream subscriber"\n'
+ res = client.put(ADM_CTRL_INSTANCE, json=testing_helpers.good_payload())
+ assert res.status_code == 201
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(5))
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 504
- assert res.data == b'"A1 was unable to send a needed message to a downstream subscriber"\n'
+ res = client.put(ADM_CTRL_INSTANCE, json=testing_helpers.good_payload())
+ assert res.status_code == 201
+#
+#
+# def test_xapp_put_bad(client, monkeypatch):
+# """Test policy put fails"""
+# _test_put_patch(monkeypatch)
+# # return from policy handler has a status indicating FAIL
+# monkeypatch.setattr(
+# "a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload={"status": "FAIL", "foo": "bar"})
+# )
+# res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
+# assert res.status_code == 502
+# assert res.json["reason"] == "BAD STATUS"
+# assert res.json["return_payload"] == {"status": "FAIL", "foo": "bar"}
+#
+# # return from policy handler has no status field
+# monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload={"foo": "bar"}))
+# res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
+# assert res.status_code == 502
+# assert res.json["reason"] == "NO STATUS"
+# assert res.json["return_payload"] == {"foo": "bar"}
+#
+# # return from policy handler not a json
+# monkeypatch.setattr(
+# "a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_payload="booger", jsonb=False)
+# )
+# res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
+# assert res.status_code == 502
+# assert res.json["reason"] == "NOT JSON"
+# assert res.json["return_payload"] == "booger"
+#
+# # bad type
+# monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_type=666))
+# res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
+# assert res.status_code == 504
+# assert res.data == b"\"A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK\"\n"
+#
+# # bad state
+# monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue(monkeypatch, msg_state=666))
+# res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
+# assert res.status_code == 504
+# assert res.data == b"\"A1 was expecting an ACK back but it didn't receive one or didn't recieve the expected ACK\"\n"
+#
+#
def test_bad_requests(client, monkeypatch):
- """Test bad requests"""
+ """
+ Test bad send failures
+ """
testing_helpers.patch_all(monkeypatch)
-
- # test a 404
- res = client.put("/a1-p/policies/noexist", json=testing_helpers.good_payload())
- assert res.status_code == 404
+ res = client.put(ADM_CTRL_INSTANCE, json={"not": "expected"})
+ assert res.status_code == 400
# bad media type
- res = client.put("/a1-p/policies/admission_control_policy", data="notajson")
+ res = client.put(ADM_CTRL_INSTANCE, data="notajson")
assert res.status_code == 415
# test a PUT body against a poliucy not expecting one
- res = client.put("/a1-p/policies/test_policy", json=testing_helpers.good_payload())
+ res = client.put("/a1-p/policytypes/20001/policies/test_policy", json=testing_helpers.good_payload())
assert res.status_code == 400
assert res.data == b'"BODY SUPPLIED BUT POLICY HAS NO EXPECTED BODY"\n'
-def test_missing_manifest(client, monkeypatch):
- """
- test that we get a 500 with an approrpiate message on a missing manifest
- """
-
- def f():
- raise exceptions.MissingManifest()
-
- monkeypatch.setattr("a1.utils.get_ric_manifest", f)
-
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 500
- assert res.data == b'"A1 was unable to find the required RIC manifest. report this!"\n'
+# def test_bad_requests(client, monkeypatch):
+# """Test bad requests"""
+# testing_helpers.patch_all(monkeypatch)
+#
+# # test a 404
+# res = client.put("/a1-p/policies/noexist", json=testing_helpers.good_payload())
+# assert res.status_code == 404
-def test_missing_rmr(client, monkeypatch):
- """
- test that we get a 500 with an approrpiate message on a missing rmr rmr_string
- """
- testing_helpers.patch_all(monkeypatch, nonexisting_rmr=True)
- res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
- assert res.status_code == 500
- assert res.data == b'"A1 does not have a mapping for the desired rmr string. report this!"\n'
+# def test_missing_manifest(client, monkeypatch):
+# """
+# test that we get a 500 with an approrpiate message on a missing manifest
+# """
+#
+# def f():
+# raise exceptions.MissingManifest()
+#
+# monkeypatch.setattr("a1.utils.get_ric_manifest", f)
+#
+# res = client.put("/a1-p/policies/admission_control_policy", json=testing_helpers.good_payload())
+# assert res.status_code == 500
+# assert res.data == b'"A1 was unable to find the required RIC manifest. report this!"\n'
+#
+#
def test_healthcheck(client):
utils.get_ric_manifest()
-def test_bad_get_rmr_mapping(monkeypatch):
- """
- testing missing rmr mapping
- """
-
- def badopen(filename, mode):
- raise FileNotFoundError()
-
- monkeypatch.setattr("builtins.open", badopen)
- with pytest.raises(exceptions.MissingRmrMapping):
- utils._get_rmr_mapping_table()
-
-
def test_good_get_ric_manifest(monkeypatch):
"""
test get_ric_manifest
utils.get_ric_manifest()
-def test_good_get_rmr_mapping(monkeypatch):
- """
- testing getting the ric maping table
- """
- testing_helpers.patch_all(monkeypatch)
- utils._get_rmr_mapping_table()
-
-
def test_validate(monkeypatch):
"""
test json validation wrapper
monkeypatch.setattr("a1.utils.get_ric_manifest", lambda: man)
- # patch rmr mapping
- mapping = open(_get_fixture_path("rmr_string_int_mapping.txt"), "r").readlines()
- monkeypatch.setattr("a1.utils._get_rmr_mapping_table", lambda: mapping)
-
def good_payload():
return {"enforce": True, "window_length": 10, "blocking_rate": 20, "trigger_threshold": 10}
echo "running tavern"
pytest
echo "running ab"
- ab -n 100 -c 10 -u putdata -T application/json http://localhost:10000/ric/policies/admission_control_policy
+ ab -n 100 -c 10 -u putdata -T application/json http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
commands_post=
helm delete testreceiver
helm del --purge testreceiver
commands = flake8 setup.py a1 tests
[flake8]
-extend-ignore = E501
+extend-ignore = E501,E741