# a1 stage 2
FROM python:3.7-alpine
+# Directory that holds the rmr routing table file (RMR_SEED_RT points into it)
+RUN mkdir -p /opt/route/
+
+# Building gevent needs gcc and the musl headers.
+# --no-cache fetches a fresh package index and leaves no apk cache behind in the layer.
+RUN apk add --no-cache bash gcc musl-dev
+
+# Speed hack: gevent is installed before the a1 sources are copied in, so that when building
+# repeatedly (eg during dev) and only changing a1 code, this layer is reused and
+# we do not need to keep compiling gevent, which takes forever.
+# --no-cache-dir keeps pip's download/wheel cache out of the image.
+RUN pip install --no-cache-dir --upgrade pip && pip install --no-cache-dir gevent
+
# copies
COPY --from=0 /usr/local/lib64/libnng.so /usr/local/lib64/libnng.so
COPY --from=0 /usr/local/lib64/librmr_nng.so /usr/local/lib64/librmr_nng.so
COPY a1/ /tmp/a1
-COPY tests/ /tmp/tests
COPY setup.py tox.ini /tmp/
WORKDIR /tmp
-# dir that rmr routing file temp goes into
-RUN mkdir -p /opt/route/
-
-# Gevent needs gcc
-RUN apk update && apk add bash gcc musl-dev
-
# do the actual install; this writes into /usr/local, need root
-RUN pip install --upgrade pip && pip install .
+RUN pip install .
# Switch to a non-root user for security reasons.
# a1 does not currently write into any dirs so no chowns are needed at this time.
ENV A1USER a1user
-RUN addgroup -S $A1USER && adduser -S -G $A1USER $A1USER
+RUN addgroup -S $A1USER && adduser -S -G $A1USER $A1USER
USER $A1USER
# misc setups
EXPOSE 10000
ENV LD_LIBRARY_PATH /usr/local/lib/:/usr/local/lib64
ENV RMR_SEED_RT /opt/route/local.rt
-
# dont buffer logging
ENV PYTHONUNBUFFERED 1
+"""
+Main a1 controller
+"""
# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
import json
from flask import Response
from jsonschema import validate
-import connexion
from jsonschema.exceptions import ValidationError
+import connexion
from a1 import get_module_logger
from a1 import a1rmr, exceptions, data
"""
try:
return func()
- except ValidationError as exc:
+ except (ValidationError, exceptions.PolicyTypeAlreadyExists) as exc:
logger.exception(exc)
return "", 400
- except exceptions.PolicyTypeAlreadyExists as exc:
- logger.exception(exc)
- return "", 400
- except exceptions.PolicyTypeNotFound as exc:
+ except (exceptions.PolicyTypeNotFound, exceptions.PolicyInstanceNotFound) as exc:
logger.exception(exc)
return "", 404
- except exceptions.PolicyInstanceNotFound as exc:
- logger.exception(exc)
- return "", 404
- except exceptions.MissingManifest as exc:
- logger.exception(exc)
- return "A1 was unable to find the required RIC manifest. report this!", 500
- except exceptions.MissingRmrString as exc:
- logger.exception(exc)
- return "A1 does not have a mapping for the desired rmr string. report this!", 500
except BaseException as exc:
# catch all, should never happen...
logger.exception(exc)
return Response(status=500)
+def _gen_body_to_handler(operation, policy_type_id, policy_instance_id, payload=None):
+    """
+    Build the message body that gets sent to downstream policy handlers.
+
+    The result is a dict with the keys "operation", "policy_type_id", "policy_instance_id" and "payload";
+    "payload" stays None unless the caller supplies one.
+    """
+    body = dict(
+        operation=operation,
+        policy_type_id=policy_type_id,
+        policy_instance_id=policy_instance_id,
+        payload=payload,
+    )
+    return body
+
+
# Healthcheck
"""
Handles GET /a1-p/policytypes
"""
- return _try_func_return(lambda: data.get_type_list())
+ return _try_func_return(data.get_type_list)
def create_policy_type(policy_type_id):
Handles PUT /a1-p/policytypes/policy_type_id
"""
- def _put_type_handler(policy_type_id, body):
+ def put_type_handler():
data.store_policy_type(policy_type_id, body)
return "", 201
body = connexion.request.json
- return _try_func_return(lambda: _put_type_handler(policy_type_id, body))
+ return _try_func_return(put_type_handler)
def get_policy_type(policy_type_id):
"""
Handles DELETE /a1-p/policytypes/policy_type_id
"""
+ logger.error(policy_type_id)
return "", 501
"""
Handles GET /a1-p/policytypes/policy_type_id/policies
"""
- return _try_func_return(lambda: data.get_instance_list(policy_type_id))
+
+    def get_all_instance_handler():
+        """
+        Give every instance of this type a chance to be cleaned up, then return the instance list with a 200
+        """
+        known_instances = data.get_instance_list(policy_type_id)
+        for instance_id in known_instances:
+            data.delete_policy_instance_if_applicable(policy_type_id, instance_id)
+
+        # fetch again: the clean-up pass above may have removed instances
+        remaining = data.get_instance_list(policy_type_id)
+        return remaining, 200
+
+ return _try_func_return(get_all_instance_handler)
def get_policy_instance(policy_type_id, policy_instance_id):
"""
Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
- # 200 is automatic here
- return _try_func_return(lambda: data.get_policy_instance(policy_type_id, policy_instance_id))
+
+    def get_instance_handler():
+        """
+        Run the delete-if-applicable check for this instance, then return the instance with a 200
+        """
+        # raises when the instance is not valid to begin with; may also remove it
+        data.delete_policy_instance_if_applicable(policy_type_id, policy_instance_id)
+
+        # raises if the instance was just removed above, otherwise yields the stored instance
+        instance = data.get_policy_instance(policy_type_id, policy_instance_id)
+        return instance, 200
+
+ return _try_func_return(get_instance_handler)
def get_policy_instance_status(policy_type_id, policy_instance_id):
"""
Handles GET /a1-p/policytypes/polidyid/policies/policy_instance_id/status
- """
-
- def _get_status_handler(policy_type_id, policy_instance_id):
- """
- Pop trough A1s mailbox, insert the latest status updates into the database, and then return the status vector
-
- NOTE: this is done lazily. Meaning, when someone performs a GET on this API, we pop through a1s mailbox.
- THis may not work in the future if there are "thousands" of policy acknowledgements that hit a1 before this is called,
- because the rmr mailbox may fill. However, in the near term, we do not expect this to happen.
- """
- # check validity to 404 first:
- data.type_is_valid(policy_type_id)
- data.instance_is_valid(policy_type_id, policy_instance_id)
- # pop a1s mailbox, looking for policy notifications
- new_messages = a1rmr.dequeue_all_waiting_messages(21024)
+ Return the aggregated status. The order of rules is as follows:
+ 1. If a1 has received at least one status, and *all* received statuses are "DELETED", we blow away the instance and return a 404
+ 2. if a1 has received at least one status and at least one is OK, we return "IN EFFECT"
+ 3. "NOT IN EFFECT" otherwise (no statuses, or none are OK but not all are deleted)
+ """
- # try to parse the messages as responses. Drop those that are malformed
- for msg in new_messages:
- # note, we don't use the parameters "policy_type_id, policy_instance" from above here,
- # because we are popping the whole mailbox, which might include other statuses
- pay = json.loads(msg["payload"])
- if "policy_type_id" in pay and "policy_instance_id" in pay and "handler_id" in pay and "status" in pay:
- data.set_policy_instance_status(
- pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
- )
- else:
- logger.debug("Dropping message")
- logger.debug(pay)
+ def get_status_handler():
+ # delete if applicable (will raise if not applicable to begin with)
+ data.delete_policy_instance_if_applicable(policy_type_id, policy_instance_id)
- # return the status vector
- return data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+ vector = data.get_policy_instance_statuses(policy_type_id, policy_instance_id)
+ for i in vector:
+ if i == "OK":
+ return "IN EFFECT", 200
+ return "NOT IN EFFECT", 200
- return _try_func_return(lambda: _get_status_handler(policy_type_id, policy_instance_id))
+ return _try_func_return(get_status_handler)
def create_or_replace_policy_instance(policy_type_id, policy_instance_id):
"""
Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
+ instance = connexion.request.json
- def _put_instance_handler(policy_type_id, policy_instance_id, instance):
+ def put_instance_handler():
"""
Handles policy instance put
# store the instance
data.store_policy_instance(policy_type_id, policy_instance_id, instance)
- body = {
- "operation": "CREATE",
- "policy_type_id": policy_type_id,
- "policy_instance_id": policy_instance_id,
- "payload": instance,
- }
-
# send rmr (best effort)
+ body = _gen_body_to_handler("CREATE", policy_type_id, policy_instance_id, payload=instance)
a1rmr.send(json.dumps(body), message_type=policy_type_id)
- return "", 201
+ return "", 202
- instance = connexion.request.json
- return _try_func_return(lambda: _put_instance_handler(policy_type_id, policy_instance_id, instance))
+ return _try_func_return(put_instance_handler)
def delete_policy_instance(policy_type_id, policy_instance_id):
"""
Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
- return "", 501
+
+    def delete_instance_handler():
+        """
+        Send the DELETEs out to the handlers.
+
+        The instance itself is not deleted here; that happens when a later GET checks the statuses.
+        Raises if the instance is not valid, so that DELETE on an unknown instance is not answered with a 202.
+        """
+        # reject unknown instances up front; the API spec documents a 404 for this case.
+        # NOTE(review): relies on instance_is_valid raising one of the not-found exceptions
+        # that _try_func_return maps to 404 - confirm
+        data.instance_is_valid(policy_type_id, policy_instance_id)
+
+        # send rmr (best effort)
+        body = _gen_body_to_handler("DELETE", policy_type_id, policy_instance_id)
+        a1rmr.send(json.dumps(body), message_type=policy_type_id)
+
+        return "", 202
+
+ return _try_func_return(delete_instance_handler)
"""
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists
from a1 import get_module_logger
+from a1 import a1rmr
+import json
logger = get_module_logger(__name__)
H = "handlers"
D = "data"
+
# Types
POLICY_DATA[policy_type_id][I][policy_instance_id][H] = {}
+def delete_policy_instance_if_applicable(policy_type_id, policy_instance_id):
+    """
+    Delete a policy instance if at least one status is known and all known statuses are "DELETED".
+
+    Side effect: first pops a1s waiting mailbox and records every usable status update,
+    including updates that belong to other policy types/instances.
+    Messages that cannot be used (unparseable payload, missing fields, unknown instance) are dropped.
+
+    Raises (via instance_is_valid) if the requested instance is not valid.
+    """
+    required_fields = ("policy_type_id", "policy_instance_id", "handler_id", "status")
+
+    # pop through a1s mailbox, updating a1s db of all policy statuses
+    for msg in a1rmr.dequeue_all_waiting_messages(21024):
+        # NOTE: we don't use the parameters "policy_type_id, policy_instance_id" from above here,
+        # because we are popping the whole mailbox, which might include other statuses
+        try:
+            pay = json.loads(msg["payload"])
+        except (KeyError, TypeError, ValueError):
+            # no payload, or payload is not JSON: drop it instead of failing the caller's request
+            logger.debug("Dropping message")
+            logger.debug(msg)
+            continue
+
+        if isinstance(pay, dict) and all(field in pay for field in required_fields):
+            try:
+                set_policy_instance_status(
+                    pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]
+                )
+                continue
+            except (PolicyTypeNotFound, PolicyInstanceNotFound):
+                # a status for an instance a1 does not know (eg a late ack for an instance that was
+                # already deleted) must not abort this request or lose the rest of the mailbox.
+                # NOTE(review): assumes these are the exceptions instance_is_valid raises - confirm
+                pass
+
+        logger.debug("Dropping message")
+        logger.debug(pay)
+
+    # raise if not valid
+    instance_is_valid(policy_type_id, policy_instance_id)
+
+    # blow away from a1 db only when there is at least one status and every status is DELETED
+    vector = get_policy_instance_statuses(policy_type_id, policy_instance_id)
+    if vector and all(status == "DELETED" for status in vector):
+        del POLICY_DATA[policy_type_id][I][policy_instance_id]
+
+
def get_policy_instance(policy_type_id, policy_instance_id):
"""
Retrieve a policy instance
"""
- type_is_valid(policy_type_id)
instance_is_valid(policy_type_id, policy_instance_id)
return POLICY_DATA[policy_type_id][I][policy_instance_id][D]
"""
Retrieve the status vector for a policy instance
"""
- type_is_valid(policy_type_id)
instance_is_valid(policy_type_id, policy_instance_id)
- return [{"handler_id": k, "status": v} for k, v in POLICY_DATA[policy_type_id][I][policy_instance_id][H].items()]
+ return [v for _, v in POLICY_DATA[policy_type_id][I][policy_instance_id][H].items()]
def set_policy_instance_status(policy_type_id, policy_instance_id, handler_id, status):
"""
Update the status of a handler id of a policy instance
"""
- type_is_valid(policy_type_id)
instance_is_valid(policy_type_id, policy_instance_id)
POLICY_DATA[policy_type_id][I][policy_instance_id][H][handler_id] = status
class PolicyTypeAlreadyExists(BaseException):
"""a policy type already exists and replace not supported at this time"""
-
-
-class MissingRmrString(BaseException):
- pass
-
-
-class MissingManifest(BaseException):
- pass
-
-
-class MissingRmrMapping(BaseException):
- pass
- A1 Mediator
operationId: a1.controller.delete_policy_instance
responses:
- '204':
+ '202':
description: >
- policy instance successfully deleted
+ policy instance deletion initiated
'404':
description: >
there is no policy instance with this policy_instance_id
trigger_threshold: 10
responses:
- '201':
+ '202':
description: >
- Policy instance created
+ Policy instance creation initiated
'400':
description: >
Bad PUT body for this policy instance
get:
description: >
Retrieve the policy instance status across all handlers of the policy
-
+ If this endpoint returns successfully (200), it is either IN EFFECT or NOT IN EFFECT.
+ IN EFFECT is returned if at least one policy handler in the RIC is implementing the policy.
+ NOT IN EFFECT is returned otherwise.
+ If a policy instance is successfully deleted, this endpoint will return a 404 (not a 200).
tags:
- A1 Mediator
operationId: a1.controller.get_policy_instance_status
responses:
'200':
description: >
- The policy instance status.
- Returns a vector of statuses, where each contains a handler_id (opaque id of a RIC component that implements this policy) and the policy status as returned by that handler
+ successfully retrieved the status
content:
- application/json:
+ text/plain:
schema:
- type: array
- items:
- type: object
- properties:
- handler_id:
- type: string
- status:
- type: string
- example:
- [{"handler_id": "1234-5678", "status" : "OK"}, {"handler_id": "abc-def", "status" : "NOT IMPLEMENTED"}]
+ type: string
+ enum:
+ - IN EFFECT
+ - NOT IN EFFECT
'404':
description: >
there is no policy instance with this policy_instance_id or there is no policy type with this policy_type_id
-
components:
schemas:
policy_type_schema:
description: >
the status of this policy instance in this handler
type: string
+ enum:
+ - OK
+ - ERROR
+ - DELETED
example:
policy_type_id: 12345678
policy_instance_id: 3d2157af-6a8f-4a7c-810f-38c2f824bf12
# The Jenkins job uses this string for the tag in the image name
# for example nexus3.o-ran-sc.org:10004/my-image-name:my-tag
---
-tag: 0.13.0-NOT_FOR_USE_YET
+tag: 0.14.0-NOT_FOR_USE_YET
* Release 1.0.0 will be the Release A version of A1
+[0.14.0] - 10/1/2019
+::
+
+ * Implement instance delete
+ * Moves away from the status vector and now aggregates statuses
+ * Pop through A1's mailbox "3x as often": on all three kinds of instance GET (instance list, instance, instance status), since all such calls want the latest information
+ * Misc cleanups in controller (closures ftw)
+ * Add rmr-version.yaml for CICD jobs
+
[0.13.0] - 9/25/2019
::
#!/bin/sh
-git clone https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
+git clone --branch 1.8.1 https://gerrit.oran-osc.org/r/ric-plt/lib/rmr \
&& cd rmr \
&& mkdir .build; cd .build; cmake .. -DPACK_EXTERNALS=1; sudo make install \
&& cd ../.. \
apiVersion: v1
description: A1 Helm chart for Kubernetes
name: a1mediator
-version: 0.13.0
+version: 0.14.0
received_payload = json.loads(summary["payload"])
+ op = received_payload["operation"]
+ send_payload_status = "ERROR"
+ if op == "CREATE":
+ send_payload_status = "OK"
+ elif op == "DELETE":
+ send_payload_status = "DELETED"
+
payload = {
"policy_type_id": received_payload["policy_type_id"],
"policy_instance_id": received_payload["policy_instance_id"],
"handler_id": HANDLER_ID,
- "status": "OK",
+ "status": send_payload_status,
}
val = json.dumps(payload).encode("utf-8")
- trigger_threshold
- window_length
additionalProperties: false
+ response:
+ status_code: 201
- name: type there now
request:
response:
status_code: 404
- - name: put the admission control policy
+ # PUT the instance and make sure subsequent GETs return properly
+ - name: put the admission control policy instance
request:
url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
method: PUT
headers:
content-type: application/json
response:
- status_code: 201
+ status_code: 202
- name: test the admission control policy get
request:
method: GET
response:
status_code: 200
- body:
- - handler_id: test_receiver
- status: OK
+ # tavern doesn't yet let you check string statuses!!!
- name: instance list 200 and contains the instance
request:
body:
- admission_control_policy
+ # DELETE the instance and make sure subsequent GETs return properly
+ - name: delete the instance
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
+ method: DELETE
+ response:
+ status_code: 202
+  - name: instance list 200 but no instance
+    delay_before: 3  # give it a few seconds for rmr; the instance only disappears once the DELETED ack has arrived
+    request:
+      url: http://localhost:10000/a1-p/policytypes/20000/policies
+      method: GET
+    response:
+      status_code: 200
+      body: []
+
+  - name: cant get instance status
+    request:
+      url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy/status
+      method: GET
+    response:
+      status_code: 404
+
+ - name: cant get instance
+ request:
+ url: http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
+ method: GET
+ response:
+ status_code: 404
---
required:
- test
additionalProperties: false
+ response:
+ status_code: 201
- name: type there now
request:
headers:
content-type: application/json
response:
- status_code: 201
+ status_code: 202
- name: test the delay policy get
request:
method: GET
response:
status_code: 200
- body:
- - handler_id: delay_receiver
- status: OK
+ # tavern doesn't let you check non json yet!
- name: instance list 200 and there
request:
headers:
content-type: application/json
response:
- status_code: 201
+ status_code: 202
- name: should be no status
delay_before: 5 # give it a few seconds for rmr ; delay reciever sleeps for 5 seconds by default
--- /dev/null
+# CI script installs RMR from PackageCloud using this version
+---
+version: 1.8.1
setup(
name="a1",
- version="0.13.0",
+ version="0.14.0",
packages=find_packages(exclude=["tests.*", "tests"]),
author="Tommy Carpenter",
description="RIC A1 Mediator for policy/intent changes",
def _fake_dequeue(_filter_type):
"""
- for monkeypatching a1rmnr.dequeue_all_messages
+ for monkeypatching a1rmr.dequeue_all_waiting_messages with a good status
"""
fake_msg = {}
pay = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "OK"}'
return new_messages
+def _fake_dequeue_none(_filter_type):
+ """
+ for monkeypatching a1rmr.dequeue_all_waiting_messages with no waiting messages
+ """
+ return []
+
+
+def _fake_dequeue_deleted(_filter_type):
+    """
+    for monkeypatching a1rmr.dequeue_all_waiting_messages: yields a single message carrying a DELETED status
+    """
+    deleted_status = b'{"policy_type_id": 20000, "policy_instance_id": "admission_control_policy", "handler_id": "test_receiver", "status": "DELETED"}'
+    return [{"payload": deleted_status}]
+
+
def _test_put_patch(monkeypatch):
rmr_mocks.patch_rmr(monkeypatch)
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(0)) # good sends for this whole batch
assert res.json == []
# instance 404 because type not there yet
+ monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 404
# create a good instance
_test_put_patch(monkeypatch)
res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
- assert res.status_code == 201
+ assert res.status_code == 202
# instance 200 and in list
res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 200
assert res.json == [ADM_CTRL]
- # get the instance
- res = client.get(ADM_CTRL_INSTANCE)
- assert res.status_code == 200
- assert res.json == adm_instance_good
+ def get_instance_good(expected):
+ # get the instance
+ res = client.get(ADM_CTRL_INSTANCE)
+ assert res.status_code == 200
+ assert res.json == adm_instance_good
+
+ # get the instance status
+ res = client.get(ADM_CTRL_INSTANCE_STATUS)
+ assert res.status_code == 200
+ assert res.get_data(as_text=True) == expected
- # get the instance status
+ # try a status get but pretend we didn't get any ACKs yet to test NOT IN EFFECT
+ monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_none)
+ get_instance_good("NOT IN EFFECT")
+
+ # now pretend we did get a good ACK
monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
- res = client.get(ADM_CTRL_INSTANCE_STATUS)
+ get_instance_good("IN EFFECT")
+
+ # delete it
+ res = client.delete(ADM_CTRL_INSTANCE)
+ assert res.status_code == 202
+ res = client.delete(ADM_CTRL_INSTANCE) # should be able to do multiple deletes
+ assert res.status_code == 202
+
+ # status after a delete, but there are no messages yet, should still return
+ monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue)
+ get_instance_good("IN EFFECT")
+
+ # now pretend we deleted successfully
+ monkeypatch.setattr("a1.a1rmr.dequeue_all_waiting_messages", _fake_dequeue_deleted)
+ res = client.get(ADM_CTRL_INSTANCE_STATUS) # cant get status
+ assert res.status_code == 404
+ res = client.get(ADM_CTRL_INSTANCE) # cant get instance
+ assert res.status_code == 404
+ # list still 200 but no instance
+ res = client.get(ADM_CTRL_POLICIES)
assert res.status_code == 200
- assert res.json == [{"handler_id": "test_receiver", "status": "OK"}]
+ assert res.json == []
- # assert that rmr bad states don't cause problems
+
+def test_xapp_put_good_bad_rmr(client, monkeypatch, adm_instance_good):
+ """
+ assert that rmr bad states don't cause problems
+ """
+ _test_put_patch(monkeypatch)
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(10))
res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
- assert res.status_code == 201
+ assert res.status_code == 202
monkeypatch.setattr("rmr.rmr.rmr_send_msg", rmr_mocks.send_mock_generator(5))
res = client.put(ADM_CTRL_INSTANCE, json=adm_instance_good)
- assert res.status_code == 201
+ assert res.status_code == 202
def test_bad_instances(client, monkeypatch, adm_type_good):
# run apache bench
ab -n 100 -c 10 -u putdata -T application/json http://localhost:10000/a1-p/policytypes/20000/policies/admission_control_policy
# echo "log collection"
-#integration_tests/getlogs.sh
+# integration_tests/getlogs.sh
commands_post=
echo "teardown"
helm delete testreceiver