d6114bf38e232112a91dfddb445eae1672e338d2
[ric-plt/a1.git] / a1 / a1rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2019 Nokia
3 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import os
18 import queue
19 import time
20 import json
21 from rmr import rmr, helpers
22 from a1 import get_module_logger
23 from a1 import data
24 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
25
26 logger = get_module_logger(__name__)
27
28
29 RETRY_TIMES = int(os.environ.get("RMR_RETRY_TIMES", 4))
30
31 _SEND_QUEUE = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
32
33
34 def _init_rmr():
35     """
36     init an rmr context
37     This gets monkeypatched out for unit testing
38     """
39     # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
40     # internal ring of messages, and receive calls read from that
41     # currently the size is 2048 messages, so this is fine for the foreseeable future
42     mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
43
44     while rmr.rmr_ready(mrc) == 0:
45         time.sleep(0.5)
46
47     return mrc
48
49
50 def _send(mrc, payload, message_type=0):
51     """
52     Sends a message up to RETRY_TIMES
53     If the message is sent successfully, it returns the transactionid
54     Does nothing otherwise
55     """
56     # TODO: investigate moving this below and allocating the space based on the payload size
57     sbuf = rmr.rmr_alloc_msg(mrc, 4096)
58     payload = payload if isinstance(payload, bytes) else payload.encode("utf-8")
59
60     # retry RETRY_TIMES to send the message
61     for _ in range(0, RETRY_TIMES):
62         # setup the send message
63         rmr.set_payload_and_length(payload, sbuf)
64         rmr.generate_and_set_transaction_id(sbuf)
65         sbuf.contents.state = 0
66         sbuf.contents.mtype = message_type
67         pre_send_summary = rmr.message_summary(sbuf)
68         logger.debug("Pre message send summary: %s", pre_send_summary)
69         transaction_id = pre_send_summary["transaction id"]  # save the transactionid because we need it later
70
71         # send
72         sbuf = rmr.rmr_send_msg(mrc, sbuf)
73         post_send_summary = rmr.message_summary(sbuf)
74         logger.debug("Post message send summary: %s", rmr.message_summary(sbuf))
75
76         # check success or failure
77         if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
78             # we are good
79             logger.debug("Message sent successfully!")
80             rmr.rmr_free_msg(sbuf)
81             return transaction_id
82
83     # we failed all RETRY_TIMES
84     logger.debug("Send failed all %s times, stopping", RETRY_TIMES)
85     rmr.rmr_free_msg(sbuf)
86     return None
87
88
89 def _update_all_statuses(mrc):
90     """
91     get all waiting messages, and try to parse them as status updates
92     (currently, those are the only messages a1 should get, this may have to be revisited later)
93     """
94     for msg in helpers.rmr_rcvall_msgs(mrc, [21024]):
95         try:
96             pay = json.loads(msg["payload"])
97             data.set_status(pay["policy_type_id"], pay["policy_instance_id"], pay["handler_id"], pay["status"])
98         except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError):
99             logger.debug("Dropping malformed or non applicable message")
100             logger.debug(msg)
101
102
103 # Public
104
105
106 def queue_work(item):
107     """
108     push an item into the work queue
109     currently the only type of work is to send out messages
110     """
111     _SEND_QUEUE.put(item)
112
113
114 class RmrLoop:
115     """
116     class represents an rmr loop meant to be called as a longstanding separate thread
117     """
118
119     def __init__(self, real_init=True):
120         self._rmr_is_ready = False
121         self._keep_going = True
122         self._real_init = real_init  # useful for unit testing to turn off initialization
123
124     def rmr_is_ready(self):
125         """returns whether rmr has been initialized"""
126         return self._rmr_is_ready
127
128     def stop(self):
129         """sets a flag for the loop to end"""
130         self._keep_going = False
131
132     def loop(self):
133         """
134         This loop runs in an a1 thread forever, and has 3 jobs:
135         - send out any messages that have to go out (create instance, delete instance)
136         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
137         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
138         """
139
140         # get a context
141         mrc = None
142         logger.debug("Waiting for rmr to initialize...")
143         if self._real_init:
144             mrc = _init_rmr()
145         self._rmr_is_ready = True
146         logger.debug("Rmr is ready")
147
148         # loop forever
149         logger.debug("Work loop starting")
150         while self._keep_going:
151             """
152             We never raise an exception here. Log and keep moving
153             Bugs will eventually be caught be examining logs.
154             """
155             try:
156                 # First, send out all messages waiting for us
157                 while not _SEND_QUEUE.empty():
158                     work_item = _SEND_QUEUE.get(block=False, timeout=None)
159                     _send(mrc, payload=work_item["payload"], message_type=work_item["msg type"])
160
161                 # Next, update all statuses waiting in a1s mailbox
162                 _update_all_statuses(mrc)
163
164                 # TODO: next body of work is to try to clean up the database for any updated statuses
165
166             except Exception as e:
167                 logger.debug("Polling thread encountered an unexpected exception, but it will continue:")
168                 logger.exception(e)
169
170             time.sleep(1)