Block on RMR read to avoid 100% CPU usage on wait
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 """
2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
4 """
5 # ==================================================================================
6 #       Copyright (c) 2020 Nokia
7 #       Copyright (c) 2020 AT&T Intellectual Property.
8 #
9 #   Licensed under the Apache License, Version 2.0 (the "License");
10 #   you may not use this file except in compliance with the License.
11 #   You may obtain a copy of the License at
12 #
13 #          http://www.apache.org/licenses/LICENSE-2.0
14 #
15 #   Unless required by applicable law or agreed to in writing, software
16 #   distributed under the License is distributed on an "AS IS" BASIS,
17 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 #   See the License for the specific language governing permissions and
19 #   limitations under the License.
20 # ==================================================================================
21 from threading import Thread
22 from ricxappframe import xapp_rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from ricxappframe.rmr import rmr
25 from mdclogpy import Logger
26
27 # constants
28 RIC_HEALTH_CHECK_REQ = 100
29 RIC_HEALTH_CHECK_RESP = 101
30
31
32 # Private base class; not for direct client use
33
34
35 class _BaseXapp:
36     """
37     Base xapp; not for client use directly
38     """
39
40     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
41         """
42         Init
43
44         Parameters
45         ----------
46         rmr_port: int
47             port to listen on
48
49         rmr_wait_for_ready: bool (optional)
50             if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
51             this can be set to False if the client only wants to *receive only*
52
53         use_fake_sdl: bool (optional)
54             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
55             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
56
57         post_init: function (optional)
58             runs this user provided function after the base xapp is initialized
59             it's signature should be post_init(self)
60         """
61         # PUBLIC, can be used by xapps using self.(name):
62         self.logger = Logger(name=__name__)
63
64         # Start rmr rcv thread
65         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
66         self._mrc = self._rmr_loop.mrc  # for convenience
67
68         # SDL
69         self._sdl = SDLWrapper(use_fake_sdl)
70
71         # run the optionally provided user post init
72         if post_init:
73             post_init(self)
74
75     # Public rmr methods
76
77     def rmr_get_messages(self):
78         """
79         Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
80         Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
81         The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
82         """
83         while not self._rmr_loop.rcv_queue.empty():
84             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
85             yield (summary, sbuf)
86
87     def rmr_send(self, payload, mtype, retries=100):
88         """
89         Allocates a buffer, sets payload and mtype, and sends
90
91         Parameters
92         ----------
93         payload: bytes
94             payload to set
95         mtype: int
96             message type
97         retries: int (optional)
98             Number of times to retry at the application level before excepting RMRFailure
99
100         Returns
101         -------
102         bool
103             whether or not the send worked after retries attempts
104         """
105         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
106
107         for _ in range(retries):
108             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
109             if sbuf.contents.state == 0:
110                 self.rmr_free(sbuf)
111                 return True
112
113         self.rmr_free(sbuf)
114         return False
115
116     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
117         """
118         Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
119
120         This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
121         The client needs to free.
122
123         Parameters
124         ----------
125         sbuf: ctypes c_void_p
126              Pointer to an rmr message buffer
127         new_payload: bytes (optional)
128             New payload to set
129         new_mtype: int (optional)
130             New message type (replaces the received message)
131         retries: int (optional)
132             Number of times to retry at the application level before excepting RMRFailure
133
134         Returns
135         -------
136         bool
137             whether or not the send worked after retries attempts
138         """
139         for _ in range(retries):
140             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
141             if sbuf.contents.state == 0:
142                 return True
143
144         self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
145         return False
146
147     def rmr_free(self, sbuf):
148         """
149         Free an rmr message buffer after use
150
151         Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
152         Parameters
153         ----------
154         sbuf: ctypes c_void_p
155              Pointer to an rmr message buffer
156         """
157         rmr.rmr_free_msg(sbuf)
158
159     # SDL
160     # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
161     # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
162
163     def sdl_set(self, ns, key, value, usemsgpack=True):
164         """
165         set a key
166
167         Parameters
168         ----------
169         ns: string
170            the sdl namespace
171         key: string
172             the sdl key
173         value:
174             if usemsgpack is True, value can be anything serializable by msgpack
175             if usemsgpack is False, value must be bytes
176         usemsgpack: boolean (optional)
177             determines whether the value is serialized using msgpack
178         """
179         self._sdl.set(ns, key, value, usemsgpack)
180
181     def sdl_get(self, ns, key, usemsgpack=True):
182         """
183         get a key
184
185         Parameters
186         ----------
187         ns: string
188            the sdl namespace
189         key: string
190             the sdl key
191         usemsgpack: boolean (optional)
192             if usemsgpack is True, the value is deserialized using msgpack
193             if usemsgpack is False, the value is returned as raw bytes
194
195         Returns
196         -------
197         None (if not exist) or see above; depends on usemsgpack
198         """
199         return self._sdl.get(ns, key, usemsgpack)
200
201     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
202         """
203         get all k v pairs that start with prefix
204
205         Parameters
206         ----------
207         ns: string
208            the sdl namespace
209         key: string
210             the sdl key
211         prefix: string
212             the prefix
213         usemsgpack: boolean (optional)
214             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
215             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
216
217         Returns
218         -------
219         {} (if no keys match) or see above; depends on usemsgpack
220         """
221         return self._sdl.find_and_get(ns, prefix, usemsgpack)
222
223     def sdl_delete(self, ns, key):
224         """
225         delete a key
226
227         Parameters
228         ----------
229         ns: string
230            the sdl namespace
231         key: string
232             the sdl key
233         """
234         self._sdl.delete(ns, key)
235
236     # Health
237
238     def healthcheck(self):
239         """
240         this needs to be understood how this is supposed to work
241         """
242         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
243
244     def stop(self):
245         """
246         cleans up and stops the xapp rmr thread (currently)
247         This is critical for unit testing as pytest will never return if the thread is running.
248
249         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
250         """
251         self._rmr_loop.stop()
252
253
254 # Public Classes to subclass (these subclass _BaseXapp)
255
256
257 class RMRXapp(_BaseXapp):
258     """
259     Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
260     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
261     """
262
263     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
264         """
265         Parameters
266         ----------
267         default_handler: function
268             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
269             summary: dict
270                 the rmr message summary
271             sbuf: ctypes c_void_p
272                 Pointer to an rmr message buffer. The user must call free on this when done.
273
274         post_init: function (optional)
275             optionally runs this function after the app initializes and before the run loop
276             it's signature should be post_init(self)
277
278         For the other parameters, see _BaseXapp
279         """
280         # init base
281         super().__init__(
282             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
283         )
284
285         # setup callbacks
286         self._default_handler = default_handler
287         self._dispatch = {}
288
289         # used for thread control
290         self._keep_going = True
291
292         # register a default healthcheck handler
293         # this default checks that rmr is working and SDL is working
294         # the user can override this and register their own handler if they wish since the "last registered callback wins".
295         def handle_healthcheck(self, summary, sbuf):
296             ok = self.healthcheck()
297             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
298             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
299             self.rmr_free(sbuf)
300
301         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
302
303     def register_callback(self, handler, message_type):
304         """
305         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
306
307         Parameters
308         ----------
309         handler: function
310             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
311             summary: dict
312                 the rmr message summary
313             sbuf: ctypes c_void_p
314                 Pointer to an rmr message buffer. The user must call free on this when done.
315
316         message:type: int
317             the message type to look for
318
319         Note if this method is called multiple times for a single message type, the "last one wins".
320         """
321         self._dispatch[message_type] = handler
322
323     def run(self, thread=False):
324         """
325         This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
326
327         Parameters
328         ----------
329         thread: bool (optional)
330             if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
331             The thread can be stopped using .stop()
332             if False, execution is not returned and the framework loops
333         """
334
335         def loop():
336             while self._keep_going:
337                 if not self._rmr_loop.rcv_queue.empty():
338                     (summary, sbuf) = self._rmr_loop.rcv_queue.get()
339                     # dispatch
340                     func = self._dispatch.get(summary["message type"], None)
341                     if not func:
342                         func = self._default_handler
343                     func(self, summary, sbuf)
344
345         if thread:
346             Thread(target=loop).start()
347         else:
348             loop()
349
350     def stop(self):
351         """
352         stops the rmr xapp completely.
353         """
354         super().stop()
355         self.logger.debug("Stopping queue reading thread..")
356         self._keep_going = False
357
358
359 class Xapp(_BaseXapp):
360     """
361     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
362     """
363
364     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
365         """
366         Parameters
367         ----------
368         entrypoint: function
369             this function is called when the xapp runs; this is the user code
370             it's signature should be function(self)
371
372         For the other parameters, see _BaseXapp
373         """
374         # init base
375         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
376         self._entrypoint = entrypoint
377
378     def run(self):
379         """
380         This function should be called when the client xapp is ready to start their code
381         """
382         self._entrypoint(self)
383
384     # there is no need for stop currently here (base has, and nothing special to do here)