1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
"""
This framework for Python Xapps provides classes that Xapp writers
should instantiate and/or subclass depending on their needs.
"""
import json
import os
import queue
from threading import Thread

import inotify_simple
from mdclogpy import Logger
from ricxappframe import xapp_rmr
from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
# RMR message-type constants for the health-check exchange: the reactive
# Xapp registers its default health-check handler for the request type and
# answers with the response type.
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101

# Name of the environment variable that holds the path to the Xapp
# configuration file; it is looked up when an Xapp object is constructed.
CONFIG_FILE_ENV = "CONFIG_FILE"
42 This class initializes RMR, starts a thread that checks for incoming
43 messages, provisions an SDL object and optionally creates a
44 config-file watcher. This private base class should not be
45 instantiated by clients directly, but it defines many public methods
46 that may be used by clients.
48 If environment variable CONFIG_FILE is defined, and that variable
49 contains a path to an existing file, a watcher is defined to monitor
50 modifications (writes) to that file using the Linux kernel's inotify
feature. The watcher must be polled by calling method config_check().
56 rmr_port: int (optional, default is 4562)
57 Port on which the RMR library listens for incoming messages.
59 rmr_wait_for_ready: bool (optional, default is True)
60 If this is True, then init waits until RMR is ready to send,
61 which includes having a valid routing file. This can be set
62 to False if the client wants to *receive only*.
64 use_fake_sdl: bool (optional, default is False)
65 if this is True, it uses the DBaaS "fake dict backend" instead
66 of Redis or other backends. Set this to True when developing
67 an xapp or during unit testing to eliminate the need for DBaaS.
69 post_init: function (optional, default is None)
70 Runs this user-provided function at the end of the init method;
71 its signature should be post_init(self)
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
    """
    Creates the logger, starts the RMR receive loop, provisions SDL and,
    when possible, a watcher on the configuration file.

    Parameters
    ----------
    rmr_port: int (optional, default is 4562)
        Port on which the RMR library listens for incoming messages.
    rmr_wait_for_ready: bool (optional, default is True)
        Handed to the RMR loop as its wait_for_ready argument.
    use_fake_sdl: bool (optional, default is False)
        Handed to the SDL wrapper to select its fake backend.
    post_init: function (optional, default is None)
        Called as post_init(self) as the very last step of this method.
    """
    # PUBLIC, can be used by xapps using self.(name):
    self.logger = Logger(name=__name__)

    # Start rmr rcv thread
    self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
    self._mrc = self._rmr_loop.mrc  # for convenience

    # SDL
    self._sdl = SDLWrapper(use_fake_sdl)

    # Config-file watcher. The attribute always exists so that later code
    # can simply test it; it stays None unless the file can be watched.
    self._inotify = None
    # The environment variable specifies the path to the Xapp config file
    self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
    if self._config_path and os.path.isfile(self._config_path):
        self._inotify = inotify_simple.INotify()
        self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
        self.logger.debug("__init__: watching config file {}".format(self._config_path))
    else:
        self.logger.warning("__init__: NOT watching any config file")

    # run the optionally provided user post init
    if post_init is not None:
        post_init(self)
def rmr_get_messages(self):
    """
    Returns a generator iterable over all items in the queue that
    have not yet been read by the client xapp. Each item is a tuple
    (S, sbuf) where S is a message summary dict and sbuf is the raw
    message. The caller MUST call rmr.rmr_free_msg(sbuf) when
    finished with each sbuf to prevent memory leaks!

    The generator ends as soon as the queue is found empty; it never
    waits for new messages to arrive.
    """
    pending = self._rmr_loop.rcv_queue
    while True:
        try:
            # Non-blocking get: checking empty() and then doing a blocking
            # get() could hang forever if another reader takes the last
            # item between the two calls.
            summary, sbuf = pending.get(block=False)
        except queue.Empty:
            return
        yield (summary, sbuf)
def rmr_send(self, payload, mtype, retries=100):
    """
    Allocates a buffer, sets payload and mtype, and sends it, retrying
    at the application level until the send is accepted or the retry
    budget is used up. The buffer allocated here is always freed here.

    Parameters
    ----------
    payload:
        Payload to send; its length sizes the allocated buffer
        (presumably bytes -- confirm against callers).
    mtype:
        RMR message type to set on the message.
    retries: int (optional)
        Maximum number of send attempts. No exception is raised when
        they are exhausted; the method returns False instead.

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)

    sent = False
    for _ in range(retries):
        sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
        # a state of 0 is treated as a successful send
        if sbuf.contents.state == 0:
            sent = True
            break

    # The buffer was allocated by this method, so nobody else can release it.
    rmr.rmr_free_msg(sbuf)
    return sent
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
    """
    Allows the xapp to return to sender, possibly adjusting the
    payload and message type before doing so. This does NOT free
    the sbuf for the caller as the caller may wish to perform
    multiple rts per buffer. The client needs to free.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    new_payload: bytes (optional)
        New payload to set; passed through to the RMR rts call
    new_mtype: int (optional)
        New message type (replaces the received message)
    retries: int (optional)
        Maximum number of attempts at the application level. No
        exception is raised when they are exhausted; a warning is
        logged and the method returns False.

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    for _ in range(retries):
        sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
        # a state of 0 is treated as a successful send
        if sbuf.contents.state == 0:
            return True

    self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
    return False
