# ==================================================================================
# Copyright (c) 2019 Nokia
# Copyright (c) 2018-2019 AT&T Intellectual Property.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
import json
import os
import queue
import time
from threading import Thread

from mdclogpy import Logger
from rmr import rmr, helpers

from a1 import data
from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
mdc_logger = Logger(name=__name__)

# Number of attempts made to send each queued message before giving up on it.
# Overridable through the A1_RMR_RETRY_TIMES environment variable.
RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))

# rmr message types
A1_POLICY_REQUEST = 20010
A1_POLICY_RESPONSE = 20011
A1_POLICY_QUERY = 20012

# Note: yes, globals are bad, but this is a private (to this module) global.
# No other module should import/access this (python doesn't enforce this, but all linters will complain).
# It holds the single _RmrLoop instance, which start_rmr_thread creates on its first call.
__RMR_LOOP__ = None


class _RmrLoop:
    """
    Class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages.

    Constructing an instance launches a thread, so it should probably only be done once;
    the public facing function (start_rmr_thread) ensures this.
    """

    # NOTE(review): the original sleep intervals were not visible in the reviewed source;
    # the values below are assumptions - confirm them against upstream before relying on them.
    # Seconds to wait between rmr readiness checks during initialization.
    RMR_READY_POLL_SECONDS = 0.5
    # Seconds to sleep between work loop iterations.
    LOOP_SLEEP_SECONDS = 1

    def __init__(self, init_func_override=None, rcv_func_override=None):
        """
        Initialize the rmr context, pick the receive function and start the work loop thread.

        init_func_override: optional zero-argument callable whose return value is used as the rmr
            context instead of calling rmr.rmr_init (no readiness wait is done in that case)
        rcv_func_override: optional zero-argument callable returning an iterable of received
            messages (each indexable by "payload"); replaces the default helpers.rmr_rcvall_msgs call
        """
        self.keep_going = True
        self.last_ran = time.time()
        self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html

        # initialize rmr context
        if init_func_override:
            self.mrc = init_func_override()
        else:
            mdc_logger.debug("Waiting for rmr to initialize..")
            # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
            # internal ring of messages, and receive calls read from that
            # currently the size is 2048 messages, so this is fine for the foreseeable future
            self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
            while rmr.rmr_ready(self.mrc) == 0:
                time.sleep(self.RMR_READY_POLL_SECONDS)

        # set the receive function
        # TODO: when policy query is implemented, add A1_POLICY_QUERY
        self.rcv_func = (
            rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
        )

        # launch the work loop; it keeps running until keep_going is set to False
        self.thread = Thread(target=self.loop)
        self.thread.start()

    def loop(self):
        """
        Run until keep_going is set to False. Each iteration has 3 jobs:
        - send out any messages that have to go out (create instance, delete instance)
        - read a1's mailbox and update the status of all instances based on acks from downstream policy handlers
        - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)

        last_ran is refreshed once per iteration so healthcheck_rmr_thread can detect a stuck loop.
        """
        mdc_logger.debug("Work loop starting")
        while self.keep_going:
            self._send_queued_messages()
            self._process_received_messages()

            # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
            self.last_ran = time.time()
            time.sleep(self.LOOP_SLEEP_SECONDS)

    def _send_queued_messages(self):
        """Drain the work queue, sending every item that is currently waiting."""
        while True:
            try:
                work_item = self.work_queue.get_nowait()
            except queue.Empty:
                return
            self._send_with_retries(work_item)

    def _send_with_retries(self, work_item):
        """
        Send one work item as an A1_POLICY_REQUEST, making up to RETRY_TIMES attempts.

        work_item must provide "payload" (a str, utf-8 encoded before sending) and "ptid"
        (assigned as the rmr message sub_id).
        Returns True once rmr reports success, False if every attempt failed.
        """
        payload = work_item["payload"].encode("utf-8")
        for _ in range(RETRY_TIMES):
            # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
            sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, payload, True, A1_POLICY_REQUEST)
            # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
            sbuf.contents.sub_id = work_item["ptid"]
            pre_send_summary = rmr.message_summary(sbuf)
            sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
            post_send_summary = rmr.message_summary(sbuf)
            mdc_logger.debug(
                "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
            )
            rmr.rmr_free_msg(sbuf)  # free
            if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
                mdc_logger.debug("Message sent successfully!")
                # stop retrying: another attempt would send a duplicate message
                return True

        # previously an item that exhausted its retries vanished without a trace
        mdc_logger.error(
            "Dropping message with ptid {0} after {1} failed send attempts".format(work_item["ptid"], RETRY_TIMES)
        )
        return False

    def _process_received_messages(self):
        """Read a1's mailbox and record each policy instance status ack via data.set_policy_instance_status."""
        for msg in self.rcv_func():
            try:
                ack = json.loads(msg["payload"])
                pti = ack["policy_type_id"]
                pii = ack["policy_instance_id"]
                data.set_policy_instance_status(pti, pii, ack["handler_id"], ack["status"])
            except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
                # TODO: in the future we may also have to catch SDL errors
                mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))


def start_rmr_thread(init_func_override=None, rcv_func_override=None):
    """
    Start the rmr work loop (and its thread) if it has not been created yet.

    Only the first call creates the loop; once it exists, later calls do nothing and the
    overrides passed to them are ignored. The overrides are forwarded to _RmrLoop unchanged.
    """
    # required: without this the assignment below makes __RMR_LOOP__ a local name and the
    # `is None` check raises UnboundLocalError
    global __RMR_LOOP__
    if __RMR_LOOP__ is None:
        __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)


def stop_rmr_thread():
    """
    Ask the rmr work loop to stop.

    This only clears the loop's keep_going flag, which the loop checks before each iteration;
    it does not wait for the thread to exit. Calling it before the loop was started is a no-op.
    """
    if __RMR_LOOP__ is not None:
        __RMR_LOOP__.keep_going = False


def queue_work(item):
    """
    Hand an item to the rmr loop by putting it on the loop's work queue.

    Sending out messages is currently the only kind of work the loop performs.
    """
    pending = __RMR_LOOP__.work_queue
    pending.put(item)


def healthcheck_rmr_thread(seconds=30):
    """
    Return a boolean representing whether the rmr loop is healthy, by checking two attributes:
    1. is the loop's thread alive?
    2. is it stuck in a long (> seconds) loop, i.e. has last_ran not been refreshed within `seconds`?

    Returns False when the loop has not been started yet, instead of raising AttributeError.
    """
    loop = __RMR_LOOP__
    if loop is None:
        return False
    return loop.thread.is_alive() and (time.time() - loop.last_ran) < seconds


def replace_rcv_func(rcv_func):
    """
    Swap the receive function used by the rmr loop.

    Exists purely for the ease of unit testing, so different receive scenarios can be exercised.
    """
    setattr(__RMR_LOOP__, "rcv_func", rcv_func)