Merge "Add configuration-change API"
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers should
19 instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 from threading import Thread
26 import inotify_simple
27 from mdclogpy import Logger
28 from ricxappframe import xapp_rmr
29 from ricxappframe.rmr import rmr
30 from ricxappframe.xapp_sdl import SDLWrapper
31
32 # message-type constants
33 RIC_HEALTH_CHECK_REQ = 100
34 RIC_HEALTH_CHECK_RESP = 101
35
36 # environment variable with path to configuration file
37 CONFIG_FILE_ENV = "CONFIG_FILE"
38
39 # Private base class; not for direct client use
40
41
42 class _BaseXapp:
43     """
44     This base class initializes RMR by starting a thread that checks for
45     incoming messages, and provisions an SDL object.
46
47     If environment variable CONFIG_FILE_ENV is defined, and that value is a
48     path to an existing file, a watcher is defined to monitor modifications
49     (writes) to that file using the Linux kernel's inotify feature, and the
50     configuration-change handler function is invoked.  The watcher can be
51     polled by calling method config_check().
52
53     Parameters
54     ----------
55     rmr_port: int
56         port to listen on
57
58     rmr_wait_for_ready: bool (optional)
59         If this is True, then init waits until rmr is ready to send, which
60         includes having a valid routing file. This can be set to
61         False if the client only wants to *receive only*.
62
63     use_fake_sdl: bool (optional)
64         if this is True, it uses the dbaas "fake dict backend" instead
65         of Redis or other backends. Set this to true when developing
66         an xapp or during unit testing to eliminate the need for DBAAS.
67
68     post_init: function (optional)
69         Runs this user-provided function after the base xapp is
70         initialized; its signature should be post_init(self)
71     """
72
73     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
74         """
75         Documented in the class comment.
76         """
77         # PUBLIC, can be used by xapps using self.(name):
78         self.logger = Logger(name=__name__)
79
80         # Start rmr rcv thread
81         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
82         self._mrc = self._rmr_loop.mrc  # for convenience
83
84         # SDL
85         self._sdl = SDLWrapper(use_fake_sdl)
86
87         # Config
88         # The environment variable specifies the path to the Xapp config file
89         self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
90         if self._config_path and os.path.isfile(self._config_path):
91             self._inotify = inotify_simple.INotify()
92             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
93             self.logger.debug("__init__: watching config file {}".format(self._config_path))
94         else:
95             self._inotify = None
96             self.logger.warning("__init__: NOT watching any config file")
97
98         # run the optionally provided user post init
99         if post_init:
100             post_init(self)
101
102     # Public rmr methods
103
104     def rmr_get_messages(self):
105         """
106         Returns a generator iterable over all items in the queue that
107         have not yet been read by the client xapp. Each item is a tuple
108         (S, sbuf) where S is a message summary dict and sbuf is the raw
109         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
110         finished with each sbuf to prevent memory leaks!
111         """
112         while not self._rmr_loop.rcv_queue.empty():
113             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
114             yield (summary, sbuf)
115
116     def rmr_send(self, payload, mtype, retries=100):
117         """
118         Allocates a buffer, sets payload and mtype, and sends
119
120         Parameters
121         ----------
122         payload: bytes
123             payload to set
124         mtype: int
125             message type
126         retries: int (optional)
127             Number of times to retry at the application level before excepting RMRFailure
128
129         Returns
130         -------
131         bool
132             whether or not the send worked after retries attempts
133         """
134         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
135
136         for _ in range(retries):
137             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
138             if sbuf.contents.state == 0:
139                 self.rmr_free(sbuf)
140                 return True
141
142         self.rmr_free(sbuf)
143         return False
144
145     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
146         """
147         Allows the xapp to return to sender, possibly adjusting the
148         payload and message type before doing so.  This does NOT free
149         the sbuf for the caller as the caller may wish to perform
150         multiple rts per buffer. The client needs to free.
151
152         Parameters
153         ----------
154         sbuf: ctypes c_void_p
155              Pointer to an rmr message buffer
156         new_payload: bytes (optional)
157             New payload to set
158         new_mtype: int (optional)
159             New message type (replaces the received message)
160         retries: int (optional)
161             Number of times to retry at the application level before
162             throwing exception RMRFailure
163
164         Returns
165         -------
166         bool
167             whether or not the send worked after retries attempts
168         """
169         for _ in range(retries):
170             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
171             if sbuf.contents.state == 0:
172                 return True
173
174         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
175         return False
176
177     def rmr_free(self, sbuf):
178         """
179         Frees an rmr message buffer after use
180
181         Note: this does not need to be a class method, self is not
182         used. However if we break it out as a function we need a home
183         for it.
184
185         Parameters
186         ----------
187         sbuf: ctypes c_void_p
188              Pointer to an rmr message buffer
189         """
190         rmr.rmr_free_msg(sbuf)
191
192     # Convenience (pass-thru) function for invoking SDL.
193
194     def sdl_set(self, ns, key, value, usemsgpack=True):
195         """
196         Stores a key-value pair to SDL, optionally serializing the value
197         to bytes using msgpack.
198
199         Parameters
200         ----------
201         ns: string
202             SDL namespace
203         key: string
204             SDL key
205         value:
206             Object or byte array to store.  See the `usemsgpack` parameter.
207         usemsgpack: boolean (optional, default is True)
208             Determines whether the value is serialized using msgpack before storing.
209             If usemsgpack is True, the msgpack function `packb` is invoked
210             on the value to yield a byte array that is then sent to SDL.
211             Stated differently, if usemsgpack is True, the value can be anything
212             that is serializable by msgpack.
213             If usemsgpack is False, the value must be bytes.
214         """
215         self._sdl.set(ns, key, value, usemsgpack)
216
217     def sdl_get(self, ns, key, usemsgpack=True):
218         """
219         Gets the value for the specified namespace and key from SDL,
220         optionally deserializing stored bytes using msgpack.
221
222         Parameters
223         ----------
224         ns: string
225             SDL namespace
226         key: string
227             SDL key
228         usemsgpack: boolean (optional, default is True)
229             If usemsgpack is True, the byte array stored by SDL is deserialized
230             using msgpack to yield the original object that was stored.
231             If usemsgpack is False, the byte array stored by SDL is returned
232             without further processing.
233
234         Returns
235         -------
236         Value
237             See the usemsgpack parameter for an explanation of the returned value type.
238             Answers None if the key is not found.
239         """
240         return self._sdl.get(ns, key, usemsgpack)
241
242     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
243         """
244         Gets all key-value pairs in the specified namespace
245         with keys that start with the specified prefix,
246         optionally deserializing stored bytes using msgpack.
247
248         Parameters
249         ----------
250         ns: string
251            SDL namespace
252         prefix: string
253             the key prefix
254         usemsgpack: boolean (optional, default is True)
255             If usemsgpack is True, the byte array stored by SDL is deserialized
256             using msgpack to yield the original value that was stored.
257             If usemsgpack is False, the byte array stored by SDL is returned
258             without further processing.
259
260         Returns
261         -------
262         Dictionary of key-value pairs
263             Each key has the specified prefix.
264             The value object (its type) depends on the usemsgpack parameter,
265             but is either a Python object or raw bytes as discussed above.
266             Answers an empty dictionary if no keys matched the prefix.
267         """
268         return self._sdl.find_and_get(ns, prefix, usemsgpack)
269
270     def sdl_delete(self, ns, key):
271         """
272         Deletes the key-value pair with the specified key in the specified namespace.
273
274         Parameters
275         ----------
276         ns: string
277            SDL namespace
278         key: string
279             SDL key
280         """
281         self._sdl.delete(ns, key)
282
283     # Health
284
285     def healthcheck(self):
286         """
287         this needs to be understood how this is supposed to work
288         """
289         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
290
291     # Convenience function for discovering config change events
292
293     def config_check(self, timeout=0):
294         """
295         Checks the watcher for configuration-file events. The watcher
296         prerequisites and event mask are documented in __init__().
297
298         Parameters
299         ----------
300         timeout: int (optional)
301             Number of seconds to wait for a configuration-file event, default 0.
302
303         Returns
304         -------
305         List of Events, possibly empty
306             An event is a tuple with objects wd, mask, cookie and name.
307             For example::
308
309                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
310
311         """
312         if not self._inotify:
313             return []
314         events = self._inotify.read(timeout=timeout)
315         return list(events)
316
317     def stop(self):
318         """
319         cleans up and stops the xapp rmr thread (currently). This is
320         critical for unit testing as pytest will never return if the
321         thread is running.
322
323         TODO: can we register a ctrl-c handler so this gets called on
324         ctrl-c? Because currently two ctrl-c are needed to stop.
325         """
326         self._rmr_loop.stop()
327
328
329 # Public classes that Xapp writers should instantiate or subclass
330 # to implement an Xapp.
331
332
333 class RMRXapp(_BaseXapp):
334     """
335     Represents an Xapp that reacts only to RMR messages; i.e., when
336     messages are received, the Xapp does something. When run is called,
337     the xapp framework waits for RMR messages, and calls the appropriate
338     client-registered consume callback on each.
339
340     If environment variable CONFIG_FILE_ENV is defined, and that value is a
341     path to an existing file, the configuration-change handler is invoked at
342     startup and on each configuration-file write event. If no handler is
343     supplied, this class defines a default handler that logs each invocation.
344
345     Parameters
346     ----------
347     default_handler: function
348         A function with the signature (summary, sbuf) to be called
349         when a message type is received for which no other handler is registered.
350     default_handler argument summary: dict
351         The RMR message summary, a dict of key-value pairs
352     default_handler argument sbuf: ctypes c_void_p
353         Pointer to an RMR message buffer. The user must call free on this when done.
354     config_handler: function (optional, default is documented above)
355         A function with the signature (json) to be called at startup and each time
356         a configuration-file change event is detected. The JSON object is read from
357         the configuration file, if the prerequisites are met.
358     config_handler argument json: dict
359         The contents of the configuration file, parsed as JSON.
360     rmr_port: integer (optional, default is 4562)
361         Initialize RMR to listen on this port
362     rmr_wait_for_ready: boolean (optional, default is True)
363         Wait for RMR to signal ready before starting the dispatch loop
364     use_fake_sdl: boolean (optional, default is False)
365         Use an in-memory store instead of the real SDL service
366     post_init: function (optional, default None)
367         Run this function after the app initializes and before the dispatch loop starts;
368         its signature should be post_init(self)
369     """
370
371     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
372         """
373         Also see _BaseXapp
374         """
375         # init base
376         super().__init__(
377             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
378         )
379
380         # setup callbacks
381         self._default_handler = default_handler
382         self._config_handler = config_handler
383         self._dispatch = {}
384
385         # used for thread control
386         self._keep_going = True
387
388         # register a default healthcheck handler
389         # this default checks that rmr is working and SDL is working
390         # the user can override this and register their own handler
391         # if they wish since the "last registered callback wins".
392         def handle_healthcheck(self, summary, sbuf):
393             ok = self.healthcheck()
394             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
395             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
396             self.rmr_free(sbuf)
397
398         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
399
400         # define a default configuration-change handler if none was provided.
401         if not config_handler:
402             def handle_config_change(self, config):
403                 self.logger.debug("xapp_frame: default config handler invoked")
404             self._config_handler = handle_config_change
405
406         # call the config handler at startup if prereqs were met
407         if self._inotify:
408             with open(self._config_path) as json_file:
409                 data = json.load(json_file)
410             self.logger.debug("run: invoking config handler at start")
411             self._config_handler(self, data)
412
413     def register_callback(self, handler, message_type):
414         """
415         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
416
417         Parameters
418         ----------
419         handler: function
420             a function with the signature (summary, sbuf) to be called
421             when a message of type message_type is received
422         summary: dict
423             the rmr message summary
424         sbuf: ctypes c_void_p
425             Pointer to an rmr message buffer. The user must call free on this when done.
426
427         message:type: int
428             the message type to look for
429
430         Note if this method is called multiple times for a single message type, the "last one wins".
431         """
432         self._dispatch[message_type] = handler
433
434     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
435         """
436         This function should be called when the reactive Xapp is ready to start.
437         After start, the Xapp's handlers will be called on received messages.
438
439         Parameters
440         ----------
441         thread: bool (optional, default is False)
442             If False, execution is not returned and the framework loops forever.
443             If True, a thread is started to run the queue read/dispatch loop
444             and execution is returned to caller; the thread can be stopped
445             by calling the .stop() method.
446
447         rmr_timeout: integer (optional, default is 5 seconds)
448             Length of time to wait for an RMR message to arrive.
449
450         inotify_timeout: integer (optional, default is 0 seconds)
451             Length of time to wait for an inotify event to arrive.
452         """
453
454         def loop():
455             while self._keep_going:
456
457                 # poll RMR
458                 try:
459                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
460                     # dispatch
461                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
462                     if not func:
463                         func = self._default_handler
464                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
465                     func(self, summary, sbuf)
466                 except queue.Empty:
467                     # the get timed out
468                     pass
469
470                 # poll configuration file watcher
471                 try:
472                     events = self.config_check(timeout=inotify_timeout)
473                     for event in events:
474                         with open(self._config_path) as json_file:
475                             data = json.load(json_file)
476                         self.logger.debug("run: invoking config handler on change event {}".format(event))
477                         self._config_handler(self, data)
478                 except Exception as error:
479                     self.logger.error("run: configuration handler failed: {}".format(error))
480
481         if thread:
482             Thread(target=loop).start()
483         else:
484             loop()
485
486     def stop(self):
487         """
488         Sets the flag to end the dispatch loop.
489         """
490         super().stop()
491         self.logger.debug("Setting flag to end framework work loop.")
492         self._keep_going = False
493
494
495 class Xapp(_BaseXapp):
496     """
497     Represents a generic Xapp where the client provides a single function
498     for the framework to call at startup time (instead of providing callback
499     functions by message type). The Xapp writer must implement and provide a
500     function with a loop-forever construct similar to the `run` function in
501     the `RMRXapp` class.  That function should poll to retrieve RMR messages
502     and dispatch them appropriately, poll for configuration changes, etc.
503
504     Parameters
505     ----------
506     entrypoint: function
507         This function is called when the Xapp class's run method is invoked.
508         The function signature must be just function(self)
509     rmr_port: integer (optional, default is 4562)
510         Initialize RMR to listen on this port
511     rmr_wait_for_ready: boolean (optional, default is True)
512         Wait for RMR to signal ready before starting the dispatch loop
513     use_fake_sdl: boolean (optional, default is False)
514         Use an in-memory store instead of the real SDL service
515     """
516
517     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
518         """
519         Parameters
520         ----------
521
522         For the other parameters, see class _BaseXapp.
523         """
524         # init base
525         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
526         self._entrypoint = entrypoint
527
528     def run(self):
529         """
530         This function should be called when the general Xapp is ready to start.
531         """
532         self._entrypoint(self)
533
534     # there is no need for stop currently here (base has, and nothing
535     # special to do here)