X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=a1%2Fa1rmr.py;fp=a1%2Fa1rmr.py;h=359d2e0659a68ee871cc3612f90326bcb221fc46;hb=8bcc51a6d44d40a1a338fb6a721b5ee8f992f323;hp=d6114bf38e232112a91dfddb445eae1672e338d2;hpb=6b69910923309e05820706dc025e1441463906c9;p=ric-plt%2Fa1.git diff --git a/a1/a1rmr.py b/a1/a1rmr.py index d6114bf..359d2e0 100644 --- a/a1/a1rmr.py +++ b/a1/a1rmr.py @@ -18,6 +18,7 @@ import os import queue import time import json +from threading import Thread from rmr import rmr, helpers from a1 import get_module_logger from a1 import data @@ -39,8 +40,8 @@ def _init_rmr(): # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an # internal ring of messages, and receive calls read from that # currently the size is 2048 messages, so this is fine for the foreseeable future + logger.debug("Waiting for rmr to initialize..") mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL) - while rmr.rmr_ready(mrc) == 0: time.sleep(0.5) @@ -86,20 +87,6 @@ def _send(mrc, payload, message_type=0): return None -def _update_all_statuses(mrc): - """ - get all waiting messages, and try to parse them as status updates - (currently, those are the only messages a1 should get, this may have to be revisited later) - """ - for msg in helpers.rmr_rcvall_msgs(mrc, [21024]): - try: - pay = json.loads(msg["payload"]) - data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"]) - except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError): - logger.debug("Dropping malformed or non applicable message") - logger.debug(msg) - - # Public @@ -116,10 +103,12 @@ class RmrLoop: class represents an rmr loop meant to be called as a longstanding separate thread """ - def __init__(self, real_init=True): + def __init__(self, _init_func_override=None, rcv_func_override=None): self._rmr_is_ready = False self._keep_going = True - self._real_init = real_init # useful for unit testing to turn off initialization + self._init_func_override = _init_func_override # useful for unit testing + self._rcv_func_override = rcv_func_override # useful for unit testing to mock certain recieve scenarios + self._rcv_func = None def rmr_is_ready(self): """returns whether rmr has been initialized""" @@ -138,33 +127,53 @@ class RmrLoop: """ # get a context - mrc = None - logger.debug("Waiting for rmr to initialize...") - if self._real_init: - mrc = _init_rmr() + mrc = self._init_func_override() if self._init_func_override else _init_rmr() self._rmr_is_ready = True logger.debug("Rmr is ready") + # set the receive function called below + self._rcv_func = ( + self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024]) + ) + # loop forever logger.debug("Work loop starting") while self._keep_going: - """ - We never raise an exception here. Log and keep moving - Bugs will eventually be caught be examining logs. - """ - try: - # First, send out all messages waiting for us - while not _SEND_QUEUE.empty(): - work_item = _SEND_QUEUE.get(block=False, timeout=None) - _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"]) - - # Next, update all statuses waiting in a1s mailbox - _update_all_statuses(mrc) - - # TODO: next body of work is to try to clean up the database for any updated statuses - - except Exception as e: - logger.debug("Polling thread encountered an unexpected exception, but it will continue:") - logger.exception(e) - - time.sleep(1) + # send out all messages waiting for us + while not _SEND_QUEUE.empty(): + work_item = _SEND_QUEUE.get(block=False, timeout=None) + _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"]) + + # read our mailbox and update statuses + updated_instances = set() + for msg in self._rcv_func(): + try: + pay = json.loads(msg["payload"]) + pti = pay["policy_type_id"] + pii = pay["policy_instance_id"] + data.set_status(pti, pii, pay["handler_id"], pay["status"]) + updated_instances.add((pti, pii)) + except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError): + # TODO: in the future we may also have to catch SDL errors + logger.debug(("Dropping malformed or non applicable message", msg)) + + # for all updated instances, see if we can trigger a delete + # should be no catch needed here, since the status update would have failed if it was a bad pair + for ut in updated_instances: + data.clean_up_instance(ut[0], ut[1]) + + # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component + time.sleep(1) + + +def start_rmr_thread(init_func_override=None, rcv_func_override=None): + """ + Start a1s rmr thread + Also called during unit testing + """ + rmr_loop = RmrLoop(init_func_override, rcv_func_override) + thread = Thread(target=rmr_loop.loop) + thread.start() + while not rmr_loop.rmr_is_ready(): + time.sleep(0.5) + return rmr_loop # return the handle; useful during unit testing