def rmr_free(self, sbuf):
    """
    Frees an rmr message buffer after use

    Note: this does not need to be a class method, self is not
    used. However if we break it out as a function we need a home
    for it.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    rmr.rmr_free_msg(sbuf)
# Convenience (pass-thru) functions for invoking SDL.
def sdl_set(self, namespace, key, value, usemsgpack=True):
    """
    Stores a key-value pair to SDL, optionally serializing the value
    to bytes using msgpack.

    Parameters
    ----------
    namespace:
        SDL namespace in which the pair is stored
    key:
        SDL key
    value:
        Object or byte array to store. See the `usemsgpack` parameter.
    usemsgpack: boolean (optional, default is True)
        Determines whether the value is serialized using msgpack before storing.
        If usemsgpack is True, the msgpack function `packb` is invoked
        on the value to yield a byte array that is then sent to SDL.
        Stated differently, if usemsgpack is True, the value can be anything
        that is serializable by msgpack.
        If usemsgpack is False, the value must be bytes.
    """
    self._sdl.set(namespace, key, value, usemsgpack)
def sdl_get(self, namespace, key, usemsgpack=True):
    """
    Gets the value for the specified namespace and key from SDL,
    optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace:
        SDL namespace to look in
    key:
        SDL key
    usemsgpack: boolean (optional, default is True)
        If usemsgpack is True, the byte array stored by SDL is deserialized
        using msgpack to yield the original object that was stored.
        If usemsgpack is False, the byte array stored by SDL is returned
        without further processing.

    Returns
    -------
    Value
        See the usemsgpack parameter for an explanation of the returned value type.
        Answers None if the key is not found.
    """
    return self._sdl.get(namespace, key, usemsgpack)
def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
    """
    Gets all key-value pairs in the specified namespace
    with keys that start with the specified prefix,
    optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace:
        SDL namespace to search
    prefix:
        The key prefix to match
    usemsgpack: boolean (optional, default is True)
        If usemsgpack is True, the byte array stored by SDL is deserialized
        using msgpack to yield the original value that was stored.
        If usemsgpack is False, the byte array stored by SDL is returned
        without further processing.

    Returns
    -------
    Dictionary of key-value pairs
        Each key has the specified prefix.
        The value object (its type) depends on the usemsgpack parameter,
        but is either a Python object or raw bytes as discussed above.
        Answers an empty dictionary if no keys matched the prefix.
    """
    return self._sdl.find_and_get(namespace, prefix, usemsgpack)
def sdl_delete(self, namespace, key):
    """
    Deletes the key-value pair with the specified key in the specified namespace.

    Parameters
    ----------
    namespace:
        SDL namespace that holds the key
    key:
        SDL key to delete
    """
    self._sdl.delete(namespace, key)
def healthcheck(self):
    """
    Combines the health checks of the RMR loop and of SDL; the result is
    truthy only when both report healthy. SDL is not consulted when the
    RMR check already fails.

    TODO: this needs to be understood how this is supposed to work
    """
    return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
292 # Convenience function for discovering config change events
def config_check(self, timeout=0):
    """
    Checks the watcher for configuration-file events. The watcher
    prerequisites and event mask are documented in __init__().

    Parameters
    ----------
    timeout: int (optional)
        How long to wait for a configuration-file event, default 0.
        The value is forwarded unchanged to the watcher's read() call.
        NOTE(review): inotify_simple documents that argument in
        milliseconds, not seconds -- confirm and adjust callers.

    Returns
    -------
    List of Events, possibly empty
        An event is a tuple with objects wd, mask, cookie and name.
        For example::

            Event(wd=1, mask=1073742080, cookie=0, name='foo')

        The list is always empty when no watcher was created.
    """
    if not self._inotify:
        return []
    events = self._inotify.read(timeout=timeout)
    return list(events)
def stop(self):
    """
    Cleans up and stops the xapp rmr thread (currently). This is
    critical for unit testing as pytest will never return if the
    thread is still running.

    TODO: can we register a ctrl-c handler so this gets called on
    ctrl-c? Because currently two ctrl-c are needed to stop.
    """
    self._rmr_loop.stop()
330 # Public classes that Xapp writers should instantiate or subclass
331 # to implement an Xapp.
class RMRXapp(_BaseXapp):
    """
    Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
    only performs an action when a message is received. Clients should
    invoke the run method, which has a loop that waits for RMR messages
    and calls the appropriate client-registered consume callback on each.

    If environment variable CONFIG_FILE is defined, and that variable
    contains a path to an existing file, this class polls a watcher
    defined on that file to detect file-write events, and invokes a
    configuration-change handler on each event. The handler is also
    invoked at startup. If no handler function is supplied to the
    constructor, this class defines a default handler that only logs a
    message.

    Parameters
    ----------
    default_handler: function
        A function with the signature (self, summary, sbuf) to be called when a
        message type is received for which no other handler is registered.
        default_handler argument summary: dict
            The RMR message summary, a dict of key-value pairs
        default_handler argument sbuf: ctypes c_void_p
            Pointer to an RMR message buffer. The user must call free on this when done.
    config_handler: function (optional, default is documented above)
        A function with the signature (self, json) to be called at startup and each time
        a configuration-file change event is detected. The JSON object is read from
        the configuration file, if the prerequisites are met.
        config_handler argument json: dict
            The contents of the configuration file, parsed as JSON.
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    post_init: function (optional, default None)
        Run this function after the app initializes and before the dispatch loop starts;
        its signature should be post_init(self)
    """

    def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Initializes the base class, installs the message and configuration
        handlers, and invokes the configuration handler once if a
        configuration file is being watched. The parameters are documented
        in the class comment.
        """
        # init base
        # NOTE(review): post_init is forwarded to the base constructor, so it
        # presumably runs before the handler attributes below exist -- confirm
        # that user post_init functions do not rely on them.
        super().__init__(
            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
        )

        # setup callbacks
        self._default_handler = default_handler
        self._config_handler = config_handler
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        # register a default healthcheck handler
        # this default checks that rmr is working and SDL is working
        # the user can override this and register their own handler
        # if they wish since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            healthy = self.healthcheck()
            payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
            # rts does not release the buffer; the handler owns it
            self.rmr_free(sbuf)

        self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)

        # define a default configuration-change handler if none was provided.
        if not config_handler:
            def handle_config_change(self, config):
                self.logger.debug("xapp_frame: default config handler invoked")
            self._config_handler = handle_config_change

        # call the config handler at startup if prereqs were met
        if self._inotify:
            data = self._load_config()
            self.logger.debug("run: invoking config handler at start")
            self._config_handler(self, data)

    def _load_config(self):
        """
        Reads the configuration file named by self._config_path and returns
        its content parsed as JSON.
        """
        with open(self._config_path) as json_file:
            return json.load(json_file)

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(self, summary, buf) when an rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            a function with the signature (self, summary, sbuf) to be called
            when a message of type message_type is received
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.
        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
        """
        This function should be called when the reactive Xapp is ready to start.
        After start, the Xapp's handlers will be called on received messages.

        Parameters
        ----------
        thread: bool (optional, default is False)
            If False, execution is not returned and the framework loops forever.
            If True, a thread is started to run the queue read/dispatch loop
            and execution is returned to caller; the thread can be stopped
            by calling the .stop() method.

        rmr_timeout: integer (optional, default is 5 seconds)
            Length of time to wait for an RMR message to arrive.

        inotify_timeout: integer (optional, default is 0)
            Length of time to wait for an inotify event to arrive; forwarded
            unchanged to config_check(). NOTE(review): the underlying inotify
            read() documents its timeout in milliseconds -- confirm the unit.
        """

        def loop():
            while self._keep_going:

                # poll RMR
                try:
                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
                except queue.Empty:
                    # the get timed out; still poll the config watcher below
                    pass
                else:
                    # dispatch, falling back to the default handler
                    mtype = summary[rmr.RMR_MS_MSG_TYPE]
                    func = self._dispatch.get(mtype, None)
                    if not func:
                        func = self._default_handler
                    self.logger.debug("run: invoking msg handler on type {}".format(mtype))
                    func(self, summary, sbuf)

                # poll configuration file watcher
                try:
                    events = self.config_check(timeout=inotify_timeout)
                    for event in events:
                        data = self._load_config()
                        self.logger.debug("run: invoking config handler on change event {}".format(event))
                        self._config_handler(self, data)
                except Exception as error:
                    # a broken config file or handler must not end the loop
                    self.logger.error("run: configuration handler failed: {}".format(error))

        if thread:
            Thread(target=loop).start()
        else:
            loop()

    def stop(self):
        """
        Sets the flag to end the dispatch loop.
        """
        # let the base class stop the RMR receive thread as well
        super().stop()
        self.logger.debug("Setting flag to end framework work loop.")
        self._keep_going = False
class Xapp(_BaseXapp):
    """
    Represents a generic Xapp where the client provides a single function
    for the framework to call at startup time (instead of providing callback
    functions by message type). The Xapp writer must implement and provide a
    function with a loop-forever construct similar to the `run` function in
    the `RMRXapp` class. That function should poll to retrieve RMR messages
    and dispatch them appropriately, poll for configuration changes, etc.

    Parameters
    ----------
    entrypoint: function
        This function is called when the Xapp class's run method is invoked.
        The function signature must be just function(self)
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Parameters
        ----------
        entrypoint: function
            Stored and later called as entrypoint(self) by run().

        For the other parameters, see class _BaseXapp.
        """
        # init base
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the general Xapp is ready to start.
        It hands control to the entrypoint supplied to the constructor.
        """
        self._entrypoint(self)

    # there is no need for stop currently here (base has, and nothing
    # special to do here)