-"""
-Framework for python xapps
-Framework here means Xapp classes that can be subclassed
-"""
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+"""
+This framework for Python Xapps provides classes that Xapp writers
+should instantiate and/or subclass depending on their needs.
+"""
+
+import json
+import os
+import queue
from threading import Thread
+import inotify_simple
+from mdclogpy import Logger
from ricxappframe import xapp_rmr
+from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
-from rmr import rmr
-from mdclogpy import Logger
-# constants
+# message-type constants
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101
-
-# Private base class; not for direct client use
+# environment variable with path to configuration file
+CONFIG_FILE_ENV = "CONFIG_FILE"
class _BaseXapp:
"""
- Base xapp; not for client use directly
+ This class initializes RMR, starts a thread that checks for incoming
+ messages, provisions an SDL object and optionally creates a
+ config-file watcher. This private base class should not be
+ instantiated by clients directly, but it defines many public methods
+ that may be used by clients.
+
+ If environment variable CONFIG_FILE is defined, and that variable
+ contains a path to an existing file, a watcher is defined to monitor
+ modifications (writes) to that file using the Linux kernel's inotify
+ feature. The watcher must be polled by calling method
+ config_check().
+
+ Parameters
+ ----------
+ rmr_port: int (optional, default is 4562)
+ Port on which the RMR library listens for incoming messages.
+
+ rmr_wait_for_ready: bool (optional, default is True)
+ If this is True, then init waits until RMR is ready to send,
+ which includes having a valid routing file. This can be set
+ to False if the client wants to *receive only*.
+
+ use_fake_sdl: bool (optional, default is False)
+ if this is True, it uses the DBaaS "fake dict backend" instead
+ of Redis or other backends. Set this to True when developing
+ an xapp or during unit testing to eliminate the need for DBaaS.
+
+ post_init: function (optional, default is None)
+ Runs this user-provided function at the end of the init method;
+ its signature should be post_init(self)
"""
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
"""
- Init
-
- Parameters
- ----------
- rmr_port: int
- port to listen on
-
- rmr_wait_for_ready: bool (optional)
- if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
- this can be set to False if the client only wants to *receive only*
-
- use_fake_sdl: bool (optional)
- if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
- Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
-
- post_init: function (optional)
- runs this user provided function after the base xapp is initialized
- it's signature should be post_init(self)
+ Documented in the class comment.
"""
# PUBLIC, can be used by xapps using self.(name):
self.logger = Logger(name=__name__)
# SDL
self._sdl = SDLWrapper(use_fake_sdl)
+ # Config
+ # The environment variable specifies the path to the Xapp config file
+ self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
+ if self._config_path and os.path.isfile(self._config_path):
+ self._inotify = inotify_simple.INotify()
+ self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
+ self.logger.debug("__init__: watching config file {}".format(self._config_path))
+ else:
+ self._inotify = None
+ self.logger.warning("__init__: NOT watching any config file")
+
# run the optionally provided user post init
if post_init:
post_init(self)
def rmr_get_messages(self):
"""
- returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
+ Returns a generator iterable over all items in the queue that
+ have not yet been read by the client xapp. Each item is a tuple
+ (S, sbuf) where S is a message summary dict and sbuf is the raw
+ message. The caller MUST call rmr.rmr_free_msg(sbuf) when
+ finished with each sbuf to prevent memory leaks!
"""
while not self._rmr_loop.rcv_queue.empty():
(summary, sbuf) = self._rmr_loop.rcv_queue.get()
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
"""
- Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
-
- This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
- The client needs to free.
+ Allows the xapp to return to sender, possibly adjusting the
+ payload and message type before doing so. This does NOT free
+ the sbuf for the caller as the caller may wish to perform
+ multiple rts per buffer. The client needs to free.
Parameters
----------
New payload to set
new_mtype: int (optional)
New message type (replaces the received message)
- retries: int (optional)
- Number of times to retry at the application level before excepting RMRFailure
+ retries: int (optional, default 100)
+ Number of times to retry at the application level
Returns
-------
if sbuf.contents.state == 0:
return True
- self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
+ self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
return False
def rmr_free(self, sbuf):
"""
- Free an rmr message buffer after use
+ Frees an rmr message buffer after use
+
+ Note: this does not need to be a class method, self is not
+ used. However if we break it out as a function we need a home
+ for it.
- Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
Parameters
----------
sbuf: ctypes c_void_p
"""
rmr.rmr_free_msg(sbuf)
- # SDL
- # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
- # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
+ # Convenience (pass-thru) function for invoking SDL.
- def sdl_set(self, ns, key, value, usemsgpack=True):
+ def sdl_set(self, namespace, key, value, usemsgpack=True):
"""
- set a key
+ Stores a key-value pair to SDL, optionally serializing the value
+ to bytes using msgpack.
Parameters
----------
- ns: string
- the sdl namespace
+ namespace: string
+ SDL namespace
key: string
- the sdl key
+ SDL key
value:
- if usemsgpack is True, value can be anything serializable by msgpack
- if usemsgpack is False, value must be bytes
- usemsgpack: boolean (optional)
- determines whether the value is serialized using msgpack
+ Object or byte array to store. See the `usemsgpack` parameter.
+ usemsgpack: boolean (optional, default is True)
+ Determines whether the value is serialized using msgpack before storing.
+ If usemsgpack is True, the msgpack function `packb` is invoked
+ on the value to yield a byte array that is then sent to SDL.
+ Stated differently, if usemsgpack is True, the value can be anything
+ that is serializable by msgpack.
+ If usemsgpack is False, the value must be bytes.
"""
- self._sdl.set(ns, key, value, usemsgpack)
+ self._sdl.set(namespace, key, value, usemsgpack)
- def sdl_get(self, ns, key, usemsgpack=True):
+ def sdl_get(self, namespace, key, usemsgpack=True):
"""
- get a key
+ Gets the value for the specified namespace and key from SDL,
+ optionally deserializing stored bytes using msgpack.
Parameters
----------
- ns: string
- the sdl namespace
+ namespace: string
+ SDL namespace
key: string
- the sdl key
- usemsgpack: boolean (optional)
- if usemsgpack is True, the value is deserialized using msgpack
- if usemsgpack is False, the value is returned as raw bytes
+ SDL key
+ usemsgpack: boolean (optional, default is True)
+ If usemsgpack is True, the byte array stored by SDL is deserialized
+ using msgpack to yield the original object that was stored.
+ If usemsgpack is False, the byte array stored by SDL is returned
+ without further processing.
Returns
-------
- None (if not exist) or see above; depends on usemsgpack
+ Value
+ See the usemsgpack parameter for an explanation of the returned value type.
+ Answers None if the key is not found.
"""
- return self._sdl.get(ns, key, usemsgpack)
+ return self._sdl.get(namespace, key, usemsgpack)
- def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
+ def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
"""
- get all k v pairs that start with prefix
+ Gets all key-value pairs in the specified namespace
+ with keys that start with the specified prefix,
+ optionally deserializing stored bytes using msgpack.
Parameters
----------
- ns: string
- the sdl namespace
- key: string
- the sdl key
+ nnamespaces: string
+ SDL namespace
prefix: string
- the prefix
- usemsgpack: boolean (optional)
- if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
- if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
+ the key prefix
+ usemsgpack: boolean (optional, default is True)
+ If usemsgpack is True, the byte array stored by SDL is deserialized
+ using msgpack to yield the original value that was stored.
+ If usemsgpack is False, the byte array stored by SDL is returned
+ without further processing.
Returns
-------
- {} (if no keys match) or see above; depends on usemsgpack
+ Dictionary of key-value pairs
+ Each key has the specified prefix.
+ The value object (its type) depends on the usemsgpack parameter,
+ but is either a Python object or raw bytes as discussed above.
+ Answers an empty dictionary if no keys matched the prefix.
"""
- return self._sdl.find_and_get(ns, prefix, usemsgpack)
+ return self._sdl.find_and_get(namespace, prefix, usemsgpack)
- def sdl_delete(self, ns, key):
+ def sdl_delete(self, namespace, key):
"""
- delete a key
+ Deletes the key-value pair with the specified key in the specified namespace.
Parameters
----------
- ns: string
- the sdl namespace
+ namespace: string
+ SDL namespace
key: string
- the sdl key
+ SDL key
"""
- self._sdl.delete(ns, key)
+ self._sdl.delete(namespace, key)
# Health
"""
return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
+ # Convenience function for discovering config change events
+
+ def config_check(self, timeout=0):
+ """
+ Checks the watcher for configuration-file events. The watcher
+ prerequisites and event mask are documented in __init__().
+
+ Parameters
+ ----------
+ timeout: int (optional)
+ Number of seconds to wait for a configuration-file event, default 0.
+
+ Returns
+ -------
+ List of Events, possibly empty
+ An event is a tuple with objects wd, mask, cookie and name.
+ For example::
+
+ Event(wd=1, mask=1073742080, cookie=0, name='foo')
+
+ """
+ if not self._inotify:
+ return []
+ events = self._inotify.read(timeout=timeout)
+ return list(events)
+
def stop(self):
"""
- cleans up and stops the xapp rmr thread (currently)
- This is critical for unit testing as pytest will never return if the thread is running.
+ cleans up and stops the xapp rmr thread (currently). This is
+ critical for unit testing as pytest will never return if the
+ thread is running.
- TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
+ TODO: can we register a ctrl-c handler so this gets called on
+ ctrl-c? Because currently two ctrl-c are needed to stop.
"""
self._rmr_loop.stop()
-# Public Classes to subclass (these subclass _BaseXapp)
+# Public classes that Xapp writers should instantiate or subclass
+# to implement an Xapp.
class RMRXapp(_BaseXapp):
"""
- Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
- When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
+ Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
+ only performs an action when a message is received. Clients should
+ invoke the run method, which has a loop that waits for RMR messages
+ and calls the appropriate client-registered consume callback on each.
+
+ If environment variable CONFIG_FILE is defined, and that variable
+ contains a path to an existing file, this class polls a watcher
+ defined on that file to detect file-write events, and invokes a
+ configuration-change handler on each event. The handler is also
+ invoked at startup. If no handler function is supplied to the
+ constructor, this class defines a default handler that only logs a
+ message.
+
+ Parameters
+ ----------
+ default_handler: function
+ A function with the signature (summary, sbuf) to be called when a
+ message type is received for which no other handler is registered.
+ default_handler argument summary: dict
+ The RMR message summary, a dict of key-value pairs
+ default_handler argument sbuf: ctypes c_void_p
+ Pointer to an RMR message buffer. The user must call free on this when done.
+ config_handler: function (optional, default is documented above)
+ A function with the signature (json) to be called at startup and each time
+ a configuration-file change event is detected. The JSON object is read from
+ the configuration file, if the prerequisites are met.
+ config_handler argument json: dict
+ The contents of the configuration file, parsed as JSON.
+ rmr_port: integer (optional, default is 4562)
+ Initialize RMR to listen on this port
+ rmr_wait_for_ready: boolean (optional, default is True)
+ Wait for RMR to signal ready before starting the dispatch loop
+ use_fake_sdl: boolean (optional, default is False)
+ Use an in-memory store instead of the real SDL service
+ post_init: function (optional, default None)
+ Run this function after the app initializes and before the dispatch loop starts;
+ its signature should be post_init(self)
"""
- def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
+ def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
"""
- Parameters
- ----------
- default_handler: function
- a function with the signature (summary, sbuf) to be called when a message of type message_type is received
- summary: dict
- the rmr message summary
- sbuf: ctypes c_void_p
- Pointer to an rmr message buffer. The user must call free on this when done.
-
- post_init: function (optional)
- optionally runs this function after the app initializes and before the run loop
- it's signature should be post_init(self)
-
- For the other parameters, see _BaseXapp
+ Also see _BaseXapp
"""
# init base
super().__init__(
# setup callbacks
self._default_handler = default_handler
+ self._config_handler = config_handler
self._dispatch = {}
# used for thread control
# register a default healthcheck handler
# this default checks that rmr is working and SDL is working
- # the user can override this and register their own handler if they wish since the "last registered callback wins".
+ # the user can override this and register their own handler
+ # if they wish since the "last registered callback wins".
def handle_healthcheck(self, summary, sbuf):
- ok = self.healthcheck()
- payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
+ healthy = self.healthcheck()
+ payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
self.rmr_free(sbuf)
self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
+ # define a default configuration-change handler if none was provided.
+ if not config_handler:
+ def handle_config_change(self, config):
+ self.logger.debug("xapp_frame: default config handler invoked")
+ self._config_handler = handle_config_change
+
+ # call the config handler at startup if prereqs were met
+ if self._inotify:
+ with open(self._config_path) as json_file:
+ data = json.load(json_file)
+ self.logger.debug("run: invoking config handler at start")
+ self._config_handler(self, data)
+
def register_callback(self, handler, message_type):
"""
registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
Parameters
----------
handler: function
- a function with the signature (summary, sbuf) to be called when a message of type message_type is received
- summary: dict
- the rmr message summary
- sbuf: ctypes c_void_p
- Pointer to an rmr message buffer. The user must call free on this when done.
+ a function with the signature (summary, sbuf) to be called
+ when a message of type message_type is received
+ summary: dict
+ the rmr message summary
+ sbuf: ctypes c_void_p
+ Pointer to an rmr message buffer. The user must call free on this when done.
message:type: int
the message type to look for
"""
self._dispatch[message_type] = handler
- def run(self, thread=False):
+ def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
"""
- This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
+ This function should be called when the reactive Xapp is ready to start.
+ After start, the Xapp's handlers will be called on received messages.
Parameters
----------
- thread: bool (optional)
- if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
- The thread can be stopped using .stop()
- if False, execution is not returned and the framework loops
+ thread: bool (optional, default is False)
+ If False, execution is not returned and the framework loops forever.
+ If True, a thread is started to run the queue read/dispatch loop
+ and execution is returned to caller; the thread can be stopped
+ by calling the .stop() method.
+
+ rmr_timeout: integer (optional, default is 5 seconds)
+ Length of time to wait for an RMR message to arrive.
+
+ inotify_timeout: integer (optional, default is 0 seconds)
+ Length of time to wait for an inotify event to arrive.
"""
def loop():
while self._keep_going:
- if not self._rmr_loop.rcv_queue.empty():
- (summary, sbuf) = self._rmr_loop.rcv_queue.get()
+
+ # poll RMR
+ try:
+ (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
# dispatch
- func = self._dispatch.get(summary["message type"], None)
+ func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
if not func:
func = self._default_handler
+ self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
func(self, summary, sbuf)
+ except queue.Empty:
+ # the get timed out
+ pass
+
+ # poll configuration file watcher
+ try:
+ events = self.config_check(timeout=inotify_timeout)
+ for event in events:
+ with open(self._config_path) as json_file:
+ data = json.load(json_file)
+ self.logger.debug("run: invoking config handler on change event {}".format(event))
+ self._config_handler(self, data)
+ except Exception as error:
+ self.logger.error("run: configuration handler failed: {}".format(error))
if thread:
Thread(target=loop).start()
def stop(self):
"""
- stops the rmr xapp completely.
+ Sets the flag to end the dispatch loop.
"""
super().stop()
- self.logger.debug("Stopping queue reading thread..")
+ self.logger.debug("Setting flag to end framework work loop.")
self._keep_going = False
class Xapp(_BaseXapp):
"""
- Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
+ Represents a generic Xapp where the client provides a single function
+ for the framework to call at startup time (instead of providing callback
+ functions by message type). The Xapp writer must implement and provide a
+ function with a loop-forever construct similar to the `run` function in
+ the `RMRXapp` class. That function should poll to retrieve RMR messages
+ and dispatch them appropriately, poll for configuration changes, etc.
+
+ Parameters
+ ----------
+ entrypoint: function
+ This function is called when the Xapp class's run method is invoked.
+ The function signature must be just function(self)
+ rmr_port: integer (optional, default is 4562)
+ Initialize RMR to listen on this port
+ rmr_wait_for_ready: boolean (optional, default is True)
+ Wait for RMR to signal ready before starting the dispatch loop
+ use_fake_sdl: boolean (optional, default is False)
+ Use an in-memory store instead of the real SDL service
"""
def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
"""
Parameters
----------
- entrypoint: function
- this function is called when the xapp runs; this is the user code
- it's signature should be function(self)
- For the other parameters, see _BaseXapp
+ For the other parameters, see class _BaseXapp.
"""
# init base
super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
def run(self):
"""
- This function should be called when the client xapp is ready to start their code
+ This function should be called when the general Xapp is ready to start.
"""
self._entrypoint(self)
- # there is no need for stop currently here (base has, and nothing special to do here)
+ # there is no need for stop currently here (base has, and nothing
+ # special to do here)