Towards Resiliency.
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import os
18 import gevent
19 from rmr import rmr, helpers
20 from a1 import get_module_logger
21
22 logger = get_module_logger(__name__)
23
24
25 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
26 MRC = None
27
28
29 def init_rmr():
30     """
31     called from run; not called for unit tests
32     """
33     global MRC
34     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
35     # internal ring of messages, and receive calls read from that
36     # currently the size is 2048 messages, so this is fine for the foreseeable future
37     MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
38
39     while rmr.rmr_ready(MRC) == 0:
40         gevent.sleep(1)
41         logger.debug("not yet ready")
42
43
44 def send(payload, message_type=0):
45     """
46     Sends a message up to RETRY_TIMES
47     If the message is sent successfully, it returns the transactionid
48     Does nothing otherwise
49     """
50     # we may be called many times in asynchronous loops, so for now, it is safer not to share buffers. We can investigate later whether this is really a problem.
51     sbuf = rmr.rmr_alloc_msg(MRC, 4096)
52     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
53
54     # retry RETRY_TIMES to send the message
55     for _ in range(0, RETRY_TIMES):
56         # setup the send message
57         rmr.set_payload_and_length(payload, sbuf)
58         rmr.generate_and_set_transaction_id(sbuf)
59         sbuf.contents.state = 0
60         sbuf.contents.mtype = message_type
61         pre_send_summary = rmr.message_summary(sbuf)
62         logger.debug("Pre message send summary: %s", pre_send_summary)
63         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
64
65         # send
66         sbuf = rmr.rmr_send_msg(MRC, sbuf)
67         post_send_summary = rmr.message_summary(sbuf)
68         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
69
70         # check success or failure
71         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
72             # we are good
73             logger.debug("Message sent successfully!")
74             rmr.rmr_free_msg(sbuf)
75             return transaction_id
76
77     # we failed all RETRY_TIMES
78     logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
79     rmr.rmr_free_msg(sbuf)
80     return None
81
82
83 def dequeue_all_waiting_messages(filter_type=[]):
84     """
85     dequeue all waiting rmr messages from rmr
86     """
87     return helpers.rmr_rcvall_msgs(MRC, filter_type)