from jsonschema import validate
from jsonschema.exceptions import ValidationError
import connexion
+from prometheus_client import Counter
from mdclogpy import Logger
from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError
from a1 import a1rmr, exceptions, data
mdc_logger = Logger(name=__name__)
+mdc_logger.mdclog_format_init(configmap_monitor=True)
+
+a1_counters = Counter('A1Policy', 'Policy type and instance counters', ['counter'])
def _log_build_http_resp(exception, http_resp_code):
3. checks that our SDL connection is healthy
"""
if not a1rmr.healthcheck_rmr_thread():
- mdc_logger.debug("A1 is not healthy due to the rmr thread")
+ mdc_logger.error("A1 is not healthy due to the rmr thread")
return "rmr thread is unhealthy", 500
if not data.SDL.healthcheck():
- mdc_logger.debug("A1 is not healthy because it does not have a connection to SDL")
+ mdc_logger.error("A1 is not healthy because it does not have a connection to SDL")
return "sdl connection is unhealthy", 500
return "", 200
"""
Handles PUT /a1-p/policytypes/policy_type_id
"""
+ a1_counters.labels(counter='CreatePolicyTypeReqs').inc()
def put_type_handler():
data.store_policy_type(policy_type_id, body)
"""
Handles DELETE /a1-p/policytypes/policy_type_id
"""
+ a1_counters.labels(counter='DeletePolicyTypeReqs').inc()
def delete_policy_type_handler():
data.delete_policy_type(policy_type_id)
"""
Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
+ a1_counters.labels(counter='CreatePolicyInstanceReqs').inc()
instance = connexion.request.json
def put_instance_handler():
validate(instance=instance, schema=schema)
# store the instance
- data.store_policy_instance(policy_type_id, policy_instance_id, instance)
+ operation = data.store_policy_instance(policy_type_id, policy_instance_id, instance)
# queue rmr send (best effort)
- a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance))
+ a1rmr.queue_instance_send((operation, policy_type_id, policy_instance_id, instance))
return "", 202
"""
Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id
"""
+ a1_counters.labels(counter='DeletePolicyInstanceReqs').inc()
def delete_instance_handler():
data.delete_policy_instance(policy_type_id, policy_instance_id)
return "", 202
return _try_func_return(delete_instance_handler)
+
+
+# data delivery
+
+
+def data_delivery():
+ """
+ Handle data delivery /data-delivery
+ """
+
+ def data_delivery_handler():
+ mdc_logger.debug("data: {}".format(connexion.request.json))
+ ei_job_result_json = connexion.request.json
+ mdc_logger.debug("jobid: {}".format(ei_job_result_json.get("job")))
+ a1rmr.queue_ei_job_result((ei_job_result_json.get("job"), ei_job_result_json))
+ return "", 200
+
+ return _try_func_return(data_delivery_handler)