Merge "Merge branch 'development'"
[ric-plt/a1.git] / a1 / a1rmr.py
1 """
2 a1s rmr functionality
3 """
4 # ==================================================================================
5 #       Copyright (c) 2019 Nokia
6 #       Copyright (c) 2018-2019 AT&T Intellectual Property.
7 #
8 #   Licensed under the Apache License, Version 2.0 (the "License");
9 #   you may not use this file except in compliance with the License.
10 #   You may obtain a copy of the License at
11 #
12 #          http://www.apache.org/licenses/LICENSE-2.0
13 #
14 #   Unless required by applicable law or agreed to in writing, software
15 #   distributed under the License is distributed on an "AS IS" BASIS,
16 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 #   See the License for the specific language governing permissions and
18 #   limitations under the License.
19 # ==================================================================================
20 import os
21 import queue
22 import time
23 import json
24 from threading import Thread
25 from rmr import rmr, helpers
26 from mdclogpy import Logger
27 from a1 import data
28 from a1.exceptions import PolicyTypeNotFound, PolicyInstanceNotFound
29
30 mdc_logger = Logger(name=__name__)
31
32
33 RETRY_TIMES = int(os.environ.get("A1_RMR_RETRY_TIMES", 4))
34
35
36 A1_POLICY_REQUEST = 20010
37 A1_POLICY_RESPONSE = 20011
38 A1_POLICY_QUERY = 20012
39
40
41 # Note; yes, globals are bad, but this is a private (to this module) global
42 # No other module can import/access this (well, python doesn't enforce this, but all linters will complain)
43 __RMR_LOOP__ = None
44
45
46 class _RmrLoop:
47     """
48     class represents an rmr loop that constantly reads from rmr and performs operations based on waiting messages
49     this launches a thread, it should probably only be called once; the public facing method to access these ensures this
50     """
51
52     def __init__(self, init_func_override=None, rcv_func_override=None):
53         self.keep_going = True
54         self.rcv_func = None
55         self.last_ran = time.time()
56         self.work_queue = queue.Queue()  # thread safe queue https://docs.python.org/3/library/queue.html
57
58         # intialize rmr context
59         if init_func_override:
60             self.mrc = init_func_override()
61         else:
62             mdc_logger.debug("Waiting for rmr to initialize..")
63             # rmr.RMRFL_MTCALL puts RMR into a multithreaded mode, where a receiving thread populates an
64             # internal ring of messages, and receive calls read from that
65             # currently the size is 2048 messages, so this is fine for the foreseeable future
66             self.mrc = rmr.rmr_init(b"4562", rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
67             while rmr.rmr_ready(self.mrc) == 0:
68                 time.sleep(0.5)
69
70         # set the receive function
71         # TODO: when policy query is implemented, add A1_POLICY_QUERY
72         self.rcv_func = (
73             rcv_func_override if rcv_func_override else lambda: helpers.rmr_rcvall_msgs(self.mrc, [A1_POLICY_RESPONSE])
74         )
75
76         # start the work loop
77         self.thread = Thread(target=self.loop)
78         self.thread.start()
79
80     def loop(self):
81         """
82         This loop runs forever, and has 3 jobs:
83         - send out any messages that have to go out (create instance, delete instance)
84         - read a1s mailbox and update the status of all instances based on acks from downstream policy handlers
85         - clean up the database (eg delete the instance) under certain conditions based on those statuses (NOT DONE YET)
86         """
87         # loop forever
88         mdc_logger.debug("Work loop starting")
89         while self.keep_going:
90
91             # send out all messages waiting for us
92             while not self.work_queue.empty():
93                 work_item = self.work_queue.get(block=False, timeout=None)
94
95                 pay = work_item["payload"].encode("utf-8")
96                 for _ in range(0, RETRY_TIMES):
97                     # Waiting on an rmr bugfix regarding the over-allocation: https://rancodev.atlassian.net/browse/RICPLT-2490
98                     sbuf = rmr.rmr_alloc_msg(self.mrc, 4096, pay, True, A1_POLICY_REQUEST)
99                     # TODO: after next rmr is released, this can be done in the alloc call. but that's not avail in pypi yet
100                     sbuf.contents.sub_id = work_item["ptid"]
101                     pre_send_summary = rmr.message_summary(sbuf)
102                     sbuf = rmr.rmr_send_msg(self.mrc, sbuf)  # send
103                     post_send_summary = rmr.message_summary(sbuf)
104                     mdc_logger.debug(
105                         "Pre-send summary: {0}, Post-send summary: {1}".format(pre_send_summary, post_send_summary)
106                     )
107                     rmr.rmr_free_msg(sbuf)  # free
108                     if post_send_summary["message state"] == 0 and post_send_summary["message status"] == "RMR_OK":
109                         mdc_logger.debug("Message sent successfully!")
110                         break
111
112             # read our mailbox and update statuses
113             for msg in self.rcv_func():
114                 try:
115                     pay = json.loads(msg["payload"])
116                     pti = pay["policy_type_id"]
117                     pii = pay["policy_instance_id"]
118                     data.set_policy_instance_status(pti, pii, pay["handler_id"], pay["status"])
119                 except (PolicyTypeNotFound, PolicyInstanceNotFound, KeyError, TypeError, json.decoder.JSONDecodeError):
120                     # TODO: in the future we may also have to catch SDL errors
121                     mdc_logger.debug("Dropping malformed or non applicable message: {0}".format(msg))
122
123             # TODO: what's a reasonable sleep time? we don't want to hammer redis too much, and a1 isn't a real time component
124             self.last_ran = time.time()
125             time.sleep(1)
126
127
128 # Public
129
130
131 def start_rmr_thread(init_func_override=None, rcv_func_override=None):
132     """
133     Start a1s rmr thread
134     """
135     global __RMR_LOOP__
136     if __RMR_LOOP__ is None:
137         __RMR_LOOP__ = _RmrLoop(init_func_override, rcv_func_override)
138
139
140 def stop_rmr_thread():
141     """
142     stops the rmr thread
143     """
144     __RMR_LOOP__.keep_going = False
145
146
147 def queue_work(item):
148     """
149     push an item into the work queue
150     currently the only type of work is to send out messages
151     """
152     __RMR_LOOP__.work_queue.put(item)
153
154
155 def healthcheck_rmr_thread(seconds=30):
156     """
157     returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
158     1. is it running?,
159     2. is it stuck in a long (> seconds) loop?
160     """
161     return __RMR_LOOP__.thread.is_alive() and ((time.time() - __RMR_LOOP__.last_ran) < seconds)
162
163
164 def replace_rcv_func(rcv_func):
165     """purely for the ease of unit testing to test different rcv scenarios"""
166     __RMR_LOOP__.rcv_func = rcv_func