"""
Framework for python xapps
Framework here means Xapp classes that can be subclassed
"""
# ==================================================================================
#       Copyright (c) 2020 Nokia
#       Copyright (c) 2020 AT&T Intellectual Property.
#
#   Licensed under the Apache License, Version 2.0 (the "License");
#   you may not use this file except in compliance with the License.
#   You may obtain a copy of the License at
#
#          http://www.apache.org/licenses/LICENSE-2.0
#
#   Unless required by applicable law or agreed to in writing, software
#   distributed under the License is distributed on an "AS IS" BASIS,
#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#   See the License for the specific language governing permissions and
#   limitations under the License.
# ==================================================================================
import queue
from threading import Thread

from mdclogpy import Logger
from ricxappframe import xapp_rmr
from ricxappframe.xapp_sdl import SDLWrapper

# NOTE(review): this module also calls rmr.rmr_alloc_msg / rmr_send_msg / rmr_rts_msg /
# rmr_free_msg, but no import binding the name ``rmr`` is visible here -- confirm it is present.
# Private base class; not for direct client use
class _BaseXapp:
    """
    Base xapp; not for client use directly
    """

    def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Starts the rmr receive loop, creates the SDL wrapper and runs the optional post init hook

        Parameters
        ----------
        rmr_port: int (optional)
            port handed to the rmr loop

        rmr_wait_for_ready: bool (optional)
            if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
            this can be set to False if the client only wants to *receive only*

        use_fake_sdl: bool (optional)
            if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
            Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas
            running or any network at all

        post_init: function (optional)
            runs this user provided function after the base xapp is initialized
            its signature should be post_init(self)
        """
        # PUBLIC, can be used by xapps using self.(name):
        self.logger = Logger(name=__name__)

        # Start rmr rcv thread
        self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
        self._mrc = self._rmr_loop.mrc  # for convenience

        # SDL
        self._sdl = SDLWrapper(use_fake_sdl)

        # run the optionally provided user post init
        if post_init:
            post_init(self)

    # Public rmr methods

    def rmr_get_messages(self):
        """
        returns a generator iterable over all current messages in the queue that have not yet been read by the
        client xapp

        Each item is the (summary, sbuf) tuple taken from the rmr loop's receive queue.
        """
        while not self._rmr_loop.rcv_queue.empty():
            (summary, sbuf) = self._rmr_loop.rcv_queue.get()
            yield (summary, sbuf)

    def rmr_send(self, payload, mtype, retries=100):
        """
        Allocates a buffer, sets payload and mtype, and sends

        The buffer allocated here is always freed before returning, including when the send raises.

        Parameters
        ----------
        payload: bytes
            payload to set
        mtype: int
            message type
        retries: int (optional)
            Number of times to attempt the send at the application level before giving up

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        sbuf = rmr.rmr_alloc_msg(
            vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype
        )

        try:
            for _ in range(retries):
                sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
                # state 0 is treated as a successful send
                if sbuf.contents.state == 0:
                    return True
            return False
        finally:
            # this method owns the buffer it allocated, so release it on every exit path
            self.rmr_free(sbuf)

    def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
        """
        Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so

        This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
        The client needs to free.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        new_payload: bytes (optional)
            New payload to send back instead of the received one
        new_mtype: int (optional)
            New message type (replaces the received message)
        retries: int (optional)
            Number of times to attempt the send at the application level before giving up

        Returns
        -------
        bool
            whether or not the send worked after retries attempts
        """
        for _ in range(retries):
            sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
            # state 0 is treated as a successful send
            if sbuf.contents.state == 0:
                return True

        return False

    def rmr_free(self, sbuf):
        """
        Free an rmr message buffer after use

        Note: this does not need to be a class method, self is not used. However if we break it out as a function
        we need a home for it.

        Parameters
        ----------
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer
        """
        rmr.rmr_free_msg(sbuf)

    # SDL
    # NOTE, even though these are passthroughs, the separate SDL wrapper is useful for other applications like A1.
    # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on its own.

    def sdl_set(self, ns, key, value, usemsgpack=True):
        """
        set a key; passthrough to the SDL wrapper

        Parameters
        ----------
        ns:
            namespace argument forwarded to the SDL wrapper
        key:
            key to set
        value:
            if usemsgpack is True, value can be anything serializable by msgpack
            if usemsgpack is False, value must be bytes
        usemsgpack: boolean (optional)
            determines whether the value is serialized using msgpack
        """
        self._sdl.set(ns, key, value, usemsgpack)

    def sdl_get(self, ns, key, usemsgpack=True):
        """
        get a key; passthrough to the SDL wrapper

        Parameters
        ----------
        ns:
            namespace argument forwarded to the SDL wrapper
        key:
            key to get
        usemsgpack: boolean (optional)
            if usemsgpack is True, the value is deserialized using msgpack
            if usemsgpack is False, the value is returned as raw bytes

        Returns
        -------
        None (if not exist) or see above; depends on usemsgpack
        """
        return self._sdl.get(ns, key, usemsgpack)

    def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
        """
        get all k v pairs that start with prefix; passthrough to the SDL wrapper

        Parameters
        ----------
        ns:
            namespace argument forwarded to the SDL wrapper
        prefix:
            the key prefix to match
        usemsgpack: boolean (optional)
            if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
            if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes

        Returns
        -------
        {} (if no keys match) or see above; depends on usemsgpack
        """
        return self._sdl.find_and_get(ns, prefix, usemsgpack)

    def sdl_delete(self, ns, key):
        """
        delete a key; passthrough to the SDL wrapper

        Parameters
        ----------
        ns:
            namespace argument forwarded to the SDL wrapper
        key:
            key to delete
        """
        self._sdl.delete(ns, key)

    # Health

    def healthcheck(self):
        """
        Reports healthy only when both the rmr loop and the SDL wrapper report healthy

        TODO: this needs to be understood how this is supposed to work
        """
        return self._rmr_loop.healthcheck() and self._sdl.healthcheck()

    def stop(self):
        """
        cleans up and stops the xapp rmr thread (currently)
        This is critical for unit testing as pytest will never return if the thread is running.

        TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are
        needed to stop
        """
        self._rmr_loop.stop()
