X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fxapp_frame.py;h=87ab27e8e839430933e4cb1a9b5b993b333c3c7b;hb=81084bc31ea1d5cde6616dd2267ea01f49a1d6d1;hp=180c3ae9f91838b96b47246bab7c48d7337da77f;hpb=09894e3c7bdd6eeaae4a84467f5ea6af1a061204;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/xapp_frame.py b/ricxappframe/xapp_frame.py index 180c3ae..87ab27e 100644 --- a/ricxappframe/xapp_frame.py +++ b/ricxappframe/xapp_frame.py @@ -1,7 +1,3 @@ -""" -Framework for python xapps -Framework here means Xapp classes that can be subclassed -""" # ================================================================================== # Copyright (c) 2020 Nokia # Copyright (c) 2020 AT&T Intellectual Property. @@ -18,10 +14,16 @@ Framework here means Xapp classes that can be subclassed # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== +""" +Framework for python xapps +Framework here means Xapp classes that can be subclassed +""" + +import queue from threading import Thread from ricxappframe import xapp_rmr +from ricxappframe.rmr import rmr from ricxappframe.xapp_sdl import SDLWrapper -from rmr import rmr from mdclogpy import Logger # constants @@ -47,16 +49,20 @@ class _BaseXapp: port to listen on rmr_wait_for_ready: bool (optional) - if this is True, then init waits until rmr is ready to send, which includes having a valid routing file. - this can be set to False if the client only wants to *receive only* + + if this is True, then init waits until rmr is ready to send, which + includes having a valid routing file. This can be set to + False if the client only wants to *receive only*. use_fake_sdl: bool (optional) - if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends. - Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all + if this is True, it uses dbaas' "fake dict backend" instead + of Redis or other backends. Set this to true when developing + your xapp or during unit testing to completely avoid needing + a dbaas running or any network at all. post_init: function (optional) - runs this user provided function after the base xapp is initialized - it's signature should be post_init(self) + runs this user provided function after the base xapp is + initialized; its signature should be post_init(self) """ # PUBLIC, can be used by xapps using self.(name): self.logger = Logger(name=__name__) @@ -76,7 +82,11 @@ class _BaseXapp: def rmr_get_messages(self): """ - returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp + Returns a generator iterable over all items in the queue that + have not yet been read by the client xapp. Each item is a tuple + (S, sbuf) where S is a message summary dict and sbuf is the raw + message. The caller MUST call rmr.rmr_free_msg(sbuf) when + finished with each sbuf to prevent memory leaks! """ while not self._rmr_loop.rcv_queue.empty(): (summary, sbuf) = self._rmr_loop.rcv_queue.get() @@ -113,10 +123,10 @@ class _BaseXapp: def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100): """ - Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so - - This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer. - The client needs to free. + Allows the xapp to return to sender, possibly adjusting the + payload and message type before doing so. This does NOT free + the sbuf for the caller as the caller may wish to perform + multiple rts per buffer. The client needs to free. Parameters ---------- @@ -127,7 +137,8 @@ class _BaseXapp: new_mtype: int (optional) New message type (replaces the received message) retries: int (optional) - Number of times to retry at the application level before excepting RMRFailure + Number of times to retry at the application level before + throwing exception RMRFailure Returns ------- @@ -144,9 +155,12 @@ class _BaseXapp: def rmr_free(self, sbuf): """ - Free an rmr message buffer after use + Frees an rmr message buffer after use + + Note: this does not need to be a class method, self is not + used. However if we break it out as a function we need a home + for it. - Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it. Parameters ---------- sbuf: ctypes c_void_p @@ -155,79 +169,97 @@ class _BaseXapp: rmr.rmr_free_msg(sbuf) # SDL - # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1. - # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own. + # NOTE, even though these are passthroughs, the separate SDL wrapper + # is useful for other applications like A1. Therefore, we don't + # embed that SDLWrapper functionality here so that it can be + # instantiated on its own. def sdl_set(self, ns, key, value, usemsgpack=True): """ - set a key + Stores a key-value pair, + optionally serializing the value to bytes using msgpack. Parameters ---------- ns: string - the sdl namespace + SDL namespace key: string - the sdl key + SDL key value: - if usemsgpack is True, value can be anything serializable by msgpack - if usemsgpack is False, value must be bytes - usemsgpack: boolean (optional) - determines whether the value is serialized using msgpack + Object or byte array to store. See the `usemsgpack` parameter. + usemsgpack: boolean (optional, default is True) + Determines whether the value is serialized using msgpack before storing. + If usemsgpack is True, the msgpack function `packb` is invoked + on the value to yield a byte array that is then sent to SDL. + Stated differently, if usemsgpack is True, the value can be anything + that is serializable by msgpack. + If usemsgpack is False, the value must be bytes. """ self._sdl.set(ns, key, value, usemsgpack) def sdl_get(self, ns, key, usemsgpack=True): """ - get a key + Gets the value for the specified namespace and key, + optionally deserializing stored bytes using msgpack. Parameters ---------- ns: string - the sdl namespace + SDL namespace key: string - the sdl key - usemsgpack: boolean (optional) - if usemsgpack is True, the value is deserialized using msgpack - if usemsgpack is False, the value is returned as raw bytes + SDL key + usemsgpack: boolean (optional, default is True) + If usemsgpack is True, the byte array stored by SDL is deserialized + using msgpack to yield the original object that was stored. + If usemsgpack is False, the byte array stored by SDL is returned + without further processing. Returns ------- - None (if not exist) or see above; depends on usemsgpack + Value + See the usemsgpack parameter for an explanation of the returned value type. + Answers None if the key is not found. """ return self._sdl.get(ns, key, usemsgpack) def sdl_find_and_get(self, ns, prefix, usemsgpack=True): """ - get all k v pairs that start with prefix + Gets all key-value pairs in the specified namespace + with keys that start with the specified prefix, + optionally deserializing stored bytes using msgpack. Parameters ---------- ns: string - the sdl namespace - key: string - the sdl key + SDL namespace prefix: string - the prefix - usemsgpack: boolean (optional) - if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack - if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes + the key prefix + usemsgpack: boolean (optional, default is True) + If usemsgpack is True, the byte array stored by SDL is deserialized + using msgpack to yield the original value that was stored. + If usemsgpack is False, the byte array stored by SDL is returned + without further processing. Returns ------- - {} (if no keys match) or see above; depends on usemsgpack + Dictionary of key-value pairs + Each key has the specified prefix. + The value object (its type) depends on the usemsgpack parameter, + but is either a Python object or raw bytes as discussed above. + Answers an empty dictionary if no keys matched the prefix. """ return self._sdl.find_and_get(ns, prefix, usemsgpack) def sdl_delete(self, ns, key): """ - delete a key + Deletes the key-value pair with the specified key in the specified namespace. Parameters ---------- ns: string - the sdl namespace + SDL namespace key: string - the sdl key + SDL key """ self._sdl.delete(ns, key) @@ -241,10 +273,12 @@ class _BaseXapp: def stop(self): """ - cleans up and stops the xapp rmr thread (currently) - This is critical for unit testing as pytest will never return if the thread is running. + cleans up and stops the xapp rmr thread (currently). This is + critical for unit testing as pytest will never return if the + thread is running. - TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop + TODO: can we register a ctrl-c handler so this gets called on + ctrl-c? Because currently two ctrl-c are needed to stop. """ self._rmr_loop.stop() @@ -254,26 +288,34 @@ class _BaseXapp: class RMRXapp(_BaseXapp): """ - Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something - When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one + Represents an Xapp that reacts only to RMR messages; i.e., when + messages are received, the Xapp does something. When run is called, + the xapp framework waits for RMR messages, and calls the appropriate + client-registered consume callback on each. + + Parameters + ---------- + default_handler: function + A function with the signature (summary, sbuf) to be called + when a message type is received for which no other handler is registered. + default_handler argument summary: dict + The RMR message summary, a dict of key-value pairs + default_handler argument sbuf: ctypes c_void_p + Pointer to an RMR message buffer. The user must call free on this when done. + rmr_port: integer (optional, default is 4562) + Initialize RMR to listen on this port + rmr_wait_for_ready: boolean (optional, default is True) + Wait for RMR to signal ready before starting the dispatch loop + use_fake_sdl: boolean (optional, default is False) + Use an in-memory store instead of the real SDL service + post_init: function (optional, default None) + Run this function after the app initializes and before the dispatch loop starts; + its signature should be post_init(self) """ def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): """ - Parameters - ---------- - default_handler: function - a function with the signature (summary, sbuf) to be called when a message of type message_type is received - summary: dict - the rmr message summary - sbuf: ctypes c_void_p - Pointer to an rmr message buffer. The user must call free on this when done. - - post_init: function (optional) - optionally runs this function after the app initializes and before the run loop - it's signature should be post_init(self) - - For the other parameters, see _BaseXapp + Also see _BaseXapp """ # init base super().__init__( @@ -289,7 +331,8 @@ class RMRXapp(_BaseXapp): # register a default healthcheck handler # this default checks that rmr is working and SDL is working - # the user can override this and register their own handler if they wish since the "last registered callback wins". + # the user can override this and register their own handler + # if they wish since the "last registered callback wins". def handle_healthcheck(self, summary, sbuf): ok = self.healthcheck() payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n" @@ -305,11 +348,12 @@ class RMRXapp(_BaseXapp): Parameters ---------- handler: function - a function with the signature (summary, sbuf) to be called when a message of type message_type is received - summary: dict - the rmr message summary - sbuf: ctypes c_void_p - Pointer to an rmr message buffer. The user must call free on this when done. + a function with the signature (summary, sbuf) to be called + when a message of type message_type is received + summary: dict + the rmr message summary + sbuf: ctypes c_void_p + Pointer to an rmr message buffer. The user must call free on this when done. message:type: int the message type to look for @@ -320,25 +364,30 @@ class RMRXapp(_BaseXapp): def run(self, thread=False): """ - This function should be called when the client xapp is ready to wait for their handlers to be called on received messages + This function should be called when the reactive Xapp is ready to start. + After start, the Xapp's handlers will be called on received messages. Parameters ---------- - thread: bool (optional) - if thread is True, execution is returned to caller and the queue read loop is executed in a thread. - The thread can be stopped using .stop() - if False, execution is not returned and the framework loops + thread: bool (optional, default is False) + If False, execution is not returned and the framework loops forever. + If True, a thread is started to run the queue read/dispatch loop + and execution is returned to caller; the thread can be stopped + by calling the .stop() method. """ def loop(): while self._keep_going: - if not self._rmr_loop.rcv_queue.empty(): - (summary, sbuf) = self._rmr_loop.rcv_queue.get() + try: + (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5) # dispatch - func = self._dispatch.get(summary["message type"], None) + func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None) if not func: func = self._default_handler func(self, summary, sbuf) + except queue.Empty: + # the get timed out + pass if thread: Thread(target=loop).start() @@ -347,27 +396,37 @@ class RMRXapp(_BaseXapp): def stop(self): """ - stops the rmr xapp completely. + Sets the flag to end the dispatch loop. """ super().stop() - self.logger.debug("Stopping queue reading thread..") + self.logger.debug("Setting flag to end framework work loop.") self._keep_going = False class Xapp(_BaseXapp): """ - Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop + Represents a generic Xapp where the client provides a function for the framework to call, + which usually contains a loop-forever construct. + + Parameters + ---------- + entrypoint: function + This function is called when the Xapp class's run method is invoked. + The function signature must be just function(self) + rmr_port: integer (optional, default is 4562) + Initialize RMR to listen on this port + rmr_wait_for_ready: boolean (optional, default is True) + Wait for RMR to signal ready before starting the dispatch loop + use_fake_sdl: boolean (optional, default is False) + Use an in-memory store instead of the real SDL service """ def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False): """ Parameters ---------- - entrypoint: function - this function is called when the xapp runs; this is the user code - it's signature should be function(self) - For the other parameters, see _BaseXapp + For the other parameters, see class _BaseXapp. """ # init base super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl) @@ -375,8 +434,9 @@ class Xapp(_BaseXapp): def run(self): """ - This function should be called when the client xapp is ready to start their code + This function should be called when the general Xapp is ready to start. """ self._entrypoint(self) - # there is no need for stop currently here (base has, and nothing special to do here) + # there is no need for stop currently here (base has, and nothing + # special to do here)