1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
import json
import os
import queue
import time
from threading import Thread

from rmr import rmr, helpers

from a1 import get_module_logger
from a1 import data
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
logger = get_module_logger(__name__)

# How many attempts _send makes per message; overridable through the RMR_RETRY_TIMES env var.
RETRY_TIMES = int(os.getenv("RMR_RETRY_TIMES", "4"))

# Work items waiting for the rmr loop to send them out.
# queue.Queue is thread safe: https://docs.python.org/3/library/queue.html
_SEND_QUEUE = queue.Queue()
def _init_rmr():
    """
    Create an RMR context and block until RMR reports it is ready.

    This gets monkeypatched out for unit testing.

    Returns:
        the context handle returned by rmr.rmr_init
    """
    # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
    # internal ring of messages, and receive calls read from that
    # currently the size is 2048 messages, so this is fine for the foreseeable future
    logger.debug("Waiting for rmr to initialize..")
    mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
    while rmr.rmr_ready(mrc) == 0:
        # NOTE(review): the original poll interval was lost from the source listing; 0.5s is a
        # reconstruction — confirm.
        time.sleep(0.5)
    return mrc
def _send(mrc, payload, message_type=0):
    """
    Send a message over RMR, making up to RETRY_TIMES attempts.

    Args:
        mrc: RMR context to send on
        payload: message body; bytes are sent as-is, anything else is converted with
            .encode("utf-8")
        message_type: RMR message type (mtype) stamped on the outgoing buffer

    Returns:
        The transaction id of the attempt that succeeded, or None if every attempt failed.
    """
    # Encode before allocating so the buffer can be sized from the real payload and nothing is
    # allocated if the payload cannot be encoded.
    payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")

    # Allocate at least the payload's length instead of a fixed 4096 bytes, so a larger payload
    # still fits the buffer it is copied into.
    # NOTE(review): assumes rmr.set_payload_and_length copies into the existing buffer without
    # growing it — confirm against the rmr bindings in use.
    sbuf = rmr.rmr_alloc_msg(mrc, max(4096, len(payload)))
    try:
        for _ in range(RETRY_TIMES):
            # (re)populate the buffer on every attempt: rmr_send_msg hands back the buffer we
            # must use from here on
            rmr.set_payload_and_length(payload, sbuf)
            rmr.generate_and_set_transaction_id(sbuf)
            sbuf.contents.state = 0
            sbuf.contents.mtype = message_type
            pre_send_summary = rmr.message_summary(sbuf)
            logger.debug("Pre message send summary: %s", pre_send_summary)
            transaction_id = pre_send_summary["transaction id"]  # returned on success

            # send
            sbuf = rmr.rmr_send_msg(mrc, sbuf)
            post_send_summary = rmr.message_summary(sbuf)
            logger.debug("Post message send summary: %s", post_send_summary)

            # check success or failure
            if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
                logger.debug("Message sent successfully!")
                return transaction_id

        # we failed all RETRY_TIMES
        logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
        return None
    finally:
        # release whichever buffer we hold last: on success, on failure, or if anything raised
        rmr.rmr_free_msg(sbuf)
95 push an item into the work queue
96 currently the only type of work is to send out messages
class RmrLoop:
    """
    Represents an RMR work loop meant to be run as a long-standing separate thread
    (start_rmr_thread runs its ``loop`` method on a Thread).
    """

    def __init__(self, _init_func_override=None, rcv_func_override=None):
        """
        Args:
            _init_func_override: optional zero-argument callable returning an RMR context, used
                instead of the default initialization; useful for unit testing
            rcv_func_override: optional zero-argument callable returning the received messages;
                useful for unit testing to mock certain receive scenarios
        """
        self._rmr_is_ready = False  # flipped to True once the loop has an RMR context
        self._keep_going = True  # cleared to ask the loop to end
        self._init_func_override = _init_func_override
        self._rcv_func_override = rcv_func_override
        self._rcv_func = None  # chosen when the loop starts

    def rmr_is_ready(self):
        """Return whether rmr has been initialized."""
        return self._rmr_is_ready
118 """sets a flag for the loop to end"""
119 self._keep_going = False
123 This loop runs in an a1 thread forever, and has 3 jobs:
124 - send out any messages that have to go out (create instance, delete instance)
125 - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
126 - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
130 mrc = self._init_func_override() if self._init_func_override else _init_rmr()
131 self._rmr_is_ready = True
132 logger.debug("Rmr is ready")
134 # set the receive function called below
136 self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024])
140 logger.debug("Work loop starting")
141 while self._keep_going:
142 # send out all messages waiting for us
143 while not _SEND_QUEUE.empty():
144 work_item = _SEND_QUEUE.get(block=False, timeout=None)
145 _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
147 # read our mailbox and update statuses
148 updated_instances = set()
149 for msg in self._rcv_func():
151 pay = json.loads(msg["payload"])
152 pti = pay["policy_type_id"]
153 pii = pay["policy_instance_id"]
154 data.set_status(pti, pii, pay["handler_id"], pay["status"])
155 updated_instances.add((pti, pii))
156 except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
157 # TODO: in the future we may also have to catch SDL errors
158 logger.debug(("Dropping malformed or non applicable message", msg))
160 # for all updated instances, see if we can trigger a delete
161 # should be no catch needed here, since the status update would have failed if it was a bad pair
162 for ut in updated_instances:
163 data.clean_up_instance(ut[0], ut[1])
165 # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start the rmr work loop on a new thread and block until rmr is ready.

    Also called during unit testing.

    Args:
        init_func_override: optional zero-argument callable returning an RMR context, used
            instead of _init_rmr
        rcv_func_override: optional zero-argument callable returning an iterable of received
            messages, used instead of the real rmr receive

    Returns:
        the RmrLoop driving the thread

    Raises:
        RuntimeError: if the loop thread exits before rmr became ready (instead of waiting forever)
    """
    rmr_loop = RmrLoop(init_func_override, rcv_func_override)
    thread = Thread(target=rmr_loop.loop)
    thread.start()
    while not rmr_loop.rmr_is_ready():
        # Once the thread is dead the ready flag can no longer change, so re-checking it here
        # avoids a false failure when the thread became ready and then exited.
        if not thread.is_alive() and not rmr_loop.rmr_is_ready():
            raise RuntimeError("rmr thread exited before rmr became ready")
        # waits up to 0.5s but wakes early if the thread dies
        # NOTE(review): the original wait interval was lost from the source listing — confirm.
        thread.join(0.5)
    return rmr_loop  # return the handle; useful during unit testing