# Public Classes to subclass (these subclass _BaseXapp)
class RMRXapp(_BaseXapp):
    """
    Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does
    something)
    When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback
    on every one
    """

    # seconds a blocking read of the receive queue waits before the stop flag is re-checked
    _QUEUE_POLL_SECONDS = 1

    def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Parameters
        ----------
        default_handler: function
            a function with the signature (summary, sbuf) to be called when a message arrives whose type has no
            registered callback
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.

        post_init: function (optional)
            optionally runs this function after the app initializes and before the run loop
            its signature should be post_init(self)

        For the other parameters, see _BaseXapp
        """
        # Handler state is set up before the base init because the base init runs post_init,
        # and a post_init hook may want to call register_callback.
        self._default_handler = default_handler
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        super().__init__(
            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
        )

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            a function with the signature (summary, sbuf) to be called when a message of type message_type is
            received
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.

        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False):
        """
        This function should be called when the client xapp is ready to wait for their handlers to be called on
        received messages

        Parameters
        ----------
        thread: bool (optional)
            if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
            The thread can be stopped using .stop()
            if False, execution is not returned and the framework loops
        """

        def loop():
            while self._keep_going:
                # Block on the queue instead of spinning on empty(), so an idle xapp does not burn a core.
                # The timeout bounds how long stop() takes to be noticed.
                # NOTE(review): assumes rcv_queue follows the queue.Queue interface -- confirm.
                try:
                    (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=self._QUEUE_POLL_SECONDS)
                except queue.Empty:
                    continue

                # dispatch: fall back to the default handler when nothing is registered for this type
                func = self._dispatch.get(summary["message type"], None)
                if not func:
                    func = self._default_handler
                func(self, summary, sbuf)

        if thread:
            Thread(target=loop).start()
        else:
            loop()

    def stop(self):
        """
        stops the rmr xapp completely.
        """
        super().stop()
        self.logger.debug("Stopping queue reading thread..")
        self._keep_going = False
class Xapp(_BaseXapp):
    """
    Represents an xapp where the client provides a generic function to call, which is most likely a loop-forever
    loop
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Parameters
        ----------
        entrypoint: function
            this function is called when the xapp runs; this is the user code
            its signature should be function(self)

        For the other parameters, see _BaseXapp
        """
        super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the client xapp is ready to start their code
        """
        self._entrypoint(self)

    # there is no need for stop currently here (base has, and nothing special to do here)