X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1%2Fa1rmr.py;h=050674c3a2d88805e0e4ea1ca161233de7874162;hb=e73baaad1c5251b2d98f1832e2fbd09dcd38a590;hp=3d0ff9535ca3df98cdf7d77ca6bca0ed53999bf1;hpb=2c1c4e9dd207289bbdc3453bfdb3e2dad68df8a8;p=ric-plt%2Fa1.git diff --git a/a1/a1rmr.py b/a1/a1rmr.py index 3d0ff95..050674c 100644 --- a/a1/a1rmr.py +++ b/a1/a1rmr.py @@ -1,6 +1,3 @@ -""" -a1s rmr functionality -""" # ================================================================================== # Copyright (c) 2019-2020 Nokia # Copyright (c) 2018-2020 AT&T Intellectual Property. @@ -17,6 +14,9 @@ a1s rmr functionality # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== +""" +A1 RMR functionality +""" import os import queue import time @@ -30,6 +30,11 @@ from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound mdc_logger = Logger(name=__name__) +# With Nanomsg and NNG it was possible for a send attempt to have a "soft" +# failure which did warrant some retries if the status of the send is RMR_ERR_RETRY. +# Because of the way NNG worked, it sometimes required many tens of retries, +# and a retry state happened often for even moderately "verbose" applications. +# With SI95 there is still a possibility that a retry is necessary, but it is very rare. RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4)) A1_POLICY_REQUEST = 20010 A1_POLICY_RESPONSE = 20011 @@ -43,13 +48,27 @@ __RMR_LOOP__ = None class _RmrLoop: """ - class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages - this launches a thread, it should probably only be called once; the public facing method to access these ensures this + Class represents an rmr loop that constantly reads from rmr and performs operations + based on waiting messages. This launches a thread, it should probably only be called + once; the public facing method to access these ensures this. TODO: the xapp frame has a version of this looping structure. See if A1 can switch to that. """ def __init__(self, init_func_override=None, rcv_func_override=None): + """ + Init + + Parameters + ---------- + init_func_override: function (optional) + Function that initializes RMR and answers an RMR context. + Supply an empty function to skip initializing RMR. + + rcv_func_override: function (optional) + Function that receives messages from RMR and answers a list. + Supply a trivial function to skip reading from RMR. + """ self.keep_going = True self.rcv_func = None self.last_ran = time.time() @@ -62,8 +81,8 @@ class _RmrLoop: self.mrc = init_func_override() else: mdc_logger.debug("Waiting for rmr to initialize..") - # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an - # internal ring of messages, and receive calls read from that + # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread + # populates an internal ring of messages, and receive calls read from that. # currently the size is 2048 messages, so this is fine for the foreseeable future self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL) while rmr.rmr_ready(self.mrc) == 0: @@ -82,41 +101,51 @@ class _RmrLoop: def _assert_good_send(self, sbuf, pre_send_summary): """ - common helper function for _send_msg and _rts_msg + Extracts the send result and logs a detailed warning if the send failed. + Returns the message state, an integer that indicates the result. """ post_send_summary = rmr.message_summary(sbuf) - if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK": - return True - mdc_logger.debug("Message NOT sent!") - mdc_logger.debug("Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)) - return False + if post_send_summary[rmr.RMR_MS_MSG_STATE] != rmr.RMR_OK: + mdc_logger.warning("RMR send failed; pre-send summary: {0}, post-send summary: {1}".format(pre_send_summary, post_send_summary)) + return post_send_summary[rmr.RMR_MS_MSG_STATE] def _send_msg(self, pay, mtype, subid): """ - sends a msg + Creates and sends a message via RMR's send-message feature with the specified payload + using the specified message type and subscription ID. """ + sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid) + sbuf.contents.sub_id = subid + pre_send_summary = rmr.message_summary(sbuf) for _ in range(0, RETRY_TIMES): - sbuf = rmr.rmr_alloc_msg(self.mrc, len(pay), payload=pay, gen_transaction_id=True, mtype=mtype, sub_id=subid) - sbuf.contents.sub_id = subid - pre_send_summary = rmr.message_summary(sbuf) - mdc_logger.debug("Trying to send message: {}".format(pre_send_summary)) - sbuf = rmr.rmr_send_msg(self.mrc, sbuf) # send - if self._assert_good_send(sbuf, pre_send_summary): - rmr.rmr_free_msg(sbuf) # free - return + mdc_logger.debug("_send_msg: sending: {}".format(pre_send_summary)) + sbuf = rmr.rmr_send_msg(self.mrc, sbuf) + msg_state = self._assert_good_send(sbuf, pre_send_summary) + mdc_logger.debug("_send_msg: result message state: {}".format(msg_state)) + if msg_state != rmr.RMR_ERR_RETRY: + break - mdc_logger.debug("A1 did NOT send the message successfully after {} retries!".format(RETRY_TIMES)) + rmr.rmr_free_msg(sbuf) + if msg_state != rmr.RMR_OK: + mdc_logger.warning("_send_msg: failed after {} retries".format(RETRY_TIMES)) def _rts_msg(self, pay, sbuf_rts, mtype): """ - sends a message using rts - we do not call free here because we may rts many times; it is called after the rts loop + Sends a message via RMR's return-to-sender feature. + This neither allocates nor frees a message buffer because we may rts many times. + Returns the message buffer from the RTS function, which may reallocate it. """ + pre_send_summary = rmr.message_summary(sbuf_rts) for _ in range(0, RETRY_TIMES): - pre_send_summary = rmr.message_summary(sbuf_rts) + mdc_logger.debug("_rts_msg: sending: {}".format(pre_send_summary)) sbuf_rts = rmr.rmr_rts_msg(self.mrc, sbuf_rts, payload=pay, mtype=mtype) - if self._assert_good_send(sbuf_rts, pre_send_summary): + msg_state = self._assert_good_send(sbuf_rts, pre_send_summary) + mdc_logger.debug("_rts_msg: result message state: {}".format(msg_state)) + if msg_state != rmr.RMR_ERR_RETRY: break + + if msg_state != rmr.RMR_OK: + mdc_logger.warning("_rts_msg: failed after {} retries".format(RETRY_TIMES)) return sbuf_rts # in some cases rts may return a new sbuf def _handle_sends(self): @@ -149,40 +178,40 @@ class _RmrLoop: for (msg, sbuf) in self.rcv_func(): # TODO: in the future we may also have to catch SDL errors try: - mtype = msg["message type"] + mtype = msg[rmr.RMR_MS_MSG_TYPE] except (KeyError, TypeError, json.decoder.JSONDecodeError): - mdc_logger.debug("Dropping malformed policy ack/query message: {0}".format(msg)) + mdc_logger.warning("Dropping malformed message: {0}".format(msg)) if mtype == A1_POLICY_RESPONSE: try: # got a policy response, update status - pay = json.loads(msg["payload"]) + pay = json.loads(msg[rmr.RMR_MS_PAYLOAD]) data.set_policy_instance_status( pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"] ) mdc_logger.debug("Successfully received status update: {0}".format(pay)) except (PolicyTypeNotFound, PolicyInstanceNotFound): - mdc_logger.debug("Received a response for a non-existent instance") + mdc_logger.warning("Received a response for a non-existent type/instance: {0}".format(msg)) except (KeyError, TypeError, json.decoder.JSONDecodeError): - mdc_logger.debug("Dropping malformed policy ack message: {0}".format(msg)) + mdc_logger.warning("Dropping malformed policy response: {0}".format(msg)) elif mtype == A1_POLICY_QUERY: try: # got a query, do a lookup and send out all instances - pti = json.loads(msg["payload"])["policy_type_id"] + pti = json.loads(msg[rmr.RMR_MS_PAYLOAD])["policy_type_id"] instance_list = data.get_instance_list(pti) # will raise if a bad type - mdc_logger.debug("Received a query for a good type: {0}".format(msg)) + mdc_logger.debug("Received a query for a known policy type: {0}".format(msg)) for pii in instance_list: instance = data.get_policy_instance(pti, pii) payload = json.dumps(messages.a1_to_handler("CREATE", pti, pii, instance)).encode("utf-8") sbuf = self._rts_msg(payload, sbuf, A1_POLICY_REQUEST) except (PolicyTypeNotFound): - mdc_logger.debug("Received a query for a non-existent type: {0}".format(msg)) + mdc_logger.warning("Received a policy query for a non-existent type: {0}".format(msg)) except (KeyError, TypeError, json.decoder.JSONDecodeError): - mdc_logger.debug("Dropping malformed policy query message: {0}".format(msg)) + mdc_logger.warning("Dropping malformed policy query: {0}".format(msg)) else: - mdc_logger.debug("Received message type {0} but A1 does not handle this".format(mtype)) + mdc_logger.warning("Received message type {0} but A1 does not handle this".format(mtype)) # we must free each sbuf rmr.rmr_free_msg(sbuf) @@ -199,6 +228,16 @@ class _RmrLoop: def start_rmr_thread(init_func_override=None, rcv_func_override=None): """ Start a1s rmr thread + + Parameters + ---------- + init_func_override: function (optional) + Function that initializes RMR and answers an RMR context. + Supply an empty function to skip initializing RMR. + + rcv_func_override: function (optional) + Function that receives messages from RMR and answers a list. + Supply a trivial function to skip reading from RMR. """ global __RMR_LOOP__ if __RMR_LOOP__ is None: