1 # ==================================================================================
2 # Copyright (c) 2019 Nokia
3 # Copyright (c) 2018-2019 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
21 from rmr import rmr, helpers
22 from a1 import get_module_logger
24 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
# Logger shared by everything in this module.
logger = get_module_logger(__name__)

# Number of attempts a send gets before it is abandoned; taken from the
# RMR_RETRY_TIMES environment variable, falling back to 4 when it is unset.
RETRY_TIMES = int(os.getenv("RMR_RETRY_TIMES", 4))

# Pending outbound work items.
# queue.Queue is thread safe: https://docs.python.org/3/library/queue.html
_SEND_QUEUE = queue.Queue()
37 This gets monkeypatched out for unit testing
39 # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
40 # internal ring of messages, and receive calls read from that
41 # currently the size is 2048 messages, so this is fine for the foreseeable future
42 mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
44 while rmr.rmr_ready(mrc) == 0:
def _send(mrc, payload, message_type=0):
    """
    Sends a message, trying up to RETRY_TIMES times.

    Parameters
    ----------
    mrc
        the rmr context handed to the rmr alloc/send calls
    payload
        the message body; bytes are sent as they are, anything else is
        encoded as utf-8 first (so it must offer ``.encode``)
    message_type
        value stored in the message's ``mtype`` field (default 0)

    Returns
    -------
    The transaction id of the attempt that succeeded, or None when every
    attempt failed.

    The message buffer is released on every exit path, including when one of
    the rmr calls raises.
    """
    # Encode before allocating so that an unusable payload fails without
    # leaving an allocated rmr buffer behind.
    payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")

    # TODO: investigate allocating the space based on the payload size
    sbuf = rmr.rmr_alloc_msg(mrc, 4096)
    try:
        # retry RETRY_TIMES to send the message
        for _ in range(RETRY_TIMES):
            # setup the send message; redone on every attempt because the
            # send call below hands back a buffer that replaces sbuf
            rmr.set_payload_and_length(payload, sbuf)
            rmr.generate_and_set_transaction_id(sbuf)
            sbuf.contents.state = 0
            sbuf.contents.mtype = message_type
            pre_send_summary = rmr.message_summary(sbuf)
            logger.debug("Pre message send summary: %s", pre_send_summary)
            # save the transactionid now; it is what gets returned on success
            transaction_id = pre_send_summary["transaction id"]

            # send
            sbuf = rmr.rmr_send_msg(mrc, sbuf)
            post_send_summary = rmr.message_summary(sbuf)
            logger.debug("Post message send summary: %s", post_send_summary)

            # check success or failure
            if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
                logger.debug("Message sent successfully!")
                return transaction_id

        # we failed all RETRY_TIMES
        logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
        return None
    finally:
        # single release point for the success, failure and exception paths
        rmr.rmr_free_msg(sbuf)
def _update_all_statuses(mrc):
    """
    Get all waiting messages, and try to apply each one as a status update
    (currently, those are the only messages a1 should get, this may have to be revisited later)

    Parameters
    ----------
    mrc
        the rmr context passed to helpers.rmr_rcvall_msgs

    A message is dropped (and logged at debug level) when its payload is not
    valid JSON, is not a JSON object, lacks one of the expected keys, or
    refers to a policy type / instance that is not known. A dropped message
    never prevents the remaining messages from being processed.
    """
    for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
        try:
            pay = json.loads(msg["payload"])
            status_args = (
                pay["policy_type_id"],
                pay["policy_instance_id"],
                pay["handler_id"],
                pay["status"],
            )
        except (KeyError, TypeError, ValueError):
            # ValueError: payload is not valid JSON (json.JSONDecodeError is a ValueError)
            # TypeError: payload decoded to something that is not a JSON object
            # KeyError: a required field (or the payload itself) is missing
            logger.debug("Dropping malformed or non applicable message")
            continue

        try:
            data.set_status(*status_args)
        except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
            logger.debug("Dropping malformed or non applicable message")
def queue_work(item):
    """
    Place a work item on the send queue for the rmr loop to pick up.

    Sending out messages is currently the only type of work; the consumer of
    the queue reads item["payload"] and item["msg type"].
    """
    # the queue is created without a size limit, so this put does not wait for room
    _SEND_QUEUE.put(item, block=True, timeout=None)
116 class represents an rmr loop meant to be called as a longstanding separate thread
def __init__(self, real_init=True):
    """
    Set up the flags that control the loop.

    real_init: pass False to turn off initialization (useful for unit testing)
    """
    # useful for unit testing to turn off initialization
    self._real_init = real_init
    # reported by rmr_is_ready(); starts out False
    self._rmr_is_ready = False
    # the work loop keeps running for as long as this stays True
    self._keep_going = True
def rmr_is_ready(self):
    """Report whether rmr has been initialized, as recorded in the ready flag."""
    ready = self._rmr_is_ready
    return ready
129 """sets a flag for the loop to end"""
130 self._keep_going = False
134 This loop runs in an a1 thread forever, and has 3 jobs:
135 - send out any messages that have to go out (create instance, delete instance)
136 - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
137 - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
142 logger.debug("Waiting for rmr to initialize...")
145 self._rmr_is_ready = True
146 logger.debug("Rmr is ready")
149 logger.debug("Work loop starting")
150 while self._keep_going:
152 We never raise an exception here. Log and keep moving
153 Bugs will eventually be caught by examining logs.
156 # First, send out all messages waiting for us
157 while not _SEND_QUEUE.empty():
158 work_item = _SEND_QUEUE.get(block=False, timeout=None)
159 _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
161 # Next, update all statuses waiting in a1s mailbox
162 _update_all_statuses(mrc)
164 # TODO: next body of work is to try to clean up the database for any updated statuses
166 except Exception as e:
167 logger.debug("Polling thread encountered an unexpected exception, but it will continue:")