93eaeeb7523628aaa0caf69e6e8ef4d4dea9f189
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_rmr.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17
18 """
19 Contains RMR functionality specific to the xapp.
20 The general rmr API is via "rmr"
21 """
22
23 import time
24 import queue
25 from threading import Thread
26 from mdclogpy import Logger
27 from ricxappframe.rmr import rmr, helpers
28
29
30 mdc_logger = Logger(name=__name__)
31
32
33 class RmrLoop:
34     """
35     Class represents an RMR loop that constantly reads from RMR.
36
37     Note, we use a queue here, and a thread, rather than the xapp frame just looping
38     and calling consume, so that a possibly slow running consume function does not
39     block the reading of new messages
40     """
41
42     def __init__(self, port, wait_for_ready=True):
43         """
44         sets up RMR, then launches a thread that reads and injects messages into a queue.
45
46         Parameters
47         ----------
48         port: int
49             port to listen on
50
51         wait_for_ready: bool (optional)
52             If True, then this function hangs until RMR is ready to send, which includes
53             having a valid routing file. This can be set to False if the client only wants
54             to *receive only*.
55         """
56
57         # Public
58         # thread safe queue https://docs.python.org/3/library/queue.html
59         # We use a thread and a queue so that a long running consume callback function can
60         # never block reads. IE a consume implementation could take a long time and the ring
61         # size for rmr blows up here and messages are lost.
62         self.rcv_queue = queue.Queue()
63
64         # RMR context; RMRFL_MTCALL puts RMR into a multithreaded mode, where a thread
65         # populates a ring of messages that receive calls read from
66         self.mrc = rmr.rmr_init(str(port).encode(), rmr.RMR_MAX_RCV_BYTES, rmr.RMRFL_MTCALL)
67
68         if wait_for_ready:
69             mdc_logger.debug("Waiting for rmr to init on port {}..".format(port))
70             while rmr.rmr_ready(self.mrc) == 0:
71                 time.sleep(0.1)
72
73         # Private
74         self._keep_going = True  # used to tell this thread to stop
75         self._last_ran = time.time()  # used for healthcheck
76         self._loop_is_running = False  # used in stop to know when it's safe to kill the mrc
77
78         def loop():
79             mdc_logger.debug("Work loop starts")
80             self._loop_is_running = True
81             while self._keep_going:
82
83                 # read our mailbox
84                 # TODO: take a flag as to whether RAW is needed or not
85                 # RAW allows for RTS however the caller must free, and the caller may not need RTS.
86                 # Currently after consuming, callers should do rmr.rmr_free_msg(sbuf)
87
88                 for (msg, sbuf) in helpers.rmr_rcvall_msgs_raw(self.mrc, timeout=1000):
89                     self.rcv_queue.put((msg, sbuf))
90
91                 self._last_ran = time.time()
92
93             self._loop_is_running = False
94             mdc_logger.debug("Work loop ends")
95
96         # start the work loop
97         mdc_logger.debug("Starting loop thread")
98         self._thread = Thread(target=loop)
99         self._thread.start()
100
101     def stop(self):
102         """
103         sets a flag that will cleanly stop the thread
104         """
105         mdc_logger.debug("Stopping RMR thread. Waiting for last iteration to finish..")
106         self._keep_going = False
107         # wait until the current batch of messages is done, then kill the rmr connection
108         # note; I debated putting this in "loop" however if the while loop was still going
109         # setting mrc to close here would blow up any processing still currently happening
110         # probably more polite to at least finish the current batch and then close. So here
111         # we wait until the current batch is done, then we kill the mrc
112         while self._loop_is_running:
113             time.sleep(0.5)
114         mdc_logger.debug("Closing rmr connection")
115         rmr.rmr_close(self.mrc)
116
117     def healthcheck(self, seconds=30):
118         """
119         returns a boolean representing whether the rmr loop is healthy, by checking two attributes:
120         1. is it running?,
121         2. is it stuck in a long (> seconds) loop?
122
123         Parameters
124         ----------
125         seconds: int (optional)
126             the rmr loop is determined healthy if it has completed in the last (seconds)
127         """
128         return self._thread.is_alive() and ((time.time() - self._last_ran) < seconds)