Revise Alarm manager to send via RMR wormhole
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
20 """
21
22 import json
23 import os
24 import queue
25 from threading import Thread
26 import inotify_simple
27 from mdclogpy import Logger
28 from ricxappframe import xapp_rmr
29 from ricxappframe.rmr import rmr
30 from ricxappframe.xapp_sdl import SDLWrapper
31
32 # message-type constants
33 RIC_HEALTH_CHECK_REQ = 100
34 RIC_HEALTH_CHECK_RESP = 101
35
36 # environment variable with path to configuration file
37 CONFIG_FILE_ENV = "CONFIG_FILE"
38
39
40 class _BaseXapp:
41     """
42     This class initializes RMR, starts a thread that checks for incoming
43     messages, provisions an SDL object and optionally creates a
44     config-file watcher.  This private base class should not be
45     instantiated by clients directly, but it defines many public methods
46     that may be used by clients.
47
48     If environment variable CONFIG_FILE is defined, and that variable
49     contains a path to an existing file, a watcher is defined to monitor
50     modifications (writes) to that file using the Linux kernel's inotify
51     feature. The watcher must be polled by calling method
52     config_check().
53
54     Parameters
55     ----------
56     rmr_port: int (optional, default is 4562)
57         Port on which the RMR library listens for incoming messages.
58
59     rmr_wait_for_ready: bool (optional, default is True)
60         If this is True, then init waits until RMR is ready to send,
61         which includes having a valid routing file. This can be set
62         to False if the client wants to *receive only*.
63
64     use_fake_sdl: bool (optional, default is False)
65         if this is True, it uses the DBaaS "fake dict backend" instead
66         of Redis or other backends. Set this to True when developing
67         an xapp or during unit testing to eliminate the need for DBaaS.
68
69     post_init: function (optional, default is None)
70         Runs this user-provided function at the end of the init method;
71         its signature should be post_init(self)
72     """
73
74     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
75         """
76         Documented in the class comment.
77         """
78         # PUBLIC, can be used by xapps using self.(name):
79         self.logger = Logger(name=__name__)
80
81         # Start rmr rcv thread
82         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
83         self._mrc = self._rmr_loop.mrc  # for convenience
84
85         # SDL
86         self._sdl = SDLWrapper(use_fake_sdl)
87
88         # Config
89         # The environment variable specifies the path to the Xapp config file
90         self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
91         if self._config_path and os.path.isfile(self._config_path):
92             self._inotify = inotify_simple.INotify()
93             self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
94             self.logger.debug("__init__: watching config file {}".format(self._config_path))
95         else:
96             self._inotify = None
97             self.logger.warning("__init__: NOT watching any config file")
98
99         # run the optionally provided user post init
100         if post_init:
101             post_init(self)
102
103     # Public rmr methods
104
105     def rmr_get_messages(self):
106         """
107         Returns a generator iterable over all items in the queue that
108         have not yet been read by the client xapp. Each item is a tuple
109         (S, sbuf) where S is a message summary dict and sbuf is the raw
110         message. The caller MUST call rmr.rmr_free_msg(sbuf) when
111         finished with each sbuf to prevent memory leaks!
112         """
113         while not self._rmr_loop.rcv_queue.empty():
114             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
115             yield (summary, sbuf)
116
117     def rmr_send(self, payload, mtype, retries=100):
118         """
119         Allocates a buffer, sets payload and mtype, and sends
120
121         Parameters
122         ----------
123         payload: bytes
124             payload to set
125         mtype: int
126             message type
127         retries: int (optional)
128             Number of times to retry at the application level before excepting RMRFailure
129
130         Returns
131         -------
132         bool
133             whether or not the send worked after retries attempts
134         """
135         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
136
137         for _ in range(retries):
138             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
139             if sbuf.contents.state == 0:
140                 self.rmr_free(sbuf)
141                 return True
142
143         self.rmr_free(sbuf)
144         return False
145
146     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
147         """
148         Allows the xapp to return to sender, possibly adjusting the
149         payload and message type before doing so.  This does NOT free
150         the sbuf for the caller as the caller may wish to perform
151         multiple rts per buffer. The client needs to free.
152
153         Parameters
154         ----------
155         sbuf: ctypes c_void_p
156              Pointer to an rmr message buffer
157         new_payload: bytes (optional)
158             New payload to set
159         new_mtype: int (optional)
160             New message type (replaces the received message)
161         retries: int (optional, default 100)
162             Number of times to retry at the application level
163
164         Returns
165         -------
166         bool
167             whether or not the send worked after retries attempts
168         """
169         for _ in range(retries):
170             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
171             if sbuf.contents.state == 0:
172                 return True
173
174         self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
175         return False
176
177     def rmr_free(self, sbuf):
178         """
179         Frees an rmr message buffer after use
180
181         Note: this does not need to be a class method, self is not
182         used. However if we break it out as a function we need a home
183         for it.
184
185         Parameters
186         ----------
187         sbuf: ctypes c_void_p
188              Pointer to an rmr message buffer
189         """
190         rmr.rmr_free_msg(sbuf)
191
192     # Convenience (pass-thru) function for invoking SDL.
193
194     def sdl_set(self, namespace, key, value, usemsgpack=True):
195         """
196         Stores a key-value pair to SDL, optionally serializing the value
197         to bytes using msgpack.
198
199         Parameters
200         ----------
201         namespace: string
202             SDL namespace
203         key: string
204             SDL key
205         value:
206             Object or byte array to store.  See the `usemsgpack` parameter.
207         usemsgpack: boolean (optional, default is True)
208             Determines whether the value is serialized using msgpack before storing.
209             If usemsgpack is True, the msgpack function `packb` is invoked
210             on the value to yield a byte array that is then sent to SDL.
211             Stated differently, if usemsgpack is True, the value can be anything
212             that is serializable by msgpack.
213             If usemsgpack is False, the value must be bytes.
214         """
215         self._sdl.set(namespace, key, value, usemsgpack)
216
217     def sdl_get(self, namespace, key, usemsgpack=True):
218         """
219         Gets the value for the specified namespace and key from SDL,
220         optionally deserializing stored bytes using msgpack.
221
222         Parameters
223         ----------
224         namespace: string
225             SDL namespace
226         key: string
227             SDL key
228         usemsgpack: boolean (optional, default is True)
229             If usemsgpack is True, the byte array stored by SDL is deserialized
230             using msgpack to yield the original object that was stored.
231             If usemsgpack is False, the byte array stored by SDL is returned
232             without further processing.
233
234         Returns
235         -------
236         Value
237             See the usemsgpack parameter for an explanation of the returned value type.
238             Answers None if the key is not found.
239         """
240         return self._sdl.get(namespace, key, usemsgpack)
241
242     def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
243         """
244         Gets all key-value pairs in the specified namespace
245         with keys that start with the specified prefix,
246         optionally deserializing stored bytes using msgpack.
247
248         Parameters
249         ----------
250         nnamespaces: string
251            SDL namespace
252         prefix: string
253             the key prefix
254         usemsgpack: boolean (optional, default is True)
255             If usemsgpack is True, the byte array stored by SDL is deserialized
256             using msgpack to yield the original value that was stored.
257             If usemsgpack is False, the byte array stored by SDL is returned
258             without further processing.
259
260         Returns
261         -------
262         Dictionary of key-value pairs
263             Each key has the specified prefix.
264             The value object (its type) depends on the usemsgpack parameter,
265             but is either a Python object or raw bytes as discussed above.
266             Answers an empty dictionary if no keys matched the prefix.
267         """
268         return self._sdl.find_and_get(namespace, prefix, usemsgpack)
269
270     def sdl_delete(self, namespace, key):
271         """
272         Deletes the key-value pair with the specified key in the specified namespace.
273
274         Parameters
275         ----------
276         namespace: string
277            SDL namespace
278         key: string
279             SDL key
280         """
281         self._sdl.delete(namespace, key)
282
283     # Health
284
285     def healthcheck(self):
286         """
287         this needs to be understood how this is supposed to work
288         """
289         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
290
291     # Convenience function for discovering config change events
292
293     def config_check(self, timeout=0):
294         """
295         Checks the watcher for configuration-file events. The watcher
296         prerequisites and event mask are documented in __init__().
297
298         Parameters
299         ----------
300         timeout: int (optional)
301             Number of seconds to wait for a configuration-file event, default 0.
302
303         Returns
304         -------
305         List of Events, possibly empty
306             An event is a tuple with objects wd, mask, cookie and name.
307             For example::
308
309                 Event(wd=1, mask=1073742080, cookie=0, name='foo')
310
311         """
312         if not self._inotify:
313             return []
314         events = self._inotify.read(timeout=timeout)
315         return list(events)
316
317     def stop(self):
318         """
319         cleans up and stops the xapp rmr thread (currently). This is
320         critical for unit testing as pytest will never return if the
321         thread is running.
322
323         TODO: can we register a ctrl-c handler so this gets called on
324         ctrl-c? Because currently two ctrl-c are needed to stop.
325         """
326         self._rmr_loop.stop()
327
328
329 # Public classes that Xapp writers should instantiate or subclass
330 # to implement an Xapp.
331
332
333 class RMRXapp(_BaseXapp):
334     """
335     Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
336     only performs an action when a message is received.  Clients should
337     invoke the run method, which has a loop that waits for RMR messages
338     and calls the appropriate client-registered consume callback on each.
339
340     If environment variable CONFIG_FILE is defined, and that variable
341     contains a path to an existing file, this class polls a watcher
342     defined on that file to detect file-write events, and invokes a
343     configuration-change handler on each event. The handler is also
344     invoked at startup.  If no handler function is supplied to the
345     constructor, this class defines a default handler that only logs a
346     message.
347
348     Parameters
349     ----------
350     default_handler: function
351         A function with the signature (summary, sbuf) to be called when a
352         message type is received for which no other handler is registered.
353     default_handler argument summary: dict
354         The RMR message summary, a dict of key-value pairs
355     default_handler argument sbuf: ctypes c_void_p
356         Pointer to an RMR message buffer. The user must call free on this when done.
357     config_handler: function (optional, default is documented above)
358         A function with the signature (json) to be called at startup and each time
359         a configuration-file change event is detected. The JSON object is read from
360         the configuration file, if the prerequisites are met.
361     config_handler argument json: dict
362         The contents of the configuration file, parsed as JSON.
363     rmr_port: integer (optional, default is 4562)
364         Initialize RMR to listen on this port
365     rmr_wait_for_ready: boolean (optional, default is True)
366         Wait for RMR to signal ready before starting the dispatch loop
367     use_fake_sdl: boolean (optional, default is False)
368         Use an in-memory store instead of the real SDL service
369     post_init: function (optional, default None)
370         Run this function after the app initializes and before the dispatch loop starts;
371         its signature should be post_init(self)
372     """
373
374     def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
375         """
376         Also see _BaseXapp
377         """
378         # init base
379         super().__init__(
380             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
381         )
382
383         # setup callbacks
384         self._default_handler = default_handler
385         self._config_handler = config_handler
386         self._dispatch = {}
387
388         # used for thread control
389         self._keep_going = True
390
391         # register a default healthcheck handler
392         # this default checks that rmr is working and SDL is working
393         # the user can override this and register their own handler
394         # if they wish since the "last registered callback wins".
395         def handle_healthcheck(self, summary, sbuf):
396             healthy = self.healthcheck()
397             payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
398             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
399             self.rmr_free(sbuf)
400
401         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
402
403         # define a default configuration-change handler if none was provided.
404         if not config_handler:
405             def handle_config_change(self, config):
406                 self.logger.debug("xapp_frame: default config handler invoked")
407             self._config_handler = handle_config_change
408
409         # call the config handler at startup if prereqs were met
410         if self._inotify:
411             with open(self._config_path) as json_file:
412                 data = json.load(json_file)
413             self.logger.debug("run: invoking config handler at start")
414             self._config_handler(self, data)
415
416     def register_callback(self, handler, message_type):
417         """
418         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
419
420         Parameters
421         ----------
422         handler: function
423             a function with the signature (summary, sbuf) to be called
424             when a message of type message_type is received
425         summary: dict
426             the rmr message summary
427         sbuf: ctypes c_void_p
428             Pointer to an rmr message buffer. The user must call free on this when done.
429
430         message:type: int
431             the message type to look for
432
433         Note if this method is called multiple times for a single message type, the "last one wins".
434         """
435         self._dispatch[message_type] = handler
436
437     def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
438         """
439         This function should be called when the reactive Xapp is ready to start.
440         After start, the Xapp's handlers will be called on received messages.
441
442         Parameters
443         ----------
444         thread: bool (optional, default is False)
445             If False, execution is not returned and the framework loops forever.
446             If True, a thread is started to run the queue read/dispatch loop
447             and execution is returned to caller; the thread can be stopped
448             by calling the .stop() method.
449
450         rmr_timeout: integer (optional, default is 5 seconds)
451             Length of time to wait for an RMR message to arrive.
452
453         inotify_timeout: integer (optional, default is 0 seconds)
454             Length of time to wait for an inotify event to arrive.
455         """
456
457         def loop():
458             while self._keep_going:
459
460                 # poll RMR
461                 try:
462                     (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
463                     # dispatch
464                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
465                     if not func:
466                         func = self._default_handler
467                     self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
468                     func(self, summary, sbuf)
469                 except queue.Empty:
470                     # the get timed out
471                     pass
472
473                 # poll configuration file watcher
474                 try:
475                     events = self.config_check(timeout=inotify_timeout)
476                     for event in events:
477                         with open(self._config_path) as json_file:
478                             data = json.load(json_file)
479                         self.logger.debug("run: invoking config handler on change event {}".format(event))
480                         self._config_handler(self, data)
481                 except Exception as error:
482                     self.logger.error("run: configuration handler failed: {}".format(error))
483
484         if thread:
485             Thread(target=loop).start()
486         else:
487             loop()
488
489     def stop(self):
490         """
491         Sets the flag to end the dispatch loop.
492         """
493         super().stop()
494         self.logger.debug("Setting flag to end framework work loop.")
495         self._keep_going = False
496
497
498 class Xapp(_BaseXapp):
499     """
500     Represents a generic Xapp where the client provides a single function
501     for the framework to call at startup time (instead of providing callback
502     functions by message type). The Xapp writer must implement and provide a
503     function with a loop-forever construct similar to the `run` function in
504     the `RMRXapp` class.  That function should poll to retrieve RMR messages
505     and dispatch them appropriately, poll for configuration changes, etc.
506
507     Parameters
508     ----------
509     entrypoint: function
510         This function is called when the Xapp class's run method is invoked.
511         The function signature must be just function(self)
512     rmr_port: integer (optional, default is 4562)
513         Initialize RMR to listen on this port
514     rmr_wait_for_ready: boolean (optional, default is True)
515         Wait for RMR to signal ready before starting the dispatch loop
516     use_fake_sdl: boolean (optional, default is False)
517         Use an in-memory store instead of the real SDL service
518     """
519
520     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
521         """
522         Parameters
523         ----------
524
525         For the other parameters, see class _BaseXapp.
526         """
527         # init base
528         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
529         self._entrypoint = entrypoint
530
531     def run(self):
532         """
533         This function should be called when the general Xapp is ready to start.
534         """
535         self._entrypoint(self)
536
537     # there is no need for stop currently here (base has, and nothing
538     # special to do here)