0d37f288c8fd3eb22299c91fb222470490749b43
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 from threading import Thread
26 import inotify_simple
27 from mdclogpy import Logger
28 from ricxappframe import xapp_rmr
29 from ricxappframe.rmr import rmr
30 from ricxappframe.xapp_sdl import SDLWrapper
31
32 # message-type constants
33 RIC_HEALTH_CHECK_REQ = 100
34 RIC_HEALTH_CHECK_RESP = 101
35
36 # environment variable with path to configuration file
37 CONFIG_FILE_ENV = "CONFIG_FILE"
38
39
40 class _BaseXapp:
41     """
42     This class initializes RMR, starts a thread that checks for incoming
43     messages, provisions an SDL object and optionally creates a
44     config-file watcher.  This private base class should not be
45     instantiated by clients directly, but it defines many public methods
46     that may be used by clients.
47
48     If environment variable CONFIG_FILE is defined, and that variable
49     contains a path to an existing file, a watcher is defined to monitor
50     modifications (writes) to that file using the Linux kernel's inotify
51     feature. The watcher must be polled by calling method
52     config_check().
53
54     Parameters
55     ----------
56     rmr_port: int (optional, default is 4562)
57         Port on which the RMR library listens for incoming messages.
58
59     rmr_wait_for_ready: bool (optional, default is True)
60         If this is True, then init waits until RMR is ready to send,
61         which includes having a valid routing file. This can be set
62         to False if the client wants to *receive only*.
63
64     use_fake_sdl: bool (optional, default is False)
65         if this is True, it uses the DBaaS "fake dict backend" instead
66         of Redis or other backends. Set this to True when developing
67         an xapp or during unit testing to eliminate the need for DBaaS.
68
69     post_init: function (optional, default is None)
70         Runs this user-provided function at the end of the init method;
71         its signature should be post_init(self)
72     """
73
74     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
75         """
76         Documented in the class comment.
77         """
78         # PUBLIC, can be used by xapps using self.(name):
79         self.logger = Logger(name=__name__)
80
81         # Start rmr rcv thread
82         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
83         self._mrc = self._rmr_loop.mrc  # for convenience
84
85         # SDL
86         self._sdl = SDLWrapper(use_fake_sdl)
87
88         # Config
89         # The environment variable specifies the path to the Xapp config file
90         self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
91         if self._config_path and os.path.isfile(self._config_path):
92             self._inotify = inotify_simple.INotify()
93             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
94             self.logger.debug("__init__: watching config file {}".format(self._config_path))
95         else:
96             self._inotify = None
97             self.logger.warning("__init__: NOT watching any config file")
98
99         # run the optionally provided user post init
100         if post_init:
101             post_init(self)
102
103     # Public rmr methods
104
105     def rmr_get_messages(self):
106         """
107         Returns a generator iterable over all items in the queue that
108         have not yet been read by the client xapp. Each item is a tuple
109         (S, sbuf) where S is a message summary dict and sbuf is the raw
110         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
111         finished with each sbuf to prevent memory leaks!
112         """
113         while not self._rmr_loop.rcv_queue.empty():
114             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
115             yield (summary, sbuf)
116
117     def rmr_send(self, payload, mtype, retries=100):
118         """
119         Allocates a buffer, sets payload and mtype, and sends
120
121         Parameters
122         ----------
123         payload: bytes
124             payload to set
125         mtype: int
126             message type
127         retries: int (optional)
128             Number of times to retry at the application level before excepting RMRFailure
129
130         Returns
131         -------
132         bool
133             whether or not the send worked after retries attempts
134         """
135         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
136
137         for _ in range(retries):
138             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
139             if sbuf.contents.state == 0:
140                 self.rmr_free(sbuf)
141                 return True
142
143         self.rmr_free(sbuf)
144         return False
145
146     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
147         """
148         Allows the xapp to return to sender, possibly adjusting the
149         payload and message type before doing so.  This does NOT free
150         the sbuf for the caller as the caller may wish to perform
151         multiple rts per buffer. The client needs to free.
152
153         Parameters
154         ----------
155         sbuf: ctypes c_void_p
156              Pointer to an rmr message buffer
157         new_payload: bytes (optional)
158             New payload to set
159         new_mtype: int (optional)
160             New message type (replaces the received message)
161         retries: int (optional)
162             Number of times to retry at the application level before
163             throwing exception RMRFailure
164
165         Returns
166         -------
167         bool
168             whether or not the send worked after retries attempts
169         """
170         for _ in range(retries):
171             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
172             if sbuf.contents.state == 0:
173                 return True
174
175         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
176         return False
177
178     def rmr_free(self, sbuf):
179         """
180         Frees an rmr message buffer after use
181
182         Note: this does not need to be a class method, self is not
183         used. However if we break it out as a function we need a home
184         for it.
185
186         Parameters
187         ----------
188         sbuf: ctypes c_void_p
189              Pointer to an rmr message buffer
190         """
191         rmr.rmr_free_msg(sbuf)
192
193     # Convenience (pass-thru) function for invoking SDL.
194
195     def sdl_set(self, namespace, key, value, usemsgpack=True):
196         """
197         Stores a key-value pair to SDL, optionally serializing the value
198         to bytes using msgpack.
199
200         Parameters
201         ----------
202         namespace: string
203             SDL namespace
204         key: string
205             SDL key
206         value:
207             Object or byte array to store.  See the `usemsgpack` parameter.
208         usemsgpack: boolean (optional, default is True)
209             Determines whether the value is serialized using msgpack before storing.
210             If usemsgpack is True, the msgpack function `packb` is invoked
211             on the value to yield a byte array that is then sent to SDL.
212             Stated differently, if usemsgpack is True, the value can be anything
213             that is serializable by msgpack.
214             If usemsgpack is False, the value must be bytes.
215         """
216         self._sdl.set(namespace, key, value, usemsgpack)
217
218     def sdl_get(self, namespace, key, usemsgpack=True):
219         """
220         Gets the value for the specified namespace and key from SDL,
221         optionally deserializing stored bytes using msgpack.
222
223         Parameters
224         ----------
225         namespace: string
226             SDL namespace
227         key: string
228             SDL key
229         usemsgpack: boolean (optional, default is True)
230             If usemsgpack is True, the byte array stored by SDL is deserialized
231             using msgpack to yield the original object that was stored.
232             If usemsgpack is False, the byte array stored by SDL is returned
233             without further processing.
234
235         Returns
236         -------
237         Value
238             See the usemsgpack parameter for an explanation of the returned value type.
239             Answers None if the key is not found.
240         """
241         return self._sdl.get(namespace, key, usemsgpack)
242
243     def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
244         """
245         Gets all key-value pairs in the specified namespace
246         with keys that start with the specified prefix,
247         optionally deserializing stored bytes using msgpack.
248
249         Parameters
250         ----------
251         nnamespaces: string
252            SDL namespace
253         prefix: string
254             the key prefix
255         usemsgpack: boolean (optional, default is True)
256             If usemsgpack is True, the byte array stored by SDL is deserialized
257             using msgpack to yield the original value that was stored.
258             If usemsgpack is False, the byte array stored by SDL is returned
259             without further processing.
260
261         Returns
262         -------
263         Dictionary of key-value pairs
264             Each key has the specified prefix.
265             The value object (its type) depends on the usemsgpack parameter,
266             but is either a Python object or raw bytes as discussed above.
267             Answers an empty dictionary if no keys matched the prefix.
268         """
269         return self._sdl.find_and_get(namespace, prefix, usemsgpack)
270
271     def sdl_delete(self, namespace, key):
272         """
273         Deletes the key-value pair with the specified key in the specified namespace.
274
275         Parameters
276         ----------
277         namespace: string
278            SDL namespace
279         key: string
280             SDL key
281         """
282         self._sdl.delete(namespace, key)
283
284     # Health
285
286     def healthcheck(self):
287         """
288         this needs to be understood how this is supposed to work
289         """
290         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
291
292     # Convenience function for discovering config change events
293
294     def config_check(self, timeout=0):
295         """
296         Checks the watcher for configuration-file events. The watcher
297         prerequisites and event mask are documented in __init__().
298
299         Parameters
300         ----------
301         timeout: int (optional)
302             Number of seconds to wait for a configuration-file event, default 0.
303
304         Returns
305         -------
306         List of Events, possibly empty
307             An event is a tuple with objects wd, mask, cookie and name.
308             For example::
309
310                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
311
312         """
313         if not self._inotify:
314             return []
315         events = self._inotify.read(timeout=timeout)
316         return list(events)
317
318     def stop(self):
319         """
320         cleans up and stops the xapp rmr thread (currently). This is
321         critical for unit testing as pytest will never return if the
322         thread is running.
323
324         TODO: can we register a ctrl-c handler so this gets called on
325         ctrl-c? Because currently two ctrl-c are needed to stop.
326         """
327         self._rmr_loop.stop()
328
329
330 # Public classes that Xapp writers should instantiate or subclass
331 # to implement an Xapp.
332
333
334 class RMRXapp(_BaseXapp):
335     """
336     Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
337     only performs an action when a message is received.  Clients should
338     invoke the run method, which has a loop that waits for RMR messages
339     and calls the appropriate client-registered consume callback on each.
340
341     If environment variable CONFIG_FILE is defined, and that variable
342     contains a path to an existing file, this class polls a watcher
343     defined on that file to detect file-write events, and invokes a
344     configuration-change handler on each event. The handler is also
345     invoked at startup.  If no handler function is supplied to the
346     constructor, this class defines a default handler that only logs a
347     message.
348
349     Parameters
350     ----------
351     default_handler: function
352         A function with the signature (summary, sbuf) to be called when a
353         message type is received for which no other handler is registered.
354     default_handler argument summary: dict
355         The RMR message summary, a dict of key-value pairs
356     default_handler argument sbuf: ctypes c_void_p
357         Pointer to an RMR message buffer. The user must call free on this when done.
358     config_handler: function (optional, default is documented above)
359         A function with the signature (json) to be called at startup and each time
360         a configuration-file change event is detected. The JSON object is read from
361         the configuration file, if the prerequisites are met.
362     config_handler argument json: dict
363         The contents of the configuration file, parsed as JSON.
364     rmr_port: integer (optional, default is 4562)
365         Initialize RMR to listen on this port
366     rmr_wait_for_ready: boolean (optional, default is True)
367         Wait for RMR to signal ready before starting the dispatch loop
368     use_fake_sdl: boolean (optional, default is False)
369         Use an in-memory store instead of the real SDL service
370     post_init: function (optional, default None)
371         Run this function after the app initializes and before the dispatch loop starts;
372         its signature should be post_init(self)
373     """
374
375     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
376         """
377         Also see _BaseXapp
378         """
379         # init base
380         super().__init__(
381             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
382         )
383
384         # setup callbacks
385         self._default_handler = default_handler
386         self._config_handler = config_handler
387         self._dispatch = {}
388
389         # used for thread control
390         self._keep_going = True
391
392         # register a default healthcheck handler
393         # this default checks that rmr is working and SDL is working
394         # the user can override this and register their own handler
395         # if they wish since the "last registered callback wins".
396         def handle_healthcheck(self, summary, sbuf):
397             healthy = self.healthcheck()
398             payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
399             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
400             self.rmr_free(sbuf)
401
402         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
403
404         # define a default configuration-change handler if none was provided.
405         if not config_handler:
406             def handle_config_change(self, config):
407                 self.logger.debug("xapp_frame: default config handler invoked")
408             self._config_handler = handle_config_change
409
410         # call the config handler at startup if prereqs were met
411         if self._inotify:
412             with open(self._config_path) as json_file:
413                 data = json.load(json_file)
414             self.logger.debug("run: invoking config handler at start")
415             self._config_handler(self, data)
416
417     def register_callback(self, handler, message_type):
418         """
419         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
420
421         Parameters
422         ----------
423         handler: function
424             a function with the signature (summary, sbuf) to be called
425             when a message of type message_type is received
426         summary: dict
427             the rmr message summary
428         sbuf: ctypes c_void_p
429             Pointer to an rmr message buffer. The user must call free on this when done.
430
431         message:type: int
432             the message type to look for
433
434         Note if this method is called multiple times for a single message type, the "last one wins".
435         """
436         self._dispatch[message_type] = handler
437
438     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
439         """
440         This function should be called when the reactive Xapp is ready to start.
441         After start, the Xapp's handlers will be called on received messages.
442
443         Parameters
444         ----------
445         thread: bool (optional, default is False)
446             If False, execution is not returned and the framework loops forever.
447             If True, a thread is started to run the queue read/dispatch loop
448             and execution is returned to caller; the thread can be stopped
449             by calling the .stop() method.
450
451         rmr_timeout: integer (optional, default is 5 seconds)
452             Length of time to wait for an RMR message to arrive.
453
454         inotify_timeout: integer (optional, default is 0 seconds)
455             Length of time to wait for an inotify event to arrive.
456         """
457
458         def loop():
459             while self._keep_going:
460
461                 # poll RMR
462                 try:
463                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
464                     # dispatch
465                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
466                     if not func:
467                         func = self._default_handler
468                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
469                     func(self, summary, sbuf)
470                 except queue.Empty:
471                     # the get timed out
472                     pass
473
474                 # poll configuration file watcher
475                 try:
476                     events = self.config_check(timeout=inotify_timeout)
477                     for event in events:
478                         with open(self._config_path) as json_file:
479                             data = json.load(json_file)
480                         self.logger.debug("run: invoking config handler on change event {}".format(event))
481                         self._config_handler(self, data)
482                 except Exception as error:
483                     self.logger.error("run: configuration handler failed: {}".format(error))
484
485         if thread:
486             Thread(target=loop).start()
487         else:
488             loop()
489
490     def stop(self):
491         """
492         Sets the flag to end the dispatch loop.
493         """
494         super().stop()
495         self.logger.debug("Setting flag to end framework work loop.")
496         self._keep_going = False
497
498
499 class Xapp(_BaseXapp):
500     """
501     Represents a generic Xapp where the client provides a single function
502     for the framework to call at startup time (instead of providing callback
503     functions by message type). The Xapp writer must implement and provide a
504     function with a loop-forever construct similar to the `run` function in
505     the `RMRXapp` class.  That function should poll to retrieve RMR messages
506     and dispatch them appropriately, poll for configuration changes, etc.
507
508     Parameters
509     ----------
510     entrypoint: function
511         This function is called when the Xapp class's run method is invoked.
512         The function signature must be just function(self)
513     rmr_port: integer (optional, default is 4562)
514         Initialize RMR to listen on this port
515     rmr_wait_for_ready: boolean (optional, default is True)
516         Wait for RMR to signal ready before starting the dispatch loop
517     use_fake_sdl: boolean (optional, default is False)
518         Use an in-memory store instead of the real SDL service
519     """
520
521     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
522         """
523         Parameters
524         ----------
525
526         For the other parameters, see class _BaseXapp.
527         """
528         # init base
529         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
530         self._entrypoint = entrypoint
531
532     def run(self):
533         """
534         This function should be called when the general Xapp is ready to start.
535         """
536         self._entrypoint(self)
537
538     # there is no need for stop currently here (base has, and nothing
539     # special to do here)