4 # ==================================================================================
5 # Copyright (c) 2019 Nokia
6 # Copyright (c) 2018-2019 AT&T Intellectual Property.
8 # Licensed under the Apache License, Version 2.0 (the "License");
9 # you may not use this file except in compliance with the License.
10 # You may obtain a copy of the License at
12 # http://www.apache.org/licenses/LICENSE-2.0
14 # Unless required by applicable law or agreed to in writing, software
15 # distributed under the License is distributed on an "AS IS" BASIS,
16 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 # See the License for the specific language governing permissions and
18 # limitations under the License.
19 # ==================================================================================
24 from threading import Thread
25 from rmr import rmr, helpers
26 from mdclogpy import Logger
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
# module-wide logger
mdc_logger = Logger(name=__name__)

# upper bound on send attempts per queued message; overridable through the
# A1_RMR_RETRY_TIMES environment variable (defaults to 4)
RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))

# Note: yes, globals are bad, but this is a private (to this module) global
# No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
# It stays None until start_rmr_thread() creates the loop.
__RMR_LOOP__ = None
class _RmrLoop:
    """
    class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
    this launches a thread, it should probably only be called once; the public facing method to access these ensures this
    """

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        set up the rmr context and the receive function, then launch the work loop thread

        init_func_override: optional zero-argument callable; when given, its return value is used
            as the rmr context instead of calling rmr.rmr_init
        rcv_func_override: optional zero-argument callable returning an iterable of received
            messages; when omitted, helpers.rmr_rcvall_msgs is used filtered to message type 21024
        """
        self.keep_going = True  # set to False (see stop_rmr_thread) to make the loop exit
        self.last_ran = time.time()  # timestamp of the last completed loop pass, read by the healthcheck
        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html

        # initialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            mdc_logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
            # internal ring of messages, and receive calls read from that
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            while rmr.rmr_ready(self.mrc) == 0:
                # NOTE(review): poll interval is a reviewer's choice; it only avoids a busy spin
                time.sleep(0.5)

        # set the receive function
        if rcv_func_override:
            self.rcv_func = rcv_func_override
        else:
            self.rcv_func = lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024])

        # start the work loop; the thread must be started here, otherwise nothing ever runs loop()
        # and healthcheck_rmr_thread() can never see a live thread
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def loop(self):
        """
        This loop runs forever, and has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)

        "forever" means until keep_going is set to False; the flag is checked once per pass
        """
        mdc_logger.debug("Work loop starting")
        while self.keep_going:

            # send out all messages waiting for us
            while not self.work_queue.empty():
                work_item = self.work_queue.get(block=False, timeout=None)

                pay = work_item["payload"].encode("utf-8")
                for _ in range(0, RETRY_TIMES):
                    # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
                    sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"])
                    pre_send_summary = rmr.message_summary(sbuf)
                    sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
                    post_send_summary = rmr.message_summary(sbuf)
                    mdc_logger.debug(
                        "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
                    )
                    rmr.rmr_free_msg(sbuf)  # free
                    if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
                        mdc_logger.debug("Message sent successfully!")
                        # stop retrying once the send went through, otherwise the same
                        # message would be sent RETRY_TIMES times
                        break

            # read our mailbox and update statuses
            for msg in self.rcv_func():
                try:
                    pay = json.loads(msg["payload"])
                    pti = pay["policy_type_id"]
                    pii = pay["policy_instance_id"]
                    data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
                except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
                    # TODO: in the future we may also have to catch SDL errors
                    mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))

            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
            self.last_ran = time.time()
            # NOTE(review): 1 second is a reviewer's choice; it must stay well below the
            # healthcheck threshold (30 seconds by default)
            time.sleep(1)
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    start a1s rmr loop, unless one has already been started (later calls are no-ops)

    init_func_override: passed through to _RmrLoop as its first argument
    rcv_func_override: passed through to _RmrLoop as its second argument
    """
    # without this declaration the assignment below makes the name local and the
    # "is None" check raises UnboundLocalError
    global __RMR_LOOP__
    if __RMR_LOOP__ is None:
        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
def stop_rmr_thread():
    """
    ask the rmr loop to stop by clearing its keep_going flag

    the flag is only checked by the loop once per pass, so the thread ends after its current pass
    calling this before the loop has been started is a no-op
    """
    if __RMR_LOOP__ is None:
        # nothing was ever started, so there is nothing to stop
        return
    __RMR_LOOP__.keep_going = False
def queue_work(item):
    """
    hand an item to the rmr loop by putting it on the loop's work queue
    currently the only type of work is to send out messages
    """
    pending = __RMR_LOOP__.work_queue
    pending.put(item)
def healthcheck_rmr_thread(seconds=30):
    """
    returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is the loop's thread alive?
    2. is it stuck in a long (> seconds) loop?

    seconds: maximum allowed age, in seconds, of the loop's last_ran timestamp
    a loop that was never started is reported as unhealthy (False)
    """
    loop = __RMR_LOOP__
    if loop is None:
        return False
    return loop.thread.is_alive() and ((time.time() - loop.last_ran) < seconds)
def replace_rcv_func(rcv_func):
    """swap the loop's receive function; exists purely so unit tests can exercise different rcv scenarios"""
    loop = __RMR_LOOP__
    loop.rcv_func = rcv_func