A1's RMR functionality
"""
# ==================================================================================
-# Copyright (c) 2019 Nokia
-# Copyright (c) 2018-2019 AT&T Intellectual Property.
+# Copyright (c) 2019-2020 Nokia
+# Copyright (c) 2018-2020 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
-
-
A1_POLICY_REQUEST = 20010
A1_POLICY_RESPONSE = 20011
A1_POLICY_QUERY = 20012
self.keep_going = True
self.rcv_func = None
self.last_ran = time.time()
+
+ # see docs/overview#resiliency for a discussion of this
self.instance_send_queue = queue.Queue() # thread safe queue https://docs.python.org/3/library/queue.html
# initialize rmr context
1. whether the A1 webserver is up (if it isn't, this won't even be called, so even entering this function confirms it is)
2. whether the rmr thread is running and has completed a loop recently
TODO: make "seconds" a configurable parameter?
+ TODO: I've requested that SDL provide a "I'm connected to the backend" healthcheck that can be integrated here
"""
if a1rmr.healthcheck_rmr_thread():
return "", 200
"""
def delete_instance_handler():
- """
- here we send out the DELETEs but we don't delete the instance until a GET is called where we check the statuses
- We also set the status as deleted which would be reflected in a GET to ../status (before the DELETE completes)
- """
data.delete_policy_instance(policy_type_id, policy_instance_id)
# queue rmr send (best effort)
"""
Represents A1s database and database access functions.
-In the future, this may change to use a different backend, possibly dramatically.
-Hopefully, the access functions are a good api so nothing else has to change when this happens
-
-For now, the database is in memory.
-We use dict data structures (KV) with the expectation of having to move this into Redis
"""
# ==================================================================================
# Copyright (c) 2019-2020 Nokia
from threading import Thread
import msgpack
from mdclogpy import Logger
-
from ricsdl.syncstorage import SyncStorage
-
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound, PolicyTypeAlreadyExists, CantDeleteNonEmptyType
mdc_logger = Logger(name=__name__)
def delete_policy_instance(policy_type_id, policy_instance_id):
"""
- initially sets has_been_deleted
+ initially sets has_been_deleted in the status
    then launches a thread that waits until the relevant timer expires, and finally deletes the instance
"""
_instance_is_valid(policy_type_id, policy_instance_id)
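A minimal sketch of this deferred-delete pattern, using a hypothetical in-memory store rather than the actual SDL-backed data module (the `timeout` value and store shape are assumptions for illustration):

```python
import threading
import time

# hypothetical in-memory instance store keyed by (type_id, instance_id)
instances = {("20008", "instance1"): {"body": {}, "has_been_deleted": False}}

def delete_policy_instance(policy_type_id, policy_instance_id, timeout=0.1):
    key = (policy_type_id, policy_instance_id)
    # mark deleted immediately; a GET to ../status sees this right away
    instances[key]["has_been_deleted"] = True
    # after the timer expires, remove the instance for real
    threading.Timer(timeout, lambda: instances.pop(key, None)).start()

delete_policy_instance("20008", "instance1")
assert instances[("20008", "instance1")]["has_been_deleted"]
time.sleep(0.3)
assert ("20008", "instance1") not in instances
```

The two-phase approach lets status queries report the deletion before the instance record actually disappears.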
7. [Spec is ahead] The spec defines that a query of all policy instances should return the full bodies, however right now the RIC A1m returns a list of IDs (assuming subsequent queries can fetch the bodies).
8. [?] The spec document details some very specific "types", but the RIC A1m allows these to be loaded in (see #1). For example, spec section 4.2.6.2. We believe this should be removed from the spec and instead defined as a type. Xapps can be created that define new types, so the spec would quickly become "stale" if types were defined in the spec.
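For illustration, the difference described in item 7 amounts to the following (the instance IDs and bodies here are hypothetical):

```python
# what the RIC A1 mediator currently returns for "get all instances of a type"
ids_only = ["instance1", "instance2"]

# what the spec's full-body variant would look like (hypothetical bodies)
full_bodies = [
    {"policy_instance_id": "instance1", "body": {"threshold": 5}},
    {"policy_instance_id": "instance2", "body": {"threshold": 10}},
]

# the ID list is recoverable from the full-body form, but not vice versa
assert [p["policy_instance_id"] for p in full_bodies] == ids_only
```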
+
+
+Resiliency
+----------
+
+A1 is currently resilient to most failures, but not yet all (though a solution is known).
+
+A1 uses the RIC SDL library to persist all policy state information: this includes the policy types, policy instances, and policy statuses.
+If A1 builds up state and then fails (Kubernetes will restart it), none of that state is lost.
+
+The tiny bit of state that currently *is* volatile (held only in A1's memory) is its "next second" job queue.
+Specifically, when policy instances are created or deleted, A1 creates jobs in a job queue (in memory).
+An rmr thread polls that queue every second, dequeues the jobs, and performs them.
+
+If A1 were killed at *exactly* the wrong time, jobs could be lost, meaning the PUT or DELETE of an instance wouldn't actually take effect.
+This isn't drastic, as the operations are idempotent and could always be re-performed.
+
+In order for A1 to be considered completely resilient, this job queue would need to be moved to SDL.
+SDL uses Redis as a backend, and Redis natively supports queues via LIST, LPUSH, RPOP.
+I've asked the SDL team to consider an extension to SDL to support these Redis operations.
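The volatile job queue described above can be sketched roughly as follows. This is a simplified illustration, not the actual A1 code: the function names and job tuple shape are assumptions.

```python
import queue

# volatile, in-memory: lost if the process dies before the next poll
job_queue = queue.Queue()  # thread-safe, so handlers and the rmr thread can share it

def enqueue_instance_job(operation, policy_type_id, policy_instance_id):
    # PUT/DELETE handlers enqueue work instead of sending RMR messages inline
    job_queue.put((operation, policy_type_id, policy_instance_id))

def drain_jobs():
    # the rmr thread calls this roughly once per second and performs each job
    performed = []
    while not job_queue.empty():
        performed.append(job_queue.get())
    return performed

enqueue_instance_job("CREATE", 20010, "instance1")
enqueue_instance_job("DELETE", 20010, "instance1")
assert drain_jobs() == [("CREATE", 20010, "instance1"),
                        ("DELETE", 20010, "instance1")]
```

Replacing `queue.Queue` with a Redis-backed list (LPUSH to enqueue, RPOP to drain) would make this queue survive a process restart, which is the SDL extension being requested.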
commands_pre=
echo "WARNING: make sure you're running with latest docker builds!"
sleep 5
- helm install --devel testreceiver -n testreceiver
- helm install --devel a1mediator -n a1
- helm install --devel dbaas-service -n dbaas
+# helm v3 is helm install [name] [chart]
+ helm install --devel testreceiver testreceiver
+ helm install --devel a1 a1mediator
+ helm install --devel dbaas dbaas-service
# wait for helm charts
sleep 30
./portforward.sh
echo "linting"
helm lint a1mediator
helm lint testreceiver
+ helm lint dbaas-service
echo "running tavern"
# run tavern
- pytest --tavern-beta-new-traceback
+ pytest --tavern-beta-new-traceback test_a1.tavern.yaml
echo "running ab"
# run apache bench
ab -n 100 -c 10 -v 4 http://localhost:10000/a1-p/healthcheck
integration_tests/getlogs.sh
echo "teardown"
helm delete testreceiver
- helm del --purge testreceiver
helm delete a1
- helm del --purge a1
- helm del dbaas
- helm del --purge dbaas
+ helm delete dbaas
pkill -9 kubectl
sleep 10