X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1%2Fcontroller.py;h=19b8a26b4ee27e08a4de90964fa9f4d16ca66587;hb=5e1198877328ed29a72e67c585825f8362b4d862;hp=118a67e4bfe832ad3b8b26133104878528896706;hpb=9f30fc12970e991d7ae07c56ce2f7bb559c74698;p=ric-plt%2Fa1.git diff --git a/a1/controller.py b/a1/controller.py index 118a67e..19b8a26 100644 --- a/a1/controller.py +++ b/a1/controller.py @@ -20,12 +20,16 @@ Main a1 controller from jsonschema import validate from jsonschema.exceptions import ValidationError import connexion +from prometheus_client import Counter from mdclogpy import Logger from ricsdl.exceptions import RejectedByBackend, NotConnected, BackendError from a1 import a1rmr, exceptions, data mdc_logger = Logger(name=__name__) +mdc_logger.mdclog_format_init(configmap_monitor=True) + +a1_counters = Counter('A1Policy', 'Policy type and instance counters', ['counter']) def _log_build_http_resp(exception, http_resp_code): @@ -73,10 +77,10 @@ def get_healthcheck(): 3. checks that our SDL connection is healthy """ if not a1rmr.healthcheck_rmr_thread(): - mdc_logger.debug("A1 is not healthy due to the rmr thread") + mdc_logger.error("A1 is not healthy due to the rmr thread") return "rmr thread is unhealthy", 500 if not data.SDL.healthcheck(): - mdc_logger.debug("A1 is not healthy because it does not have a connection to SDL") + mdc_logger.error("A1 is not healthy because it does not have a connection to SDL") return "sdl connection is unhealthy", 500 return "", 200 @@ -95,6 +99,7 @@ def create_policy_type(policy_type_id): """ Handles PUT /a1-p/policytypes/policy_type_id """ + a1_counters.labels(counter='CreatePolicyTypeReqs').inc() def put_type_handler(): data.store_policy_type(policy_type_id, body) @@ -116,6 +121,7 @@ def delete_policy_type(policy_type_id): """ Handles DELETE /a1-p/policytypes/policy_type_id """ + a1_counters.labels(counter='DeletePolicyTypeReqs').inc() def delete_policy_type_handler(): data.delete_policy_type(policy_type_id) @@ -158,6 +164,7 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id): """ Handles PUT /a1-p/policytypes/polidyid/policies/policy_instance_id """ + a1_counters.labels(counter='CreatePolicyInstanceReqs').inc() instance = connexion.request.json def put_instance_handler(): @@ -171,10 +178,10 @@ def create_or_replace_policy_instance(policy_type_id, policy_instance_id): validate(instance=instance, schema=schema) # store the instance - data.store_policy_instance(policy_type_id, policy_instance_id, instance) + operation = data.store_policy_instance(policy_type_id, policy_instance_id, instance) # queue rmr send (best effort) - a1rmr.queue_instance_send(("CREATE", policy_type_id, policy_instance_id, instance)) + a1rmr.queue_instance_send((operation, policy_type_id, policy_instance_id, instance)) return "", 202 @@ -185,6 +192,7 @@ def delete_policy_instance(policy_type_id, policy_instance_id): """ Handles DELETE /a1-p/policytypes/polidyid/policies/policy_instance_id """ + a1_counters.labels(counter='DeletePolicyInstanceReqs').inc() def delete_instance_handler(): data.delete_policy_instance(policy_type_id, policy_instance_id) @@ -195,3 +203,21 @@ def delete_policy_instance(policy_type_id, policy_instance_id): return "", 202 return _try_func_return(delete_instance_handler) + + +# data delivery + + +def data_delivery(): + """ + Handle data delivery /data-delivery + """ + + def data_delivery_handler(): + mdc_logger.debug("data: {}".format(connexion.request.json)) + ei_job_result_json = connexion.request.json + mdc_logger.debug("jobid: {}".format(ei_job_result_json.get("job"))) + a1rmr.queue_ei_job_result((ei_job_result_json.get("job"), ei_job_result_json)) + return "", 200 + + return _try_func_return(data_delivery_handler)