abbb84f3827db867cbd6538e3eaed6eaadc29b41
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import os
18 import gevent
19 from rmr import rmr
20 from a1 import get_module_logger
21
22 logger = get_module_logger(__name__)
23
24
25 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
26 MRC = None
27
28
29 def init_rmr():
30     """
31     called from run; not called for unit tests
32     """
33     global MRC
34     MRC = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, 0x00)
35
36     while rmr.rmr_ready(MRC) == 0:
37         gevent.sleep(1)
38         logger.debug("not yet ready")
39
40
41 def send(payload, message_type=0):
42     """
43     Sends a message up to RETRY_TIMES
44     If the message is sent successfully, it returns the transactionid
45     Does nothing otherwise
46     """
47     # we may be called many times in asyncronous loops, so for now, it is safer not to share buffers. We can investifgate later whether this is really a problem.
48     sbuf = rmr.rmr_alloc_msg(MRC, 4096)
49     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
50
51     # retry RETRY_TIMES to send the message
52     for _ in range(0, RETRY_TIMES):
53         # setup the send message
54         rmr.set_payload_and_length(payload, sbuf)
55         rmr.generate_and_set_transaction_id(sbuf)
56         sbuf.contents.state = 0
57         sbuf.contents.mtype = message_type
58         pre_send_summary = rmr.message_summary(sbuf)
59         logger.debug("Pre message send summary: %s", pre_send_summary)
60         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
61
62         # send
63         sbuf = rmr.rmr_send_msg(MRC, sbuf)
64         post_send_summary = rmr.message_summary(sbuf)
65         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
66
67         # check success or failure
68         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
69             # we are good
70             logger.debug("Message sent successfully!")
71             return transaction_id
72
73     # we failed all RETRY_TIMES
74     logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
75     return None
76
77
78 def dequeue_all_waiting_messages(filter_type=None):
79     """
80     dequeue all waiting rmr messages from rmr
81     We only add messages of type 21024; we drop other "spam";
82     see https://rancodev.atlassian.net/wiki/spaces/RICPLT/pages/60784719/RIC+message+types
83     """
84     new_messages = []
85     sbuf = rmr.rmr_alloc_msg(MRC, 4096)
86     while True:
87         sbuf = rmr.rmr_torcv_msg(MRC, sbuf, 0)  # set the timeout to 0 so this doesn't block!!
88         summary = rmr.message_summary(sbuf)
89         if summary["message state"] == 12 and summary["message status"] == "RMR_ERR_TIMEOUT":
90             # no new messages
91             break
92         else:
93             if (not filter_type) or (summary["message type"] == filter_type):
94                 # message is relevent
95                 new_messages.append(summary)
96             else:
97                 # "spam", do nothing with message, effectively dropped
98                 logger.debug("A message was received by a1, but it was not desired, droping: %s", summary)
99
100     return new_messages