38f837357642970440d7fd294b8285fc17bf80b6
[ric-plt/a1.git] / a1 / a1rmr.py
1 """
2 a1s rmr functionality
3 """
4 # ==================================================================================
5 #       Copyright (c) 2019 Nokia
6 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
7 #
8 #   Licensed under the Apache License, Version 2.0 (the "License");
9 #   you may not use this file except in compliance with the License.
10 #   You may obtain a copy of the License at
11 #
12 #          http://www.apache.org/licenses/LICENSE-2.0
13 #
14 #   Unless required by applicable law or agreed to in writing, software
15 #   distributed under the License is distributed on an "AS IS" BASIS,
16 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 #   See the License for the specific language governing permissions and
18 #   limitations under the License.
19 # ==================================================================================
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from rmr import rmr, helpers
26 from a1 import get_module_logger
27 from a1 import data
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 logger = get_module_logger(__name__)
31
32
33 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
34
35 # Note; yes, globals are bad, but this is a private (to this module) global
36 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
37 __RMR_LOOP__ = None
38
39
40 class _RmrLoop:
41     """
42     class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
43     this launches a thread, it should probably only be called once; the public facing method to access these ensures this
44     """
45
46     def __init__(self, init_func_override=None, rcv_func_override=None):
47         self.keep_going = True
48         self.rcv_func = None
49         self.last_ran = time.time()
50         self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
51
52         # intialize rmr context
53         if init_func_override:
54             self.mrc = init_func_override()
55         else:
56             logger.debug("Waiting for rmr to initialize..")
57             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
58             # internal ring of messages, and receive calls read from that
59             # currently the size is 2048 messages, so this is fine for the foreseeable future
60             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
61             while rmr.rmr_ready(self.mrc) == 0:
62                 time.sleep(0.5)
63
64         # set the receive function
65         self.rcv_func = rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024])
66
67         # start the work loop
68         self.thread = Thread(target=self.loop)
69         self.thread.start()
70
71     def loop(self):
72         """
73         This loop runs forever, and has 3 jobs:
74         - send out any messages that have to go out (create instance, delete instance)
75         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
76         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
77         """
78         # loop forever
79         logger.debug("Work loop starting")
80         while self.keep_going:
81
82             # send out all messages waiting for us
83             while not self.work_queue.empty():
84                 work_item = self.work_queue.get(block=False, timeout=None)
85
86                 pay = work_item["payload"].encode("utf-8")
87                 for _ in range(0, RETRY_TIMES):
88                     # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
89                     sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"])
90                     pre_send_summary = rmr.message_summary(sbuf)
91                     sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
92                     post_send_summary = rmr.message_summary(sbuf)
93                     logger.debug("Pre-send summary: %s, Post-send summary: %s", pre_send_summary, post_send_summary)
94                     rmr.rmr_free_msg(sbuf)  # free
95                     if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
96                         logger.debug("Message sent successfully!")
97                         break
98
99             # read our mailbox and update statuses
100             for msg in self.rcv_func():
101                 try:
102                     pay = json.loads(msg["payload"])
103                     pti = pay["policy_type_id"]
104                     pii = pay["policy_instance_id"]
105                     data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
106                 except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, json.decoder.JSONDecodeError):
107                     # TODO: in the future we may also have to catch SDL errors
108                     logger.debug(("Dropping malformed or non applicable message", msg))
109
110             # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
111             self.last_ran = time.time()
112             time.sleep(1)
113
114
115 # Public
116
117
118 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
119     """
120     Start a1s rmr thread
121     """
122     global __RMR_LOOP__
123     if __RMR_LOOP__ is None:
124         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
125
126
127 def stop_rmr_thread():
128     """
129     stops the rmr thread
130     """
131     __RMR_LOOP__.keep_going = False
132
133
134 def queue_work(item):
135     """
136     push an item into the work queue
137     currently the only type of work is to send out messages
138     """
139     __RMR_LOOP__.work_queue.put(item)
140
141
142 def healthcheck_rmr_thread(seconds=30):
143     """
144     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
145     1. is it running?,
146     2. is it stuck in a long (> seconds) loop?
147     """
148     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
149
150
151 def replace_rcv_func(rcv_func):
152     """purely for the ease of unit testing to test different rcv scenarios"""
153     __RMR_LOOP__.rcv_func = rcv_func