-"""
-Framework for python xapps
-Framework here means Xapp classes that can be subclassed
-"""
# ==================================================================================
# Copyright (c) 2020 Nokia
# Copyright (c) 2020 AT&T Intellectual Property.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==================================================================================
+"""
+Framework for python xapps
+Framework here means Xapp classes that can be subclassed
+"""
-
+import queue
+from threading import Thread
from ricxappframe import xapp_rmr
+from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
-from rmr import rmr
from mdclogpy import Logger
-
-mdc_logger = Logger(name=__name__)
+# constants
+RIC_HEALTH_CHECK_REQ = 100
+RIC_HEALTH_CHECK_RESP = 101
# Private base class; not for direct client use
Base xapp; not for client use directly
"""
- def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
+ def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
"""
Init
port to listen on
rmr_wait_for_ready: bool (optional)
- if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
- this can be set to False if the client only wants to *receive only*
+
+ if this is True, then init waits until rmr is ready to send, which
+ includes having a valid routing file. This can be set to
+ False if the client only wants to *receive only*.
use_fake_sdl: bool (optional)
- if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
- Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
+ if this is True, it uses dbaas' "fake dict backend" instead
+ of Redis or other backends. Set this to true when developing
+ your xapp or during unit testing to completely avoid needing
+ a dbaas running or any network at all.
+
+ post_init: function (optional)
+ runs this user provided function after the base xapp is
+ initialized; its signature should be post_init(self)
"""
+ # PUBLIC, can be used by xapps using self.(name):
+ self.logger = Logger(name=__name__)
# Start rmr rcv thread
self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
self._sdl = SDLWrapper(use_fake_sdl)
# run the optionally provided user post init
- self.post_init()
-
- # Public methods to be implemented by the client
- def post_init(self):
- """
- this method can optionally be implemented by the client to run code immediately after the xall initialized (but before the xapp starts it's processing loop)
- the base method here does nothing (ie nothing is executed prior to starting if the client does not implement this)
- """
- pass
+ if post_init:
+ post_init(self)
# Public rmr methods
def rmr_get_messages(self):
"""
- returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
+ Returns a generator iterable over all items in the queue that
+ have not yet been read by the client xapp. Each item is a tuple
+ (S, sbuf) where S is a message summary dict and sbuf is the raw
+ message. The caller MUST call rmr.rmr_free_msg(sbuf) when
+ finished with each sbuf to prevent memory leaks!
"""
while not self._rmr_loop.rcv_queue.empty():
(summary, sbuf) = self._rmr_loop.rcv_queue.get()
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
"""
- Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
-
- This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
- The client needs to free.
+ Allows the xapp to return to sender, possibly adjusting the
+ payload and message type before doing so. This does NOT free
+ the sbuf for the caller as the caller may wish to perform
+ multiple rts per buffer. The client needs to free.
Parameters
----------
new_mtype: int (optional)
New message type (replaces the received message)
retries: int (optional)
- Number of times to retry at the application level before excepting RMRFailure
+ Number of times to retry at the application level before
+ throwing exception RMRFailure
Returns
-------
if sbuf.contents.state == 0:
return True
+ self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
return False
def rmr_free(self, sbuf):
"""
- Free an rmr message buffer after use
+ Frees an rmr message buffer after use
+
+ Note: this does not need to be a class method, self is not
+ used. However if we break it out as a function we need a home
+ for it.
- Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
Parameters
----------
sbuf: ctypes c_void_p
rmr.rmr_free_msg(sbuf)
# SDL
- # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
- # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
+ # NOTE, even though these are passthroughs, the seperate SDL wrapper
+ # is useful for other applications like A1. Therefore, we don't
+ # embed that SDLWrapper functionality here so that it can be
+ # instantiated on its own.
def sdl_set(self, ns, key, value, usemsgpack=True):
"""
def stop(self):
"""
- cleans up and stops the xapp.
- Currently this only stops the rmr thread
- This is critical for unit testing as pytest will never return if the thread is running.
+ cleans up and stops the xapp rmr thread (currently). This is
+ critical for unit testing as pytest will never return if the
+ thread is running.
- TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
+ TODO: can we register a ctrl-c handler so this gets called on
+ ctrl-c? Because currently two ctrl-c are needed to stop.
"""
self._rmr_loop.stop()
class RMRXapp(_BaseXapp):
"""
- Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
- When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
+ Represents an xapp that is purely driven by RMR messages; i.e., when
+ messages are received, the xapp does something. When run is called,
+ the xapp framework waits for rmr messages, and calls the
+ client-provided consume callback on every one.
"""
- def consume(self, summary, sbuf):
+ def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
"""
- This function is to be implemented by the client and is called whenever a new rmr message is received.
- It is expected to take two parameters (besides self):
+ Parameters
+ ----------
+ default_handler: function
+ a function with the signature (summary, sbuf) to be called
+ when a message of type message_type is received.
+ summary: dict
+ the rmr message summary
+ sbuf: ctypes c_void_p
+ Pointer to an rmr message buffer. The user must call free on
+ this when done.
+ post_init: function (optional)
+ optionally runs this function after the app initializes and
+ before the run loop; its signature should be post_init(self)
+
+ For the other parameters, see _BaseXapp
+ """
+ # init base
+ super().__init__(
+ rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
+ )
+
+ # setup callbacks
+ self._default_handler = default_handler
+ self._dispatch = {}
+
+ # used for thread control
+ self._keep_going = True
+
+ # register a default healthcheck handler
+ # this default checks that rmr is working and SDL is working
+ # the user can override this and register their own handler
+ # if they wish since the "last registered callback wins".
+ def handle_healthcheck(self, summary, sbuf):
+ ok = self.healthcheck()
+ payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
+ self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
+ self.rmr_free(sbuf)
+
+ self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
+
+ def register_callback(self, handler, message_type):
+ """
+ registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
Parameters
----------
+ handler: function
+ a function with the signature (summary, sbuf) to be called
+ when a message of type message_type is received
summary: dict
the rmr message summary
sbuf: ctypes c_void_p
Pointer to an rmr message buffer. The user must call free on this when done.
+
+ message:type: int
+ the message type to look for
+
+ Note if this method is called multiple times for a single message type, the "last one wins".
"""
- self.stop()
- raise NotImplementedError()
+ self._dispatch[message_type] = handler
- def run(self):
+ def run(self, thread=False):
"""
- This function should be called when the client xapp is ready to wait for consume to be called on received messages
+ This function should be called when the client xapp is ready to
+ wait for its handlers to be called on received messages.
- TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
- Running the below in a thread probably makes the most sense.
+ Parameters
+ ----------
+ thread: bool (optional)
+ If True, a thread is started to run the queue read/dispatch loop
+ and execution is returned to caller; the thread can be stopped
+ by calling .stop(). If False (the default), execution is not
+ returned and the framework loops forever.
+ """
+
+ def loop():
+ while self._keep_going:
+ try:
+ (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
+ # dispatch
+ func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
+ if not func:
+ func = self._default_handler
+ func(self, summary, sbuf)
+ except queue.Empty:
+ # the get timed out
+ pass
+
+ if thread:
+ Thread(target=loop).start()
+ else:
+ loop()
+
+ def stop(self):
"""
- while True:
- if not self._rmr_loop.rcv_queue.empty():
- (summary, sbuf) = self._rmr_loop.rcv_queue.get()
- self.consume(summary, sbuf)
+ Sets the flag to end the dispatch loop.
+ """
+ super().stop()
+ self.logger.debug("Setting flag to end framework work loop.")
+ self._keep_going = False
class Xapp(_BaseXapp):
"""
- Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
+ Represents an xapp where the client provides a generic function to
+ call, which is mostly likely a loop-forever loop.
"""
- def entrypoint(self):
+ def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
"""
- This function is to be implemented by the client and is called after post_init
+ Parameters
+ ----------
+ entrypoint: function
+ this function is called when the xapp runs; this is the user code.
+ its signature should be function(self)
+
+ For the other parameters, see _BaseXapp
"""
- self.stop()
- raise NotImplementedError()
+ # init base
+ super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
+ self._entrypoint = entrypoint
def run(self):
"""
- This function should be called when the client xapp is ready to start their code
+ This function should be called when the client xapp is ready to
+ start their code.
"""
- self.entrypoint()
+ self._entrypoint(self)
+
+ # there is no need for stop currently here (base has, and nothing
+ # special to do here)