Use blocking get call w/ timeout to read msg queue
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 Framework for python xapps
19 Framework here means Xapp classes that can be subclassed
20 """
21
22 import queue
23 from threading import Thread
24 from ricxappframe import xapp_rmr
25 from ricxappframe.rmr import rmr
26 from ricxappframe.xapp_sdl import SDLWrapper
27 from mdclogpy import Logger
28
29 # constants
30 RIC_HEALTH_CHECK_REQ = 100
31 RIC_HEALTH_CHECK_RESP = 101
32
33
34 # Private base class; not for direct client use
35
36
37 class _BaseXapp:
38     """
39     Base xapp; not for client use directly
40     """
41
42     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
43         """
44         Init
45
46         Parameters
47         ----------
48         rmr_port: int
49             port to listen on
50
51         rmr_wait_for_ready: bool (optional)
52
53             if this is True, then init waits until rmr is ready to send, which
54             includes having a valid routing file. This can be set to
55             False if the client only wants to *receive only*.
56
57         use_fake_sdl: bool (optional)
58             if this is True, it uses dbaas' "fake dict backend" instead
59             of Redis or other backends. Set this to true when developing
60             your xapp or during unit testing to completely avoid needing
61             a dbaas running or any network at all.
62
63         post_init: function (optional)
64             runs this user provided function after the base xapp is
65             initialized; its signature should be post_init(self)
66         """
67         # PUBLIC, can be used by xapps using self.(name):
68         self.logger = Logger(name=__name__)
69
70         # Start rmr rcv thread
71         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
72         self._mrc = self._rmr_loop.mrc  # for convenience
73
74         # SDL
75         self._sdl = SDLWrapper(use_fake_sdl)
76
77         # run the optionally provided user post init
78         if post_init:
79             post_init(self)
80
81     # Public rmr methods
82
83     def rmr_get_messages(self):
84         """
85         Returns a generator iterable over all items in the queue that
86         have not yet been read by the client xapp. Each item is a tuple
87         (S, sbuf) where S is a message summary dict and sbuf is the raw
88         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
89         finished with each sbuf to prevent memory leaks!
90         """
91         while not self._rmr_loop.rcv_queue.empty():
92             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
93             yield (summary, sbuf)
94
95     def rmr_send(self, payload, mtype, retries=100):
96         """
97         Allocates a buffer, sets payload and mtype, and sends
98
99         Parameters
100         ----------
101         payload: bytes
102             payload to set
103         mtype: int
104             message type
105         retries: int (optional)
106             Number of times to retry at the application level before excepting RMRFailure
107
108         Returns
109         -------
110         bool
111             whether or not the send worked after retries attempts
112         """
113         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
114
115         for _ in range(retries):
116             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
117             if sbuf.contents.state == 0:
118                 self.rmr_free(sbuf)
119                 return True
120
121         self.rmr_free(sbuf)
122         return False
123
124     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
125         """
126         Allows the xapp to return to sender, possibly adjusting the
127         payload and message type before doing so.  This does NOT free
128         the sbuf for the caller as the caller may wish to perform
129         multiple rts per buffer. The client needs to free.
130
131         Parameters
132         ----------
133         sbuf: ctypes c_void_p
134              Pointer to an rmr message buffer
135         new_payload: bytes (optional)
136             New payload to set
137         new_mtype: int (optional)
138             New message type (replaces the received message)
139         retries: int (optional)
140             Number of times to retry at the application level before
141             throwing exception RMRFailure
142
143         Returns
144         -------
145         bool
146             whether or not the send worked after retries attempts
147         """
148         for _ in range(retries):
149             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
150             if sbuf.contents.state == 0:
151                 return True
152
153         self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
154         return False
155
156     def rmr_free(self, sbuf):
157         """
158         Frees an rmr message buffer after use
159
160         Note: this does not need to be a class method, self is not
161         used. However if we break it out as a function we need a home
162         for it.
163
164         Parameters
165         ----------
166         sbuf: ctypes c_void_p
167              Pointer to an rmr message buffer
168         """
169         rmr.rmr_free_msg(sbuf)
170
171     # SDL
172     # NOTE, even though these are passthroughs, the seperate SDL wrapper
173     # is useful for other applications like A1. Therefore, we don't
174     # embed that SDLWrapper functionality here so that it can be
175     # instantiated on its own.
176
177     def sdl_set(self, ns, key, value, usemsgpack=True):
178         """
179         set a key
180
181         Parameters
182         ----------
183         ns: string
184            the sdl namespace
185         key: string
186             the sdl key
187         value:
188             if usemsgpack is True, value can be anything serializable by msgpack
189             if usemsgpack is False, value must be bytes
190         usemsgpack: boolean (optional)
191             determines whether the value is serialized using msgpack
192         """
193         self._sdl.set(ns, key, value, usemsgpack)
194
195     def sdl_get(self, ns, key, usemsgpack=True):
196         """
197         get a key
198
199         Parameters
200         ----------
201         ns: string
202            the sdl namespace
203         key: string
204             the sdl key
205         usemsgpack: boolean (optional)
206             if usemsgpack is True, the value is deserialized using msgpack
207             if usemsgpack is False, the value is returned as raw bytes
208
209         Returns
210         -------
211         None (if not exist) or see above; depends on usemsgpack
212         """
213         return self._sdl.get(ns, key, usemsgpack)
214
215     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
216         """
217         get all k v pairs that start with prefix
218
219         Parameters
220         ----------
221         ns: string
222            the sdl namespace
223         key: string
224             the sdl key
225         prefix: string
226             the prefix
227         usemsgpack: boolean (optional)
228             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
229             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
230
231         Returns
232         -------
233         {} (if no keys match) or see above; depends on usemsgpack
234         """
235         return self._sdl.find_and_get(ns, prefix, usemsgpack)
236
237     def sdl_delete(self, ns, key):
238         """
239         delete a key
240
241         Parameters
242         ----------
243         ns: string
244            the sdl namespace
245         key: string
246             the sdl key
247         """
248         self._sdl.delete(ns, key)
249
250     # Health
251
252     def healthcheck(self):
253         """
254         this needs to be understood how this is supposed to work
255         """
256         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
257
258     def stop(self):
259         """
260         cleans up and stops the xapp rmr thread (currently). This is
261         critical for unit testing as pytest will never return if the
262         thread is running.
263
264         TODO: can we register a ctrl-c handler so this gets called on
265         ctrl-c? Because currently two ctrl-c are needed to stop.
266         """
267         self._rmr_loop.stop()
268
269
270 # Public Classes to subclass (these subclass _BaseXapp)
271
272
273 class RMRXapp(_BaseXapp):
274     """
275     Represents an xapp that is purely driven by RMR messages; i.e., when
276     messages are received, the xapp does something. When run is called,
277     the xapp framework waits for rmr messages, and calls the
278     client-provided consume callback on every one.
279     """
280
281     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
282         """
283         Parameters
284         ----------
285         default_handler: function
286             a function with the signature (summary, sbuf) to be called
287             when a message of type message_type is received.
288         summary: dict
289             the rmr message summary
290         sbuf: ctypes c_void_p
291             Pointer to an rmr message buffer. The user must call free on
292             this when done.
293         post_init: function (optional)
294             optionally runs this function after the app initializes and
295         before the run loop; its signature should be post_init(self)
296
297         For the other parameters, see _BaseXapp
298         """
299         # init base
300         super().__init__(
301             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
302         )
303
304         # setup callbacks
305         self._default_handler = default_handler
306         self._dispatch = {}
307
308         # used for thread control
309         self._keep_going = True
310
311         # register a default healthcheck handler
312         # this default checks that rmr is working and SDL is working
313         # the user can override this and register their own handler
314         # if they wish since the "last registered callback wins".
315         def handle_healthcheck(self, summary, sbuf):
316             ok = self.healthcheck()
317             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
318             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
319             self.rmr_free(sbuf)
320
321         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
322
323     def register_callback(self, handler, message_type):
324         """
325         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
326
327         Parameters
328         ----------
329         handler: function
330             a function with the signature (summary, sbuf) to be called
331             when a message of type message_type is received
332         summary: dict
333             the rmr message summary
334         sbuf: ctypes c_void_p
335             Pointer to an rmr message buffer. The user must call free on this when done.
336
337         message:type: int
338             the message type to look for
339
340         Note if this method is called multiple times for a single message type, the "last one wins".
341         """
342         self._dispatch[message_type] = handler
343
344     def run(self, thread=False):
345         """
346         This function should be called when the client xapp is ready to
347         wait for its handlers to be called on received messages.
348
349         Parameters
350         ----------
351         thread: bool (optional)
352             If True, a thread is started to run the queue read/dispatch loop
353             and execution is returned to caller; the thread can be stopped
354             by calling .stop(). If False (the default), execution is not
355             returned and the framework loops forever.
356         """
357
358         def loop():
359             while self._keep_going:
360                 try:
361                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
362                     # dispatch
363                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
364                     if not func:
365                         func = self._default_handler
366                     func(self, summary, sbuf)
367                 except queue.Empty:
368                     # the get timed out
369                     pass
370
371         if thread:
372             Thread(target=loop).start()
373         else:
374             loop()
375
376     def stop(self):
377         """
378         Sets the flag to end the dispatch loop.
379         """
380         super().stop()
381         self.logger.debug("Setting flag to end framework work loop.")
382         self._keep_going = False
383
384
385 class Xapp(_BaseXapp):
386     """
387     Represents an xapp where the client provides a generic function to
388     call, which is mostly likely a loop-forever loop.
389     """
390
391     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
392         """
393         Parameters
394         ----------
395         entrypoint: function
396             this function is called when the xapp runs; this is the user code.
397             its signature should be function(self)
398
399         For the other parameters, see _BaseXapp
400         """
401         # init base
402         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
403         self._entrypoint = entrypoint
404
405     def run(self):
406         """
407         This function should be called when the client xapp is ready to
408         start their code.
409         """
410         self._entrypoint(self)
411
412     # there is no need for stop currently here (base has, and nothing
413     # special to do here)