87ab27e8e839430933e4cb1a9b5b993b333c3c7b
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 Framework for python xapps
19 Framework here means Xapp classes that can be subclassed
20 """
21
22 import queue
23 from threading import Thread
24 from ricxappframe import xapp_rmr
25 from ricxappframe.rmr import rmr
26 from ricxappframe.xapp_sdl import SDLWrapper
27 from mdclogpy import Logger
28
29 # constants
30 RIC_HEALTH_CHECK_REQ = 100
31 RIC_HEALTH_CHECK_RESP = 101
32
33
34 # Private base class; not for direct client use
35
36
37 class _BaseXapp:
38     """
39     Base xapp; not for client use directly
40     """
41
42     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
43         """
44         Init
45
46         Parameters
47         ----------
48         rmr_port: int
49             port to listen on
50
51         rmr_wait_for_ready: bool (optional)
52
53             if this is True, then init waits until rmr is ready to send, which
54             includes having a valid routing file. This can be set to
55             False if the client only wants to *receive only*.
56
57         use_fake_sdl: bool (optional)
58             if this is True, it uses dbaas' "fake dict backend" instead
59             of Redis or other backends. Set this to true when developing
60             your xapp or during unit testing to completely avoid needing
61             a dbaas running or any network at all.
62
63         post_init: function (optional)
64             runs this user provided function after the base xapp is
65             initialized; its signature should be post_init(self)
66         """
67         # PUBLIC, can be used by xapps using self.(name):
68         self.logger = Logger(name=__name__)
69
70         # Start rmr rcv thread
71         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
72         self._mrc = self._rmr_loop.mrc  # for convenience
73
74         # SDL
75         self._sdl = SDLWrapper(use_fake_sdl)
76
77         # run the optionally provided user post init
78         if post_init:
79             post_init(self)
80
81     # Public rmr methods
82
83     def rmr_get_messages(self):
84         """
85         Returns a generator iterable over all items in the queue that
86         have not yet been read by the client xapp. Each item is a tuple
87         (S, sbuf) where S is a message summary dict and sbuf is the raw
88         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
89         finished with each sbuf to prevent memory leaks!
90         """
91         while not self._rmr_loop.rcv_queue.empty():
92             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
93             yield (summary, sbuf)
94
95     def rmr_send(self, payload, mtype, retries=100):
96         """
97         Allocates a buffer, sets payload and mtype, and sends
98
99         Parameters
100         ----------
101         payload: bytes
102             payload to set
103         mtype: int
104             message type
105         retries: int (optional)
106             Number of times to retry at the application level before excepting RMRFailure
107
108         Returns
109         -------
110         bool
111             whether or not the send worked after retries attempts
112         """
113         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
114
115         for _ in range(retries):
116             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
117             if sbuf.contents.state == 0:
118                 self.rmr_free(sbuf)
119                 return True
120
121         self.rmr_free(sbuf)
122         return False
123
124     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
125         """
126         Allows the xapp to return to sender, possibly adjusting the
127         payload and message type before doing so.  This does NOT free
128         the sbuf for the caller as the caller may wish to perform
129         multiple rts per buffer. The client needs to free.
130
131         Parameters
132         ----------
133         sbuf: ctypes c_void_p
134              Pointer to an rmr message buffer
135         new_payload: bytes (optional)
136             New payload to set
137         new_mtype: int (optional)
138             New message type (replaces the received message)
139         retries: int (optional)
140             Number of times to retry at the application level before
141             throwing exception RMRFailure
142
143         Returns
144         -------
145         bool
146             whether or not the send worked after retries attempts
147         """
148         for _ in range(retries):
149             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
150             if sbuf.contents.state == 0:
151                 return True
152
153         self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
154         return False
155
156     def rmr_free(self, sbuf):
157         """
158         Frees an rmr message buffer after use
159
160         Note: this does not need to be a class method, self is not
161         used. However if we break it out as a function we need a home
162         for it.
163
164         Parameters
165         ----------
166         sbuf: ctypes c_void_p
167              Pointer to an rmr message buffer
168         """
169         rmr.rmr_free_msg(sbuf)
170
171     # SDL
172     # NOTE, even though these are passthroughs, the separate SDL wrapper
173     # is useful for other applications like A1. Therefore, we don't
174     # embed that SDLWrapper functionality here so that it can be
175     # instantiated on its own.
176
177     def sdl_set(self, ns, key, value, usemsgpack=True):
178         """
179         Stores a key-value pair,
180         optionally serializing the value to bytes using msgpack.
181
182         Parameters
183         ----------
184         ns: string
185             SDL namespace
186         key: string
187             SDL key
188         value:
189             Object or byte array to store.  See the `usemsgpack` parameter.
190         usemsgpack: boolean (optional, default is True)
191             Determines whether the value is serialized using msgpack before storing.
192             If usemsgpack is True, the msgpack function `packb` is invoked
193             on the value to yield a byte array that is then sent to SDL.
194             Stated differently, if usemsgpack is True, the value can be anything
195             that is serializable by msgpack.
196             If usemsgpack is False, the value must be bytes.
197         """
198         self._sdl.set(ns, key, value, usemsgpack)
199
200     def sdl_get(self, ns, key, usemsgpack=True):
201         """
202         Gets the value for the specified namespace and key,
203         optionally deserializing stored bytes using msgpack.
204
205         Parameters
206         ----------
207         ns: string
208             SDL namespace
209         key: string
210             SDL key
211         usemsgpack: boolean (optional, default is True)
212             If usemsgpack is True, the byte array stored by SDL is deserialized
213             using msgpack to yield the original object that was stored.
214             If usemsgpack is False, the byte array stored by SDL is returned
215             without further processing.
216
217         Returns
218         -------
219         Value
220             See the usemsgpack parameter for an explanation of the returned value type.
221             Answers None if the key is not found.
222         """
223         return self._sdl.get(ns, key, usemsgpack)
224
225     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
226         """
227         Gets all key-value pairs in the specified namespace
228         with keys that start with the specified prefix,
229         optionally deserializing stored bytes using msgpack.
230
231         Parameters
232         ----------
233         ns: string
234            SDL namespace
235         prefix: string
236             the key prefix
237         usemsgpack: boolean (optional, default is True)
238             If usemsgpack is True, the byte array stored by SDL is deserialized
239             using msgpack to yield the original value that was stored.
240             If usemsgpack is False, the byte array stored by SDL is returned
241             without further processing.
242
243         Returns
244         -------
245         Dictionary of key-value pairs
246             Each key has the specified prefix.
247             The value object (its type) depends on the usemsgpack parameter,
248             but is either a Python object or raw bytes as discussed above.
249             Answers an empty dictionary if no keys matched the prefix.
250         """
251         return self._sdl.find_and_get(ns, prefix, usemsgpack)
252
253     def sdl_delete(self, ns, key):
254         """
255         Deletes the key-value pair with the specified key in the specified namespace.
256
257         Parameters
258         ----------
259         ns: string
260            SDL namespace
261         key: string
262             SDL key
263         """
264         self._sdl.delete(ns, key)
265
266     # Health
267
268     def healthcheck(self):
269         """
270         this needs to be understood how this is supposed to work
271         """
272         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
273
274     def stop(self):
275         """
276         cleans up and stops the xapp rmr thread (currently). This is
277         critical for unit testing as pytest will never return if the
278         thread is running.
279
280         TODO: can we register a ctrl-c handler so this gets called on
281         ctrl-c? Because currently two ctrl-c are needed to stop.
282         """
283         self._rmr_loop.stop()
284
285
286 # Public Classes to subclass (these subclass _BaseXapp)
287
288
289 class RMRXapp(_BaseXapp):
290     """
291     Represents an Xapp that reacts only to RMR messages; i.e., when
292     messages are received, the Xapp does something. When run is called,
293     the xapp framework waits for RMR messages, and calls the appropriate
294     client-registered consume callback on each.
295
296     Parameters
297     ----------
298     default_handler: function
299         A function with the signature (summary, sbuf) to be called
300         when a message type is received for which no other handler is registered.
301     default_handler argument summary: dict
302         The RMR message summary, a dict of key-value pairs
303     default_handler argument sbuf: ctypes c_void_p
304         Pointer to an RMR message buffer. The user must call free on this when done.
305     rmr_port: integer (optional, default is 4562)
306         Initialize RMR to listen on this port
307     rmr_wait_for_ready: boolean (optional, default is True)
308         Wait for RMR to signal ready before starting the dispatch loop
309     use_fake_sdl: boolean (optional, default is False)
310         Use an in-memory store instead of the real SDL service
311     post_init: function (optional, default None)
312         Run this function after the app initializes and before the dispatch loop starts;
313         its signature should be post_init(self)
314     """
315
316     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
317         """
318         Also see _BaseXapp
319         """
320         # init base
321         super().__init__(
322             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
323         )
324
325         # setup callbacks
326         self._default_handler = default_handler
327         self._dispatch = {}
328
329         # used for thread control
330         self._keep_going = True
331
332         # register a default healthcheck handler
333         # this default checks that rmr is working and SDL is working
334         # the user can override this and register their own handler
335         # if they wish since the "last registered callback wins".
336         def handle_healthcheck(self, summary, sbuf):
337             ok = self.healthcheck()
338             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
339             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
340             self.rmr_free(sbuf)
341
342         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
343
344     def register_callback(self, handler, message_type):
345         """
346         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
347
348         Parameters
349         ----------
350         handler: function
351             a function with the signature (summary, sbuf) to be called
352             when a message of type message_type is received
353         summary: dict
354             the rmr message summary
355         sbuf: ctypes c_void_p
356             Pointer to an rmr message buffer. The user must call free on this when done.
357
358         message:type: int
359             the message type to look for
360
361         Note if this method is called multiple times for a single message type, the "last one wins".
362         """
363         self._dispatch[message_type] = handler
364
365     def run(self, thread=False):
366         """
367         This function should be called when the reactive Xapp is ready to start.
368         After start, the Xapp's handlers will be called on received messages.
369
370         Parameters
371         ----------
372         thread: bool (optional, default is False)
373             If False, execution is not returned and the framework loops forever.
374             If True, a thread is started to run the queue read/dispatch loop
375             and execution is returned to caller; the thread can be stopped
376             by calling the .stop() method.
377         """
378
379         def loop():
380             while self._keep_going:
381                 try:
382                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
383                     # dispatch
384                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
385                     if not func:
386                         func = self._default_handler
387                     func(self, summary, sbuf)
388                 except queue.Empty:
389                     # the get timed out
390                     pass
391
392         if thread:
393             Thread(target=loop).start()
394         else:
395             loop()
396
397     def stop(self):
398         """
399         Sets the flag to end the dispatch loop.
400         """
401         super().stop()
402         self.logger.debug("Setting flag to end framework work loop.")
403         self._keep_going = False
404
405
406 class Xapp(_BaseXapp):
407     """
408     Represents a generic Xapp where the client provides a function for the framework to call,
409     which usually contains a loop-forever construct.
410
411     Parameters
412     ----------
413     entrypoint: function
414         This function is called when the Xapp class's run method is invoked.
415         The function signature must be just function(self)
416     rmr_port: integer (optional, default is 4562)
417         Initialize RMR to listen on this port
418     rmr_wait_for_ready: boolean (optional, default is True)
419         Wait for RMR to signal ready before starting the dispatch loop
420     use_fake_sdl: boolean (optional, default is False)
421         Use an in-memory store instead of the real SDL service
422     """
423
424     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
425         """
426         Parameters
427         ----------
428
429         For the other parameters, see class _BaseXapp.
430         """
431         # init base
432         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
433         self._entrypoint = entrypoint
434
435     def run(self):
436         """
437         This function should be called when the general Xapp is ready to start.
438         """
439         self._entrypoint(self)
440
441     # there is no need for stop currently here (base has, and nothing
442     # special to do here)