X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fxapp_frame.py;h=8abc443ee34e88413b85e7040818f210e06fd6a5;hb=refs%2Fchanges%2F10%2F5010%2F1;hp=87ab27e8e839430933e4cb1a9b5b993b333c3c7b;hpb=ca170d3c19485b052b3635a516c7a4e82a506ba5;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/xapp_frame.py b/ricxappframe/xapp_frame.py index 87ab27e..8abc443 100644 --- a/ricxappframe/xapp_frame.py +++ b/ricxappframe/xapp_frame.py @@ -15,54 +15,65 @@ # limitations under the License. # ================================================================================== """ -Framework for python xapps -Framework here means Xapp classes that can be subclassed +This framework for Python Xapps provides classes that Xapp writers +should instantiate and/or subclass depending on their needs. """ +import json +import os import queue from threading import Thread +import inotify_simple +from mdclogpy import Logger from ricxappframe import xapp_rmr from ricxappframe.rmr import rmr from ricxappframe.xapp_sdl import SDLWrapper -from mdclogpy import Logger -# constants +# message-type constants RIC_HEALTH_CHECK_REQ = 100 RIC_HEALTH_CHECK_RESP = 101 - -# Private base class; not for direct client use +# environment variable with path to configuration file +CONFIG_FILE_ENV = "CONFIG_FILE" class _BaseXapp: """ - Base xapp; not for client use directly - """ - - def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): - """ - Init + This class initializes RMR, starts a thread that checks for incoming + messages, provisions an SDL object and optionally creates a + config-file watcher. This private base class should not be + instantiated by clients directly, but it defines many public methods + that may be used by clients. + + If environment variable CONFIG_FILE is defined, and that variable + contains a path to an existing file, a watcher is defined to monitor + modifications (writes) to that file using the Linux kernel's inotify + feature. The watcher must be polled by calling method + config_check(). - Parameters - ---------- - rmr_port: int - port to listen on + Parameters + ---------- + rmr_port: int (optional, default is 4562) + Port on which the RMR library listens for incoming messages. - rmr_wait_for_ready: bool (optional) + rmr_wait_for_ready: bool (optional, default is True) + If this is True, then init waits until RMR is ready to send, + which includes having a valid routing file. This can be set + to False if the client wants to *receive only*. - if this is True, then init waits until rmr is ready to send, which - includes having a valid routing file. This can be set to - False if the client only wants to *receive only*. + use_fake_sdl: bool (optional, default is False) + if this is True, it uses the DBaaS "fake dict backend" instead + of Redis or other backends. Set this to True when developing + an xapp or during unit testing to eliminate the need for DBaaS. - use_fake_sdl: bool (optional) - if this is True, it uses dbaas' "fake dict backend" instead - of Redis or other backends. Set this to true when developing - your xapp or during unit testing to completely avoid needing - a dbaas running or any network at all. + post_init: function (optional, default is None) + Runs this user-provided function at the end of the init method; + its signature should be post_init(self) + """ - post_init: function (optional) - runs this user provided function after the base xapp is - initialized; its signature should be post_init(self) + def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): + """ + Documented in the class comment. """ # PUBLIC, can be used by xapps using self.(name): self.logger = Logger(name=__name__) @@ -72,7 +83,18 @@ class _BaseXapp: self._mrc = self._rmr_loop.mrc # for convenience # SDL - self._sdl = SDLWrapper(use_fake_sdl) + self.sdl = SDLWrapper(use_fake_sdl) + + # Config + # The environment variable specifies the path to the Xapp config file + self._config_path = os.environ.get(CONFIG_FILE_ENV, None) + if self._config_path and os.path.isfile(self._config_path): + self._inotify = inotify_simple.INotify() + self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY) + self.logger.debug("__init__: watching config file {}".format(self._config_path)) + else: + self._inotify = None + self.logger.warning("__init__: NOT watching any config file") # run the optionally provided user post init if post_init: @@ -136,9 +158,8 @@ class _BaseXapp: New payload to set new_mtype: int (optional) New message type (replaces the received message) - retries: int (optional) - Number of times to retry at the application level before - throwing exception RMRFailure + retries: int (optional, default 100) + Number of times to retry at the application level Returns ------- @@ -150,7 +171,7 @@ class _BaseXapp: if sbuf.contents.state == 0: return True - self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf))) + self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf))) return False def rmr_free(self, sbuf): @@ -168,20 +189,19 @@ class _BaseXapp: """ rmr.rmr_free_msg(sbuf) - # SDL - # NOTE, even though these are passthroughs, the separate SDL wrapper - # is useful for other applications like A1. Therefore, we don't - # embed that SDLWrapper functionality here so that it can be - # instantiated on its own. + # Convenience (pass-thru) function for invoking SDL. - def sdl_set(self, ns, key, value, usemsgpack=True): + def sdl_set(self, namespace, key, value, usemsgpack=True): """ - Stores a key-value pair, - optionally serializing the value to bytes using msgpack. + ** Deprecate Warning ** + ** Will be removed in a future function ** + + Stores a key-value pair to SDL, optionally serializing the value + to bytes using msgpack. Parameters ---------- - ns: string + namespace: string SDL namespace key: string SDL key @@ -195,16 +215,19 @@ class _BaseXapp: that is serializable by msgpack. If usemsgpack is False, the value must be bytes. """ - self._sdl.set(ns, key, value, usemsgpack) + self.sdl.set(namespace, key, value, usemsgpack) - def sdl_get(self, ns, key, usemsgpack=True): + def sdl_get(self, namespace, key, usemsgpack=True): """ - Gets the value for the specified namespace and key, + ** Deprecate Warning ** + ** Will be removed in a future function ** + + Gets the value for the specified namespace and key from SDL, optionally deserializing stored bytes using msgpack. Parameters ---------- - ns: string + namespace: string SDL namespace key: string SDL key @@ -220,17 +243,20 @@ class _BaseXapp: See the usemsgpack parameter for an explanation of the returned value type. Answers None if the key is not found. """ - return self._sdl.get(ns, key, usemsgpack) + return self.sdl.get(namespace, key, usemsgpack) - def sdl_find_and_get(self, ns, prefix, usemsgpack=True): + def sdl_find_and_get(self, namespace, prefix, usemsgpack=True): """ + ** Deprecate Warning ** + ** Will be removed in a future function ** + Gets all key-value pairs in the specified namespace with keys that start with the specified prefix, optionally deserializing stored bytes using msgpack. Parameters ---------- - ns: string + nnamespaces: string SDL namespace prefix: string the key prefix @@ -248,20 +274,23 @@ class _BaseXapp: but is either a Python object or raw bytes as discussed above. Answers an empty dictionary if no keys matched the prefix. """ - return self._sdl.find_and_get(ns, prefix, usemsgpack) + return self.sdl.find_and_get(namespace, prefix, usemsgpack) - def sdl_delete(self, ns, key): + def sdl_delete(self, namespace, key): """ + ** Deprecate Warning ** + ** Will be removed in a future function ** + Deletes the key-value pair with the specified key in the specified namespace. Parameters ---------- - ns: string + namespace: string SDL namespace key: string SDL key """ - self._sdl.delete(ns, key) + self.sdl.delete(namespace, key) # Health @@ -269,7 +298,33 @@ class _BaseXapp: """ this needs to be understood how this is supposed to work """ - return self._rmr_loop.healthcheck() and self._sdl.healthcheck() + return self._rmr_loop.healthcheck() and self.sdl.healthcheck() + + # Convenience function for discovering config change events + + def config_check(self, timeout=0): + """ + Checks the watcher for configuration-file events. The watcher + prerequisites and event mask are documented in __init__(). + + Parameters + ---------- + timeout: int (optional) + Number of seconds to wait for a configuration-file event, default 0. + + Returns + ------- + List of Events, possibly empty + An event is a tuple with objects wd, mask, cookie and name. + For example:: + + Event(wd=1, mask=1073742080, cookie=0, name='foo') + + """ + if not self._inotify: + return [] + events = self._inotify.read(timeout=timeout) + return list(events) def stop(self): """ @@ -283,25 +338,40 @@ class _BaseXapp: self._rmr_loop.stop() -# Public Classes to subclass (these subclass _BaseXapp) +# Public classes that Xapp writers should instantiate or subclass +# to implement an Xapp. class RMRXapp(_BaseXapp): """ - Represents an Xapp that reacts only to RMR messages; i.e., when - messages are received, the Xapp does something. When run is called, - the xapp framework waits for RMR messages, and calls the appropriate - client-registered consume callback on each. + Represents an Xapp that reacts only to RMR messages; i.e., the Xapp + only performs an action when a message is received. Clients should + invoke the run method, which has a loop that waits for RMR messages + and calls the appropriate client-registered consume callback on each. + + If environment variable CONFIG_FILE is defined, and that variable + contains a path to an existing file, this class polls a watcher + defined on that file to detect file-write events, and invokes a + configuration-change handler on each event. The handler is also + invoked at startup. If no handler function is supplied to the + constructor, this class defines a default handler that only logs a + message. Parameters ---------- default_handler: function - A function with the signature (summary, sbuf) to be called - when a message type is received for which no other handler is registered. + A function with the signature (summary, sbuf) to be called when a + message type is received for which no other handler is registered. default_handler argument summary: dict The RMR message summary, a dict of key-value pairs default_handler argument sbuf: ctypes c_void_p Pointer to an RMR message buffer. The user must call free on this when done. + config_handler: function (optional, default is documented above) + A function with the signature (json) to be called at startup and each time + a configuration-file change event is detected. The JSON object is read from + the configuration file, if the prerequisites are met. + config_handler argument json: dict + The contents of the configuration file, parsed as JSON. rmr_port: integer (optional, default is 4562) Initialize RMR to listen on this port rmr_wait_for_ready: boolean (optional, default is True) @@ -313,7 +383,7 @@ class RMRXapp(_BaseXapp): its signature should be post_init(self) """ - def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): + def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): """ Also see _BaseXapp """ @@ -324,6 +394,7 @@ class RMRXapp(_BaseXapp): # setup callbacks self._default_handler = default_handler + self._config_handler = config_handler self._dispatch = {} # used for thread control @@ -334,13 +405,26 @@ class RMRXapp(_BaseXapp): # the user can override this and register their own handler # if they wish since the "last registered callback wins". def handle_healthcheck(self, summary, sbuf): - ok = self.healthcheck() - payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n" + healthy = self.healthcheck() + payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n" self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP) self.rmr_free(sbuf) self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ) + # define a default configuration-change handler if none was provided. + if not config_handler: + def handle_config_change(self, config): + self.logger.debug("xapp_frame: default config handler invoked") + self._config_handler = handle_config_change + + # call the config handler at startup if prereqs were met + if self._inotify: + with open(self._config_path) as json_file: + data = json.load(json_file) + self.logger.debug("run: invoking config handler at start") + self._config_handler(self, data) + def register_callback(self, handler, message_type): """ registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type @@ -362,7 +446,7 @@ class RMRXapp(_BaseXapp): """ self._dispatch[message_type] = handler - def run(self, thread=False): + def run(self, thread=False, rmr_timeout=5, inotify_timeout=0): """ This function should be called when the reactive Xapp is ready to start. After start, the Xapp's handlers will be called on received messages. @@ -374,21 +458,41 @@ class RMRXapp(_BaseXapp): If True, a thread is started to run the queue read/dispatch loop and execution is returned to caller; the thread can be stopped by calling the .stop() method. + + rmr_timeout: integer (optional, default is 5 seconds) + Length of time to wait for an RMR message to arrive. + + inotify_timeout: integer (optional, default is 0 seconds) + Length of time to wait for an inotify event to arrive. """ def loop(): while self._keep_going: + + # poll RMR try: - (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5) + (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout) # dispatch func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None) if not func: func = self._default_handler + self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE])) func(self, summary, sbuf) except queue.Empty: # the get timed out pass + # poll configuration file watcher + try: + events = self.config_check(timeout=inotify_timeout) + for event in events: + with open(self._config_path) as json_file: + data = json.load(json_file) + self.logger.debug("run: invoking config handler on change event {}".format(event)) + self._config_handler(self, data) + except Exception as error: + self.logger.error("run: configuration handler failed: {}".format(error)) + if thread: Thread(target=loop).start() else: @@ -405,8 +509,12 @@ class RMRXapp(_BaseXapp): class Xapp(_BaseXapp): """ - Represents a generic Xapp where the client provides a function for the framework to call, - which usually contains a loop-forever construct. + Represents a generic Xapp where the client provides a single function + for the framework to call at startup time (instead of providing callback + functions by message type). The Xapp writer must implement and provide a + function with a loop-forever construct similar to the `run` function in + the `RMRXapp` class. That function should poll to retrieve RMR messages + and dispatch them appropriately, poll for configuration changes, etc. Parameters ----------