Use blocking get call w/ timeout to read msg queue
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17
18 """
19 Contains RMR functionality specific to the xapp.
20 The general rmr API is via "rmr"
21 """
22
23 import time
24 import queue
25 from threading import Thread
26 from mdclogpy import Logger
27 from ricxappframe.rmr import rmr, helpers
28
29
30 mdc_logger = Logger(name=__name__)
31
32
33 class RmrLoop:
34     """
35     Class represents an RMR loop that constantly reads from RMR.
36
37     Note, we use a queue here, and a thread, rather than the xapp
38     frame just looping and calling consume, so that a possibly slow
39     running consume function does not block the reading of new messages.
40     """
41
42     def __init__(self, port, wait_for_ready=True):
43         """
44         sets up RMR, then launches a thread that reads and injects
45         messages into a queue.
46
47         Parameters
48         ----------
49         port: int
50             port to listen on
51
52         wait_for_ready: bool (optional)
53             If True, then this function hangs until RMR is ready to
54             send, which includes having a valid routing file. This can
55             be set to False if the client only wants to *receive only*.
56         """
57
58         # Public
59         # thread safe queue https://docs.python.org/3/library/queue.html
60         # We use a thread and a queue so that a long running consume callback function can
61         # never block reads. IE a consume implementation could take a long time and the ring
62         # size for rmr blows up here and messages are lost.
63         self.rcv_queue = queue.Queue()
64
65         # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread
66         # populates a ring of messages that receive calls read from
67         self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
68
69         if wait_for_ready:
70             mdc_logger.debug("Waiting for rmr to init on port {}..".format(port))
71             while rmr.rmr_ready(self.mrc) == 0:
72                 time.sleep(0.1)
73
74         # Private
75         self._keep_going = True  # used to tell this thread to stop
76         self._last_ran = time.time()  # used for healthcheck
77         self._loop_is_running = False  # used in stop to know when it's safe to kill the mrc
78
79         def loop():
80             mdc_logger.debug("Work loop starts")
81             self._loop_is_running = True
82             while self._keep_going:
83
84                 # read our mailbox
85                 # TODO: take a flag as to whether RAW is needed or not
86                 # RAW allows for RTS however the caller must free, and
87                 # the caller may not need RTS. Currently after
88                 # consuming, callers must call rmr.rmr_free_msg(sbuf)
89                 # Use a non-trivial timeout to avoid spinning the CPU.
90                 # The function returns if no messages arrive for that
91                 # interval, which allows a stop request to be processed.
92                 for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=5000):
93                     self.rcv_queue.put((msg, sbuf))
94
95                 self._last_ran = time.time()
96
97             self._loop_is_running = False
98             mdc_logger.debug("Work loop ends")
99
100         # start the work loop
101         mdc_logger.debug("Starting loop thread")
102         self._thread = Thread(target=loop)
103         self._thread.start()
104
105     def stop(self):
106         """
107         sets a flag that will cleanly stop the thread
108         """
109         # wait until the current batch of messages is done, then kill
110         # the rmr connection. I debated putting this in "loop" however
111         # if the while loop was still going, setting mrc to close here
112         # would blow up any processing still currently happening.
113         # Probably more polite to at least finish the current batch
114         # and then close. So here we wait until the current batch is
115         # done, then we kill the mrc.
116         mdc_logger.debug("Setting flag to end RMR work loop.")
117         self._keep_going = False
118         while self._loop_is_running:
119             time.sleep(1)
120             mdc_logger.debug("Waiting for RMR work loop to end")
121         mdc_logger.debug("Closing RMR connection")
122         rmr.rmr_close(self.mrc)
123
124     def healthcheck(self, seconds=30):
125         """
126         returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
127         1. is it running?,
128         2. is it stuck in a long (> seconds) loop?
129
130         Parameters
131         ----------
132         seconds: int (optional)
133             the rmr loop is determined healthy if it has completed in the last (seconds)
134         """
135         return self._thread.is_alive() and ((time.time() - self._last_ran) < seconds)