X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1%2Fa1rmr.py;h=ae2bf00800b0df804567da0da5dcef085b4805d3;hb=6fceb807d55359d5c50a96d455fc8f47111f2733;hp=fd0d87fd4f181c2f739b301c03c0a6a87ba8462b;hpb=0b42dfc507b22b49669f360883a1cecaa50cda7b;p=ric-plt%2Fa1.git

diff --git a/a1/a1rmr.py b/a1/a1rmr.py
index fd0d87f..ae2bf00 100644
--- a/a1/a1rmr.py
+++ b/a1/a1rmr.py
@@ -23,14 +23,20 @@ import time
 import json
 from threading import Thread
 from rmr import rmr, helpers
-from a1 import get_module_logger
+from mdclogpy import Logger
 from a1 import data
 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
 
-logger = get_module_logger(__name__)
+mdc_logger = Logger(name=__name__)
 
-RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
+RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
+
+
+A1_POLICY_REQUEST = 20010
+A1_POLICY_RESPONSE = 20011
+A1_POLICY_QUERY = 20012
+
 
 # Note; yes, globals are bad, but this is a private (to this module) global
 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
@@ -53,7 +59,7 @@ class _RmrLoop:
         if init_func_override:
             self.mrc = init_func_override()
         else:
-            logger.debug("Waiting for rmr to initialize..")
+            mdc_logger.debug("Waiting for rmr to initialize..")
             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
             # internal ring of messages, and receive calls read from that
             # currently the size is 2048 messages, so this is fine for the foreseeable future
@@ -62,7 +68,10 @@ class _RmrLoop:
                 time.sleep(0.5)
 
         # set the receive function
-        self.rcv_func = rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024])
+        # TODO: when policy query is implemented, add A1_POLICY_QUERY
+        self.rcv_func = (
+            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
+        )
 
         # start the work loop
         self.thread = Thread(target=self.loop)
@@ -76,7 +85,7 @@ class _RmrLoop:
            - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
         """
         # loop forever
-        logger.debug("Work loop starting")
+        mdc_logger.debug("Work loop starting")
         while self.keep_going:
 
             # send out all messages waiting for us
@@ -86,33 +95,30 @@ class _RmrLoop:
                 pay = work_item["payload"].encode("utf-8")
                 for _ in range(0, RETRY_TIMES):
                     # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
-                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"])
+                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
+                    # TODO: after next rmr is released, this can be done in the alloc call, but that's not avail in pypi yet
+                    sbuf.contents.sub_id = work_item["ptid"]
                     pre_send_summary = rmr.message_summary(sbuf)
                     sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
                     post_send_summary = rmr.message_summary(sbuf)
-                    logger.debug("Pre-send summary: %s, Post-send summary: %s", pre_send_summary, post_send_summary)
+                    mdc_logger.debug(
+                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
+                    )
                     rmr.rmr_free_msg(sbuf)  # free
                     if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
-                        logger.debug("Message sent successfully!")
+                        mdc_logger.debug("Message sent successfully!")
                         break
 
             # read our mailbox and update statuses
-            updated_instances = set()
             for msg in self.rcv_func():
                 try:
                     pay = json.loads(msg["payload"])
                     pti = pay["policy_type_id"]
                     pii = pay["policy_instance_id"]
-                    data.set_status(pti, pii, pay["handler_id"], pay["status"])
-                    updated_instances.add((pti, pii))
-                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
+                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
+                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
                     # TODO: in the future we may also have to catch SDL errors
-                    logger.debug(("Dropping malformed or non applicable message", msg))
-
-            # for all updated instances, see if we can trigger a delete
-            # should be no catch needed here, since the status update would have failed if it was a bad pair
-            for ut in updated_instances:
-                data.clean_up_instance(ut[0], ut[1])
+                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
 
             # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
             self.last_ran = time.time()
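
For context on the receive side of this change: the work loop above only calls data.set_policy_instance_status when the incoming payload parses as JSON and carries all four expected keys; anything else is dropped. The following is a minimal, hypothetical sketch of what a downstream policy handler might send back as an A1_POLICY_RESPONSE (20011) so that the status update succeeds. The port, policy type id, instance id, handler id, and status string are illustrative and not taken from this change; the rmr calls mirror the ones used in the diff itself (rmr_init/rmr_ready follow the same readiness wait the loop's init performs).

    import json
    import time

    from rmr import rmr

    A1_POLICY_RESPONSE = 20011  # message type introduced in the diff above

    # hypothetical listen port for this handler
    mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
    while rmr.rmr_ready(mrc) == 0:
        time.sleep(0.5)

    # the four keys a1rmr's work loop reads before updating the status
    payload = json.dumps(
        {
            "policy_type_id": 20008,                 # hypothetical policy type
            "policy_instance_id": "ac_policy_demo",  # hypothetical instance
            "handler_id": "demo_xapp_1",             # hypothetical handler id
            "status": "OK",                          # hypothetical status string
        }
    ).encode("utf-8")

    # alloc/send/free mirrors the send path in the diff
    sbuf = rmr.rmr_alloc_msg(mrc, 4096, payload, True, A1_POLICY_RESPONSE)
    sbuf = rmr.rmr_send_msg(mrc, sbuf)
    if rmr.message_summary(sbuf)["message state"] != 0:
        print("send failed; a real handler would retry, as the loop above does")
    rmr.rmr_free_msg(sbuf)

A payload missing any of the four keys lands in the KeyError arm of the except clause above and is logged and dropped, which is why the handler must echo back the type and instance ids it was given.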
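One more note on the new sub_id line: stamping the policy type id into the message's subscription id lets RMR routing fan A1_POLICY_REQUEST (20010) traffic out per policy type rather than sending every request to the same endpoint. Assuming the usual RMR static route-table syntax (this fragment is illustrative, not part of the change, and both endpoint names are hypothetical), an mse entry keyed on (message type, sub id) could direct each policy type to a different xApp, while a plain rte entry returns all responses to A1:

    newrt|start
    # requests for hypothetical policy type 20008 go to a hypothetical xapp
    mse|20010|20008|ac-xapp-rmr:4560
    # all policy responses go back to the (hypothetical) a1 service endpoint
    rte|20011|a1mediator-rmr:4562
    newrt|end

This is also why the TODO above matters: until the pypi rmr release exposes sub_id through rmr_alloc_msg, the field has to be set on sbuf.contents after allocation.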