"""
Framework for python xapps.
"Framework" here means Xapp classes that can be subclassed.
"""
5 # ==================================================================================
6 # Copyright (c) 2020 Nokia
7 # Copyright (c) 2020 AT&T Intellectual Property.
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ==================================================================================
21 from threading import Thread
22 from ricxappframe import xapp_rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from ricxappframe.rmr import rmr
25 from mdclogpy import Logger
# rmr message types used by the default healthcheck handler:
# a message of type REQ is answered (return-to-sender) with a message of type RESP
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101
32 # Private base class; not for direct client use
37 Base xapp; not for client use directly
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
    """
    Init

    Parameters
    ----------
    rmr_port: int (optional)
        port handed to the rmr receive loop (default 4562)

    rmr_wait_for_ready: bool (optional)
        if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
        this can be set to False if the client only wants to *receive only*

    use_fake_sdl: bool (optional)
        if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
        Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas
        running or any network at all

    post_init: function (optional)
        runs this user provided function after the base xapp is initialized
        its signature should be post_init(self)
    """
    # PUBLIC, can be used by xapps using self.(name):
    self.logger = Logger(name=__name__)

    # Start rmr rcv thread
    self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
    self._mrc = self._rmr_loop.mrc  # for convenience

    # SDL
    self._sdl = SDLWrapper(use_fake_sdl)

    # run the optionally provided user post init
    if post_init:
        post_init(self)
def rmr_get_messages(self):
    """
    Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
    Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
    The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!

    Iteration ends as soon as the receive queue is observed to be empty.
    """
    rcv_queue = self._rmr_loop.rcv_queue
    while not rcv_queue.empty():
        (summary, sbuf) = rcv_queue.get()
        yield (summary, sbuf)
def rmr_send(self, payload, mtype, retries=100):
    """
    Allocates a buffer, sets payload and mtype, and sends

    Parameters
    ----------
    payload: bytes
        payload to set; its length is used as the size of the allocated buffer
    mtype: int
        message type to set
    retries: int (optional)
        Number of send attempts made at the application level before giving up

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)

    sent = False
    for _ in range(retries):
        # each send attempt hands back the buffer to use from then on
        sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
        if sbuf.contents.state == 0:
            sent = True
            break

    # the buffer was allocated here, so it is released here on success and on failure alike
    self.rmr_free(sbuf)
    return sent
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
    """
    Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so

    This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
    The client needs to free.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    new_payload: bytes (optional)
        New payload to set; forwarded to rmr_rts_msg as ``payload``
    new_mtype: int (optional)
        New message type (replaces the received message)
    retries: int (optional)
        Number of rts attempts made at the application level before giving up

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    for _ in range(retries):
        sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
        if sbuf.contents.state == 0:
            return True

    # every attempt failed: record the summary of the last buffer, then report failure to the caller
    self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
    return False
