2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
5 # ==================================================================================
6 # Copyright (c) 2020 Nokia
7 # Copyright (c) 2020 AT&T Intellectual Property.
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ==================================================================================
21 from threading import Thread
22 from ricxappframe import xapp_rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
25 from mdclogpy import Logger
28 mdc_logger = Logger(name=__name__)
31 # Private base class; not for direct client use
36 Base xapp; not for client use directly
39 def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
48 rmr_wait_for_ready: bool (optional)
49 if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
50 this can be set to False if the client only wants to *receive only*
52 use_fake_sdl: bool (optional)
53 if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
54 Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
56 post_init: function (optional)
57 runs this user provided function after the base xapp is initialized
58 it's signature should be post_init(self)
61 # Start rmr rcv thread
62 self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
63 self._mrc = self._rmr_loop.mrc # for convenience
66 self._sdl = SDLWrapper(use_fake_sdl)
68 # run the optionally provided user post init
74 def rmr_get_messages(self):
76 returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
78 while not self._rmr_loop.rcv_queue.empty():
79 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
82 def rmr_send(self, payload, mtype, retries=100):
84 Allocates a buffer, sets payload and mtype, and sends
92 retries: int (optional)
93 Number of times to retry at the application level before excepting RMRFailure
98 whether or not the send worked after retries attempts
100 sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
102 for _ in range(retries):
103 sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
104 if sbuf.contents.state == 0:
111 def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
113 Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
115 This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
116 The client needs to free.
120 sbuf: ctypes c_void_p
121 Pointer to an rmr message buffer
122 new_payload: bytes (optional)
124 new_mtype: int (optional)
125 New message type (replaces the received message)
126 retries: int (optional)
127 Number of times to retry at the application level before excepting RMRFailure
132 whether or not the send worked after retries attempts
134 for _ in range(retries):
135 sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
136 if sbuf.contents.state == 0:
141 def rmr_free(self, sbuf):
143 Free an rmr message buffer after use
145 Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
148 sbuf: ctypes c_void_p
149 Pointer to an rmr message buffer
151 rmr.rmr_free_msg(sbuf)
154 # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
155 # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
157 def sdl_set(self, ns, key, value, usemsgpack=True):
168 if usemsgpack is True, value can be anything serializable by msgpack
169 if usemsgpack is False, value must be bytes
170 usemsgpack: boolean (optional)
171 determines whether the value is serialized using msgpack
173 self._sdl.set(ns, key, value, usemsgpack)
175 def sdl_get(self, ns, key, usemsgpack=True):
185 usemsgpack: boolean (optional)
186 if usemsgpack is True, the value is deserialized using msgpack
187 if usemsgpack is False, the value is returned as raw bytes
191 None (if not exist) or see above; depends on usemsgpack
193 return self._sdl.get(ns, key, usemsgpack)
195 def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
197 get all k v pairs that start with prefix
207 usemsgpack: boolean (optional)
208 if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
209 if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
213 {} (if no keys match) or see above; depends on usemsgpack
215 return self._sdl.find_and_get(ns, prefix, usemsgpack)
217 def sdl_delete(self, ns, key):
228 self._sdl.delete(ns, key)
232 def healthcheck(self):
234 this needs to be understood how this is supposed to work
236 return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
240 cleans up and stops the xapp rmr thread (currently)
241 This is critical for unit testing as pytest will never return if the thread is running.
243 TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
245 self._rmr_loop.stop()
248 # Public Classes to subclass (these subclass _BaseXapp)
251 class RMRXapp(_BaseXapp):
253 Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
254 When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
257 def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
261 default_handler: function
262 a function with the signature (summary, sbuf) to be called when a message of type message_type is received
264 the rmr message summary
265 sbuf: ctypes c_void_p
266 Pointer to an rmr message buffer. The user must call free on this when done.
268 post_init: function (optional)
269 optionally runs this function after the app initializes and before the run loop
270 it's signature should be post_init(self)
272 For the other parameters, see _BaseXapp
276 rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
280 self._default_handler = default_handler
283 # used for thread control
284 self._keep_going = True
286 def register_callback(self, handler, message_type):
288 registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
293 a function with the signature (summary, sbuf) to be called when a message of type message_type is received
295 the rmr message summary
296 sbuf: ctypes c_void_p
297 Pointer to an rmr message buffer. The user must call free on this when done.
300 the message type to look for
302 Note if this method is called multiple times for a single message type, the "last one wins".
304 self._dispatch[message_type] = handler
308 This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
310 execution is returned to caller
314 while self._keep_going:
315 if not self._rmr_loop.rcv_queue.empty():
316 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
318 func = self._dispatch.get(summary["message type"], None)
320 func = self._default_handler
321 func(self, summary, sbuf)
323 Thread(target=loop).start()
327 stops the rmr xapp completely.
330 mdc_logger.debug("Stopping queue reading thread..")
331 self._keep_going = False
334 class Xapp(_BaseXapp):
336 Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
339 def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
344 this function is called when the xapp runs; this is the user code
345 it's signature should be function(self)
347 For the other parameters, see _BaseXapp
350 super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
351 self._entrypoint = entrypoint
355 This function should be called when the client xapp is ready to start their code
357 self._entrypoint(self)
359 # there is no need for stop currently here (base has, and nothing special to do here)