Threading pt 2 (of 3, likely)
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import os
18 import queue
19 import time
20 import json
21 from threading import Thread
22 from rmr import rmr, helpers
23 from a1 import get_module_logger
24 from a1 import data
25 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
26
27 logger = get_module_logger(__name__)
28
29
30 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
31
32 _SEND_QUEUE = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
33
34
35 def _init_rmr():
36     """
37     init an rmr context
38     This gets monkeypatched out for unit testing
39     """
40     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
41     # internal ring of messages, and receive calls read from that
42     # currently the size is 2048 messages, so this is fine for the foreseeable future
43     logger.debug("Waiting for rmr to initialize..")
44     mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
45     while rmr.rmr_ready(mrc) == 0:
46         time.sleep(0.5)
47
48     return mrc
49
50
51 def _send(mrc, payload, message_type=0):
52     """
53     Sends a message up to RETRY_TIMES
54     If the message is sent successfully, it returns the transactionid
55     Does nothing otherwise
56     """
57     # TODO: investigate moving this below and allocating the space based on the payload size
58     sbuf = rmr.rmr_alloc_msg(mrc, 4096)
59     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
60
61     # retry RETRY_TIMES to send the message
62     for _ in range(0, RETRY_TIMES):
63         # setup the send message
64         rmr.set_payload_and_length(payload, sbuf)
65         rmr.generate_and_set_transaction_id(sbuf)
66         sbuf.contents.state = 0
67         sbuf.contents.mtype = message_type
68         pre_send_summary = rmr.message_summary(sbuf)
69         logger.debug("Pre message send summary: %s", pre_send_summary)
70         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
71
72         # send
73         sbuf = rmr.rmr_send_msg(mrc, sbuf)
74         post_send_summary = rmr.message_summary(sbuf)
75         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
76
77         # check success or failure
78         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
79             # we are good
80             logger.debug("Message sent successfully!")
81             rmr.rmr_free_msg(sbuf)
82             return transaction_id
83
84     # we failed all RETRY_TIMES
85     logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
86     rmr.rmr_free_msg(sbuf)
87     return None
88
89
90 # Public
91
92
93 def queue_work(item):
94     """
95     push an item into the work queue
96     currently the only type of work is to send out messages
97     """
98     _SEND_QUEUE.put(item)
99
100
101 class RmrLoop:
102     """
103     class represents an rmr loop meant to be called as a longstanding separate thread
104     """
105
106     def __init__(self, _init_func_override=None, rcv_func_override=None):
107         self._rmr_is_ready = False
108         self._keep_going = True
109         self._init_func_override = _init_func_override  # useful for unit testing
110         self._rcv_func_override = rcv_func_override  # useful for unit testing to mock certain recieve scenarios
111         self._rcv_func = None
112
113     def rmr_is_ready(self):
114         """returns whether rmr has been initialized"""
115         return self._rmr_is_ready
116
117     def stop(self):
118         """sets a flag for the loop to end"""
119         self._keep_going = False
120
121     def loop(self):
122         """
123         This loop runs in an a1 thread forever, and has 3 jobs:
124         - send out any messages that have to go out (create instance, delete instance)
125         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
126         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
127         """
128
129         # get a context
130         mrc = self._init_func_override() if self._init_func_override else _init_rmr()
131         self._rmr_is_ready = True
132         logger.debug("Rmr is ready")
133
134         # set the receive function called below
135         self._rcv_func = (
136             self._rcv_func_override if self._rcv_func_override else lambda: helpers.rmr_rcvall_msgs(mrc, [21024])
137         )
138
139         # loop forever
140         logger.debug("Work loop starting")
141         while self._keep_going:
142             # send out all messages waiting for us
143             while not _SEND_QUEUE.empty():
144                 work_item = _SEND_QUEUE.get(block=False, timeout=None)
145                 _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
146
147             # read our mailbox and update statuses
148             updated_instances = set()
149             for msg in self._rcv_func():
150                 try:
151                     pay = json.loads(msg["payload"])
152                     pti = pay["policy_type_id"]
153                     pii = pay["policy_instance_id"]
154                     data.set_status(pti, pii, pay["handler_id"], pay["status"])
155                     updated_instances.add((pti, pii))
156                 except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
157                     # TODO: in the future we may also have to catch SDL errors
158                     logger.debug(("Dropping malformed or non applicable message", msg))
159
160             # for all updated instances, see if we can trigger a delete
161             # should be no catch needed here, since the status update would have failed if it was a bad pair
162             for ut in updated_instances:
163                 data.clean_up_instance(ut[0], ut[1])
164
165         # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
166         time.sleep(1)
167
168
169 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
170     """
171     Start a1s rmr thread
172     Also called during unit testing
173     """
174     rmr_loop = RmrLoop(init_func_override, rcv_func_override)
175     thread = Thread(target=rmr_loop.loop)
176     thread.start()
177     while not rmr_loop.rmr_is_ready():
178         time.sleep(0.5)
179     return rmr_loop  # return the handle; useful during unit testing