def rmr_free(self, sbuf):
    """
    Free an rmr message buffer after use

    Note: this does not need to be a class method, self is not used.
    However if we break it out as a function we need a home for it.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    rmr.rmr_free_msg(sbuf)
# NOTE: even though these are passthroughs, the separate SDL wrapper is useful for other applications like A1.
# Therefore, we don't embed that SDLWrapper functionality here, so that it can be instantiated on its own.
def sdl_set(self, ns, key, value, usemsgpack=True):
    """
    Stores a value under a key; passthrough to the SDL wrapper's set

    Parameters
    ----------
    ns: string
        the sdl namespace
    key: string
        the sdl key
    value:
        if usemsgpack is True, value can be anything serializable by msgpack
        if usemsgpack is False, value must be bytes
    usemsgpack: boolean (optional)
        determines whether the value is serialized using msgpack
    """
    self._sdl.set(ns, key, value, usemsgpack)
def sdl_get(self, ns, key, usemsgpack=True):
    """
    Gets the value stored under a key; passthrough to the SDL wrapper's get

    Parameters
    ----------
    ns: string
        the sdl namespace
    key: string
        the sdl key
    usemsgpack: boolean (optional)
        if usemsgpack is True, the value is deserialized using msgpack
        if usemsgpack is False, the value is returned as raw bytes

    Returns
    -------
    None (if not exist) or see above; depends on usemsgpack
    """
    return self._sdl.get(ns, key, usemsgpack)
def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
    """
    get all k v pairs that start with prefix; passthrough to the SDL wrapper's find_and_get

    Parameters
    ----------
    ns: string
        the sdl namespace
    prefix: string
        the key prefix to match
    usemsgpack: boolean (optional)
        if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
        if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes

    Returns
    -------
    {} (if no keys match) or see above; depends on usemsgpack
    """
    return self._sdl.find_and_get(ns, prefix, usemsgpack)
def sdl_delete(self, ns, key):
    """
    delete a key; passthrough to the SDL wrapper's delete

    Parameters
    ----------
    ns: string
        the sdl namespace
    key: string
        the sdl key
    """
    self._sdl.delete(ns, key)
def healthcheck(self):
    """
    Reports healthy only when both the rmr loop and the SDL wrapper report healthy.
    The SDL check is skipped when the rmr check already failed.

    NOTE: this needs to be understood how this is supposed to work
    """
    return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
def stop(self):
    """
    cleans up and stops the xapp rmr thread (currently)
    This is critical for unit testing as pytest will never return if the thread is running.

    TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
    """
    self._rmr_loop.stop()
254 # Public Classes to subclass (these subclass _BaseXapp)
class RMRXapp(_BaseXapp):
    """
    Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something).
    When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one.
    """

    # seconds a blocking read of the receive queue waits before the stop flag is checked again
    _RCV_POLL_TIMEOUT = 1

    def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Parameters
        ----------
        default_handler: function
            called as default_handler(xapp, summary, sbuf) for every received message whose
            message type has no registered callback
            xapp: RMRXapp
                this xapp instance
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.

        post_init: function (optional)
            optionally runs this function after the app initializes and before the run loop
            its signature should be post_init(self)

        For the other parameters, see _BaseXapp
        """
        # init base; post_init is deliberately NOT forwarded, because the base would invoke it
        # before the dispatch table below exists and register_callback would fail inside it
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)

        # setup callbacks
        self._default_handler = default_handler
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        # register a default healthcheck handler
        # this default checks that rmr is working and SDL is working
        # the user can override this and register their own handler if they wish since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            ok = self.healthcheck()
            payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
            # rts does not free the buffer; the handler owns it and must release it
            self.rmr_free(sbuf)

        self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)

        # run the optionally provided user post init now that the xapp is fully initialized
        if post_init:
            post_init(self)

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(xapp, summary, sbuf) when an rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            called as handler(xapp, summary, sbuf) when a message of type message_type is received
            xapp: RMRXapp
                this xapp instance
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.

        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False):
        """
        This function should be called when the client xapp is ready to wait for their handlers to be called on received messages

        Parameters
        ----------
        thread: bool (optional)
            if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
            The thread can be stopped using .stop()
            if False, execution is not returned and the framework loops
        """
        # local import: only needed for the Empty exception raised by a timed get
        import queue

        def loop():
            while self._keep_going:
                try:
                    # block instead of spinning on empty(); the timeout bounds how long stop() takes to be noticed
                    # NOTE(review): assumes rcv_queue follows the stdlib queue.Queue interface - confirm
                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=self._RCV_POLL_TIMEOUT)
                except queue.Empty:
                    continue

                # dispatch; fall back to the default handler when no callback is registered for this type
                func = self._dispatch.get(summary["message type"], None)
                if not func:
                    func = self._default_handler
                func(self, summary, sbuf)

        if thread:
            Thread(target=loop).start()
        else:
            loop()

    def stop(self):
        """
        stops the rmr xapp completely.
        """
        super().stop()
        self.logger.debug("Stopping queue reading thread..")
        self._keep_going = False
class Xapp(_BaseXapp):
    """
    Represents an xapp where the client provides a generic function to call, which is most likely a loop-forever loop
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Parameters
        ----------
        entrypoint: function
            this function is called when the xapp runs; this is the user code
            its signature should be function(self)

        For the other parameters, see _BaseXapp
        """
        # init base
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the client xapp is ready to start their code
        """
        # the user code receives this xapp instance as its only argument
        self._entrypoint(self)

    # there is no need for stop currently here (base has, and nothing special to do here)