Add sdlpy wrapping functions
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 from threading import Thread
26 import inotify_simple
27 from mdclogpy import Logger
28 from ricxappframe import xapp_rmr
29 from ricxappframe.rmr import rmr
30 from ricxappframe.xapp_sdl import SDLWrapper
31
32 # message-type constants
33 RIC_HEALTH_CHECK_REQ = 100
34 RIC_HEALTH_CHECK_RESP = 101
35
36 # environment variable with path to configuration file
37 CONFIG_FILE_ENV = "CONFIG_FILE"
38
39
40 class _BaseXapp:
41     """
42     This class initializes RMR, starts a thread that checks for incoming
43     messages, provisions an SDL object and optionally creates a
44     config-file watcher.  This private base class should not be
45     instantiated by clients directly, but it defines many public methods
46     that may be used by clients.
47
48     If environment variable CONFIG_FILE is defined, and that variable
49     contains a path to an existing file, a watcher is defined to monitor
50     modifications (writes) to that file using the Linux kernel's inotify
51     feature. The watcher must be polled by calling method
52     config_check().
53
54     Parameters
55     ----------
56     rmr_port: int (optional, default is 4562)
57         Port on which the RMR library listens for incoming messages.
58
59     rmr_wait_for_ready: bool (optional, default is True)
60         If this is True, then init waits until RMR is ready to send,
61         which includes having a valid routing file. This can be set
62         to False if the client wants to *receive only*.
63
64     use_fake_sdl: bool (optional, default is False)
65         if this is True, it uses the DBaaS "fake dict backend" instead
66         of Redis or other backends. Set this to True when developing
67         an xapp or during unit testing to eliminate the need for DBaaS.
68
69     post_init: function (optional, default is None)
70         Runs this user-provided function at the end of the init method;
71         its signature should be post_init(self)
72     """
73
74     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
75         """
76         Documented in the class comment.
77         """
78         # PUBLIC, can be used by xapps using self.(name):
79         self.logger = Logger(name=__name__)
80
81         # Start rmr rcv thread
82         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
83         self._mrc = self._rmr_loop.mrc  # for convenience
84
85         # SDL
86         self.sdl = SDLWrapper(use_fake_sdl)
87
88         # Config
89         # The environment variable specifies the path to the Xapp config file
90         self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
91         if self._config_path and os.path.isfile(self._config_path):
92             self._inotify = inotify_simple.INotify()
93             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
94             self.logger.debug("__init__: watching config file {}".format(self._config_path))
95         else:
96             self._inotify = None
97             self.logger.warning("__init__: NOT watching any config file")
98
99         # run the optionally provided user post init
100         if post_init:
101             post_init(self)
102
103     # Public rmr methods
104
105     def rmr_get_messages(self):
106         """
107         Returns a generator iterable over all items in the queue that
108         have not yet been read by the client xapp. Each item is a tuple
109         (S, sbuf) where S is a message summary dict and sbuf is the raw
110         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
111         finished with each sbuf to prevent memory leaks!
112         """
113         while not self._rmr_loop.rcv_queue.empty():
114             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
115             yield (summary, sbuf)
116
117     def rmr_send(self, payload, mtype, retries=100):
118         """
119         Allocates a buffer, sets payload and mtype, and sends
120
121         Parameters
122         ----------
123         payload: bytes
124             payload to set
125         mtype: int
126             message type
127         retries: int (optional)
128             Number of times to retry at the application level before excepting RMRFailure
129
130         Returns
131         -------
132         bool
133             whether or not the send worked after retries attempts
134         """
135         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
136
137         for _ in range(retries):
138             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
139             if sbuf.contents.state == 0:
140                 self.rmr_free(sbuf)
141                 return True
142
143         self.rmr_free(sbuf)
144         return False
145
146     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
147         """
148         Allows the xapp to return to sender, possibly adjusting the
149         payload and message type before doing so.  This does NOT free
150         the sbuf for the caller as the caller may wish to perform
151         multiple rts per buffer. The client needs to free.
152
153         Parameters
154         ----------
155         sbuf: ctypes c_void_p
156              Pointer to an rmr message buffer
157         new_payload: bytes (optional)
158             New payload to set
159         new_mtype: int (optional)
160             New message type (replaces the received message)
161         retries: int (optional, default 100)
162             Number of times to retry at the application level
163
164         Returns
165         -------
166         bool
167             whether or not the send worked after retries attempts
168         """
169         for _ in range(retries):
170             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
171             if sbuf.contents.state == 0:
172                 return True
173
174         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
175         return False
176
177     def rmr_free(self, sbuf):
178         """
179         Frees an rmr message buffer after use
180
181         Note: this does not need to be a class method, self is not
182         used. However if we break it out as a function we need a home
183         for it.
184
185         Parameters
186         ----------
187         sbuf: ctypes c_void_p
188              Pointer to an rmr message buffer
189         """
190         rmr.rmr_free_msg(sbuf)
191
192     # Convenience (pass-thru) function for invoking SDL.
193
194     def sdl_set(self, namespace, key, value, usemsgpack=True):
195         """
196         ** Deprecate Warning **
197         ** Will be removed in a future function **
198
199         Stores a key-value pair to SDL, optionally serializing the value
200         to bytes using msgpack.
201
202         Parameters
203         ----------
204         namespace: string
205             SDL namespace
206         key: string
207             SDL key
208         value:
209             Object or byte array to store.  See the `usemsgpack` parameter.
210         usemsgpack: boolean (optional, default is True)
211             Determines whether the value is serialized using msgpack before storing.
212             If usemsgpack is True, the msgpack function `packb` is invoked
213             on the value to yield a byte array that is then sent to SDL.
214             Stated differently, if usemsgpack is True, the value can be anything
215             that is serializable by msgpack.
216             If usemsgpack is False, the value must be bytes.
217         """
218         self.sdl.set(namespace, key, value, usemsgpack)
219
220     def sdl_get(self, namespace, key, usemsgpack=True):
221         """
222         ** Deprecate Warning **
223         ** Will be removed in a future function **
224
225         Gets the value for the specified namespace and key from SDL,
226         optionally deserializing stored bytes using msgpack.
227
228         Parameters
229         ----------
230         namespace: string
231             SDL namespace
232         key: string
233             SDL key
234         usemsgpack: boolean (optional, default is True)
235             If usemsgpack is True, the byte array stored by SDL is deserialized
236             using msgpack to yield the original object that was stored.
237             If usemsgpack is False, the byte array stored by SDL is returned
238             without further processing.
239
240         Returns
241         -------
242         Value
243             See the usemsgpack parameter for an explanation of the returned value type.
244             Answers None if the key is not found.
245         """
246         return self.sdl.get(namespace, key, usemsgpack)
247
248     def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
249         """
250         ** Deprecate Warning **
251         ** Will be removed in a future function **
252
253         Gets all key-value pairs in the specified namespace
254         with keys that start with the specified prefix,
255         optionally deserializing stored bytes using msgpack.
256
257         Parameters
258         ----------
259         nnamespaces: string
260            SDL namespace
261         prefix: string
262             the key prefix
263         usemsgpack: boolean (optional, default is True)
264             If usemsgpack is True, the byte array stored by SDL is deserialized
265             using msgpack to yield the original value that was stored.
266             If usemsgpack is False, the byte array stored by SDL is returned
267             without further processing.
268
269         Returns
270         -------
271         Dictionary of key-value pairs
272             Each key has the specified prefix.
273             The value object (its type) depends on the usemsgpack parameter,
274             but is either a Python object or raw bytes as discussed above.
275             Answers an empty dictionary if no keys matched the prefix.
276         """
277         return self.sdl.find_and_get(namespace, prefix, usemsgpack)
278
279     def sdl_delete(self, namespace, key):
280         """
281         ** Deprecate Warning **
282         ** Will be removed in a future function **
283
284         Deletes the key-value pair with the specified key in the specified namespace.
285
286         Parameters
287         ----------
288         namespace: string
289            SDL namespace
290         key: string
291             SDL key
292         """
293         self.sdl.delete(namespace, key)
294
295     # Health
296
297     def healthcheck(self):
298         """
299         this needs to be understood how this is supposed to work
300         """
301         return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
302
303     # Convenience function for discovering config change events
304
305     def config_check(self, timeout=0):
306         """
307         Checks the watcher for configuration-file events. The watcher
308         prerequisites and event mask are documented in __init__().
309
310         Parameters
311         ----------
312         timeout: int (optional)
313             Number of seconds to wait for a configuration-file event, default 0.
314
315         Returns
316         -------
317         List of Events, possibly empty
318             An event is a tuple with objects wd, mask, cookie and name.
319             For example::
320
321                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
322
323         """
324         if not self._inotify:
325             return []
326         events = self._inotify.read(timeout=timeout)
327         return list(events)
328
329     def stop(self):
330         """
331         cleans up and stops the xapp rmr thread (currently). This is
332         critical for unit testing as pytest will never return if the
333         thread is running.
334
335         TODO: can we register a ctrl-c handler so this gets called on
336         ctrl-c? Because currently two ctrl-c are needed to stop.
337         """
338         self._rmr_loop.stop()
339
340
341 # Public classes that Xapp writers should instantiate or subclass
342 # to implement an Xapp.
343
344
345 class RMRXapp(_BaseXapp):
346     """
347     Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
348     only performs an action when a message is received.  Clients should
349     invoke the run method, which has a loop that waits for RMR messages
350     and calls the appropriate client-registered consume callback on each.
351
352     If environment variable CONFIG_FILE is defined, and that variable
353     contains a path to an existing file, this class polls a watcher
354     defined on that file to detect file-write events, and invokes a
355     configuration-change handler on each event. The handler is also
356     invoked at startup.  If no handler function is supplied to the
357     constructor, this class defines a default handler that only logs a
358     message.
359
360     Parameters
361     ----------
362     default_handler: function
363         A function with the signature (summary, sbuf) to be called when a
364         message type is received for which no other handler is registered.
365     default_handler argument summary: dict
366         The RMR message summary, a dict of key-value pairs
367     default_handler argument sbuf: ctypes c_void_p
368         Pointer to an RMR message buffer. The user must call free on this when done.
369     config_handler: function (optional, default is documented above)
370         A function with the signature (json) to be called at startup and each time
371         a configuration-file change event is detected. The JSON object is read from
372         the configuration file, if the prerequisites are met.
373     config_handler argument json: dict
374         The contents of the configuration file, parsed as JSON.
375     rmr_port: integer (optional, default is 4562)
376         Initialize RMR to listen on this port
377     rmr_wait_for_ready: boolean (optional, default is True)
378         Wait for RMR to signal ready before starting the dispatch loop
379     use_fake_sdl: boolean (optional, default is False)
380         Use an in-memory store instead of the real SDL service
381     post_init: function (optional, default None)
382         Run this function after the app initializes and before the dispatch loop starts;
383         its signature should be post_init(self)
384     """
385
386     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
387         """
388         Also see _BaseXapp
389         """
390         # init base
391         super().__init__(
392             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
393         )
394
395         # setup callbacks
396         self._default_handler = default_handler
397         self._config_handler = config_handler
398         self._dispatch = {}
399
400         # used for thread control
401         self._keep_going = True
402
403         # register a default healthcheck handler
404         # this default checks that rmr is working and SDL is working
405         # the user can override this and register their own handler
406         # if they wish since the "last registered callback wins".
407         def handle_healthcheck(self, summary, sbuf):
408             healthy = self.healthcheck()
409             payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
410             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
411             self.rmr_free(sbuf)
412
413         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
414
415         # define a default configuration-change handler if none was provided.
416         if not config_handler:
417             def handle_config_change(self, config):
418                 self.logger.debug("xapp_frame: default config handler invoked")
419             self._config_handler = handle_config_change
420
421         # call the config handler at startup if prereqs were met
422         if self._inotify:
423             with open(self._config_path) as json_file:
424                 data = json.load(json_file)
425             self.logger.debug("run: invoking config handler at start")
426             self._config_handler(self, data)
427
428     def register_callback(self, handler, message_type):
429         """
430         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
431
432         Parameters
433         ----------
434         handler: function
435             a function with the signature (summary, sbuf) to be called
436             when a message of type message_type is received
437         summary: dict
438             the rmr message summary
439         sbuf: ctypes c_void_p
440             Pointer to an rmr message buffer. The user must call free on this when done.
441
442         message:type: int
443             the message type to look for
444
445         Note if this method is called multiple times for a single message type, the "last one wins".
446         """
447         self._dispatch[message_type] = handler
448
449     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
450         """
451         This function should be called when the reactive Xapp is ready to start.
452         After start, the Xapp's handlers will be called on received messages.
453
454         Parameters
455         ----------
456         thread: bool (optional, default is False)
457             If False, execution is not returned and the framework loops forever.
458             If True, a thread is started to run the queue read/dispatch loop
459             and execution is returned to caller; the thread can be stopped
460             by calling the .stop() method.
461
462         rmr_timeout: integer (optional, default is 5 seconds)
463             Length of time to wait for an RMR message to arrive.
464
465         inotify_timeout: integer (optional, default is 0 seconds)
466             Length of time to wait for an inotify event to arrive.
467         """
468
469         def loop():
470             while self._keep_going:
471
472                 # poll RMR
473                 try:
474                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
475                     # dispatch
476                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
477                     if not func:
478                         func = self._default_handler
479                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
480                     func(self, summary, sbuf)
481                 except queue.Empty:
482                     # the get timed out
483                     pass
484
485                 # poll configuration file watcher
486                 try:
487                     events = self.config_check(timeout=inotify_timeout)
488                     for event in events:
489                         with open(self._config_path) as json_file:
490                             data = json.load(json_file)
491                         self.logger.debug("run: invoking config handler on change event {}".format(event))
492                         self._config_handler(self, data)
493                 except Exception as error:
494                     self.logger.error("run: configuration handler failed: {}".format(error))
495
496         if thread:
497             Thread(target=loop).start()
498         else:
499             loop()
500
501     def stop(self):
502         """
503         Sets the flag to end the dispatch loop.
504         """
505         super().stop()
506         self.logger.debug("Setting flag to end framework work loop.")
507         self._keep_going = False
508
509
510 class Xapp(_BaseXapp):
511     """
512     Represents a generic Xapp where the client provides a single function
513     for the framework to call at startup time (instead of providing callback
514     functions by message type). The Xapp writer must implement and provide a
515     function with a loop-forever construct similar to the `run` function in
516     the `RMRXapp` class.  That function should poll to retrieve RMR messages
517     and dispatch them appropriately, poll for configuration changes, etc.
518
519     Parameters
520     ----------
521     entrypoint: function
522         This function is called when the Xapp class's run method is invoked.
523         The function signature must be just function(self)
524     rmr_port: integer (optional, default is 4562)
525         Initialize RMR to listen on this port
526     rmr_wait_for_ready: boolean (optional, default is True)
527         Wait for RMR to signal ready before starting the dispatch loop
528     use_fake_sdl: boolean (optional, default is False)
529         Use an in-memory store instead of the real SDL service
530     """
531
532     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
533         """
534         Parameters
535         ----------
536
537         For the other parameters, see class _BaseXapp.
538         """
539         # init base
540         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
541         self._entrypoint = entrypoint
542
543     def run(self):
544         """
545         This function should be called when the general Xapp is ready to start.
546         """
547         self._entrypoint(self)
548
549     # there is no need for stop currently here (base has, and nothing
550     # special to do here)