4 # ==================================================================================
5 # Copyright (c) 2019 Nokia
6 # Copyright (c) 2018-2019 AT&T Intellectual Property.
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ==================================================================================
24 from threading import Thread
25 from rmr import rmr, helpers
26 from a1 import get_module_logger
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
logger = get_module_logger(__name__)

# Number of attempts made to send each outgoing rmr message (overridable via the RMR_RETRY_TIMES env var)
RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))

# Note: yes, globals are bad, but this is a private (to this module) global
# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
# It stays None until start_rmr_thread() creates the loop object.
__RMR_LOOP__ = None
class _RmrLoop:
    """
    class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
    this launches a thread, it should probably only be called once; the public facing method to access these ensures this
    """

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        set up the rmr context and the receive function, then start the work loop thread

        init_func_override: optional callable returning the rmr context to use instead of initializing rmr
        rcv_func_override: optional callable returning an iterable of received messages, used instead of
            reading from rmr
        """
        self.keep_going = True
        self.last_ran = time.time()
        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html

        # initialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
            # internal ring of messages, and receive calls read from that
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            while rmr.rmr_ready(self.mrc) == 0:
                # NOTE(review): poll interval is a judgment call, the point is to not busy-spin -- confirm
                time.sleep(0.5)

        # set the receive function
        # NOTE(review): [21024] is presumably the list of message types to keep -- confirm against rmr helpers
        if rcv_func_override:
            self.rcv_func = rcv_func_override
        else:
            self.rcv_func = lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024])

        # start the work loop
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def loop(self):
        """
        This loop runs until keep_going is set to False, and has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
        """
        logger.debug("Work loop starting")
        while self.keep_going:
            self._send_queued_messages()
            self._process_received_messages()

            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
            # last_ran is what healthcheck_rmr_thread compares against to detect a stuck loop
            self.last_ran = time.time()
            # NOTE(review): 1 second is a placeholder interval -- confirm
            time.sleep(1)

    def _send_queued_messages(self):
        """
        send out all messages waiting in the work queue, trying each one at most RETRY_TIMES times
        """
        while True:
            # get_nowait + queue.Empty instead of checking empty() first and then calling get()
            try:
                work_item = self.work_queue.get_nowait()
            except queue.Empty:
                return

            pay = work_item["payload"].encode("utf-8")
            for _ in range(RETRY_TIMES):
                # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
                sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"])
                pre_send_summary = rmr.message_summary(sbuf)
                sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
                post_send_summary = rmr.message_summary(sbuf)
                logger.debug("Pre-send summary: %s, Post-send summary: %s", pre_send_summary, post_send_summary)
                rmr.rmr_free_msg(sbuf)  # free
                if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
                    logger.debug("Message sent successfully!")
                    break  # stop retrying once the send went through
            else:
                # every attempt failed (or RETRY_TIMES <= 0); leave a trace instead of dropping silently
                logger.debug("Message could not be sent after %s attempts", RETRY_TIMES)

    def _process_received_messages(self):
        """
        read our mailbox, update instance statuses, then see if any updated instance can be cleaned up
        """
        updated_instances = set()
        for msg in self.rcv_func():
            try:
                pay = json.loads(msg["payload"])
                pti = pay["policy_type_id"]
                pii = pay["policy_instance_id"]
                data.set_status(pti, pii, pay["handler_id"], pay["status"])
                updated_instances.add((pti, pii))
            except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
                # TypeError covers payloads that are valid JSON but not an object (eg a list or a number);
                # without it one such message would end the loop thread
                # TODO: in the future we may also have to catch SDL errors
                logger.debug("Dropping malformed or non applicable message: %s", msg)

        # for all updated instances, see if we can trigger a delete
        # should be no catch needed here, since the status update would have failed if it was a bad pair
        for pti, pii in updated_instances:
            data.clean_up_instance(pti, pii)
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start a1s rmr thread; does nothing if the loop object already exists

    both arguments are passed through unchanged to _RmrLoop
    """
    # without this declaration the assignment below makes the name local and the `is None` check fails
    global __RMR_LOOP__
    if __RMR_LOOP__ is None:
        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
def stop_rmr_thread():
    """
    stops the rmr thread by clearing the flag its work loop checks on every pass

    does nothing if the rmr thread was never started
    """
    if __RMR_LOOP__ is not None:
        __RMR_LOOP__.keep_going = False
def queue_work(item):
    """
    hand an item to the rmr loop by putting it on the loop's work queue
    sending out messages is currently the only kind of work there is
    """
    pending_work = __RMR_LOOP__.work_queue
    pending_work.put(item)
def healthcheck_rmr_thread(seconds=30):
    """
    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is the loop thread alive?
    2. is it stuck in a long (> seconds) loop?

    also returns False (instead of raising) when the rmr loop has not been started
    """
    rmr_loop = __RMR_LOOP__
    if rmr_loop is None:
        return False
    return rmr_loop.thread.is_alive() and ((time.time() - rmr_loop.last_ran) < seconds)
def replace_rcv_func(rcv_func):
    """
    swap the receive function used by the rmr loop
    exists only so unit tests can exercise different rcv scenarios
    """
    rmr_loop = __RMR_LOOP__
    rmr_loop.rcv_func = rcv_func