Use the RIC logging lib
[ric-plt/a1.git] / a1 / a1rmr.py
1 """
2 a1s rmr functionality
3 """
4 # ==================================================================================
5 #       Copyright (c) 2019 Nokia
6 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
7 #
8 #   Licensed under the Apache License, Version 2.0 (the "License");
9 #   you may not use this file except in compliance with the License.
10 #   You may obtain a copy of the License at
11 #
12 #          http://www.apache.org/licenses/LICENSE-2.0
13 #
14 #   Unless required by applicable law or agreed to in writing, software
15 #   distributed under the License is distributed on an "AS IS" BASIS,
16 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 #   See the License for the specific language governing permissions and
18 #   limitations under the License.
19 # ==================================================================================
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 mdc_logger = Logger(name=__name__)
31
32
33 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
34
35 # Note; yes, globals are bad, but this is a private (to this module) global
36 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
37 __RMR_LOOP__ = None
38
39
40 class _RmrLoop:
41     """
42     class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
43     this launches a thread, it should probably only be called once; the public facing method to access these ensures this
44     """
45
46     def __init__(self, init_func_override=None, rcv_func_override=None):
47         self.keep_going = True
48         self.rcv_func = None
49         self.last_ran = time.time()
50         self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
51
52         # intialize rmr context
53         if init_func_override:
54             self.mrc = init_func_override()
55         else:
56             mdc_logger.debug("Waiting for rmr to initialize..")
57             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
58             # internal ring of messages, and receive calls read from that
59             # currently the size is 2048 messages, so this is fine for the foreseeable future
60             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
61             while rmr.rmr_ready(self.mrc) == 0:
62                 time.sleep(0.5)
63
64         # set the receive function
65         self.rcv_func = rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [21024])
66
67         # start the work loop
68         self.thread = Thread(target=self.loop)
69         self.thread.start()
70
71     def loop(self):
72         """
73         This loop runs forever, and has 3 jobs:
74         - send out any messages that have to go out (create instance, delete instance)
75         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
76         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
77         """
78         # loop forever
79         mdc_logger.debug("Work loop starting")
80         while self.keep_going:
81
82             # send out all messages waiting for us
83             while not self.work_queue.empty():
84                 work_item = self.work_queue.get(block=False, timeout=None)
85
86                 pay = work_item["payload"].encode("utf-8")
87                 for _ in range(0, RETRY_TIMES):
88                     # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
89                     sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, work_item["msg type"])
90                     pre_send_summary = rmr.message_summary(sbuf)
91                     sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
92                     post_send_summary = rmr.message_summary(sbuf)
93                     mdc_logger.debug(
94                         "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
95                     )
96                     rmr.rmr_free_msg(sbuf)  # free
97                     if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
98                         mdc_logger.debug("Message sent successfully!")
99                         break
100
101             # read our mailbox and update statuses
102             for msg in self.rcv_func():
103                 try:
104                     pay = json.loads(msg["payload"])
105                     pti = pay["policy_type_id"]
106                     pii = pay["policy_instance_id"]
107                     data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
108                 except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
109                     # TODO: in the future we may also have to catch SDL errors
110                     mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
111
112             # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
113             self.last_ran = time.time()
114             time.sleep(1)
115
116
117 # Public
118
119
120 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
121     """
122     Start a1s rmr thread
123     """
124     global __RMR_LOOP__
125     if __RMR_LOOP__ is None:
126         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
127
128
129 def stop_rmr_thread():
130     """
131     stops the rmr thread
132     """
133     __RMR_LOOP__.keep_going = False
134
135
136 def queue_work(item):
137     """
138     push an item into the work queue
139     currently the only type of work is to send out messages
140     """
141     __RMR_LOOP__.work_queue.put(item)
142
143
144 def healthcheck_rmr_thread(seconds=30):
145     """
146     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
147     1. is it running?,
148     2. is it stuck in a long (> seconds) loop?
149     """
150     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
151
152
153 def replace_rcv_func(rcv_func):
154     """purely for the ease of unit testing to test different rcv scenarios"""
155     __RMR_LOOP__.rcv_func = rcv_func