1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 Framework for python xapps
19 Framework here means Xapp classes that can be subclassed
23 from threading import Thread
24 from ricxappframe import xapp_rmr
25 from ricxappframe.rmr import rmr
26 from ricxappframe.xapp_sdl import SDLWrapper
27 from mdclogpy import Logger
30 RIC_HEALTH_CHECK_REQ = 100
31 RIC_HEALTH_CHECK_RESP = 101
34 # Private base class; not for direct client use
39 Base xapp; not for client use directly
42 def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
51 rmr_wait_for_ready: bool (optional)
53 if this is True, then init waits until rmr is ready to send, which
54 includes having a valid routing file. This can be set to
55 False if the client only wants to *receive only*.
57 use_fake_sdl: bool (optional)
58 if this is True, it uses dbaas' "fake dict backend" instead
59 of Redis or other backends. Set this to true when developing
60 your xapp or during unit testing to completely avoid needing
61 a dbaas running or any network at all.
63 post_init: function (optional)
64 runs this user provided function after the base xapp is
65 initialized; its signature should be post_init(self)
67 # PUBLIC, can be used by xapps using self.(name):
68 self.logger = Logger(name=__name__)
70 # Start rmr rcv thread
71 self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
72 self._mrc = self._rmr_loop.mrc # for convenience
75 self._sdl = SDLWrapper(use_fake_sdl)
77 # run the optionally provided user post init
83 def rmr_get_messages(self):
85 Returns a generator iterable over all items in the queue that
86 have not yet been read by the client xapp. Each item is a tuple
87 (S, sbuf) where S is a message summary dict and sbuf is the raw
88 message. The caller MUST call rmr.rmr_free_msg(sbuf) when
89 finished with each sbuf to prevent memory leaks!
91 while not self._rmr_loop.rcv_queue.empty():
92 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
95 def rmr_send(self, payload, mtype, retries=100):
97 Allocates a buffer, sets payload and mtype, and sends
105 retries: int (optional)
106 Number of times to retry at the application level before excepting RMRFailure
111 whether or not the send worked after retries attempts
113 sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
115 for _ in range(retries):
116 sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
117 if sbuf.contents.state == 0:
124 def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
126 Allows the xapp to return to sender, possibly adjusting the
127 payload and message type before doing so. This does NOT free
128 the sbuf for the caller as the caller may wish to perform
129 multiple rts per buffer. The client needs to free.
133 sbuf: ctypes c_void_p
134 Pointer to an rmr message buffer
135 new_payload: bytes (optional)
137 new_mtype: int (optional)
138 New message type (replaces the received message)
139 retries: int (optional)
140 Number of times to retry at the application level before
141 throwing exception RMRFailure
146 whether or not the send worked after retries attempts
148 for _ in range(retries):
149 sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
150 if sbuf.contents.state == 0:
153 self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
156 def rmr_free(self, sbuf):
158 Frees an rmr message buffer after use
160 Note: this does not need to be a class method, self is not
161 used. However if we break it out as a function we need a home
166 sbuf: ctypes c_void_p
167 Pointer to an rmr message buffer
169 rmr.rmr_free_msg(sbuf)
172 # NOTE, even though these are passthroughs, the seperate SDL wrapper
173 # is useful for other applications like A1. Therefore, we don't
174 # embed that SDLWrapper functionality here so that it can be
175 # instantiated on its own.
177 def sdl_set(self, ns, key, value, usemsgpack=True):
188 if usemsgpack is True, value can be anything serializable by msgpack
189 if usemsgpack is False, value must be bytes
190 usemsgpack: boolean (optional)
191 determines whether the value is serialized using msgpack
193 self._sdl.set(ns, key, value, usemsgpack)
195 def sdl_get(self, ns, key, usemsgpack=True):
205 usemsgpack: boolean (optional)
206 if usemsgpack is True, the value is deserialized using msgpack
207 if usemsgpack is False, the value is returned as raw bytes
211 None (if not exist) or see above; depends on usemsgpack
213 return self._sdl.get(ns, key, usemsgpack)
215 def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
217 get all k v pairs that start with prefix
227 usemsgpack: boolean (optional)
228 if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
229 if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
233 {} (if no keys match) or see above; depends on usemsgpack
235 return self._sdl.find_and_get(ns, prefix, usemsgpack)
237 def sdl_delete(self, ns, key):
248 self._sdl.delete(ns, key)
252 def healthcheck(self):
254 this needs to be understood how this is supposed to work
256 return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
260 cleans up and stops the xapp rmr thread (currently). This is
261 critical for unit testing as pytest will never return if the
264 TODO: can we register a ctrl-c handler so this gets called on
265 ctrl-c? Because currently two ctrl-c are needed to stop.
267 self._rmr_loop.stop()
270 # Public Classes to subclass (these subclass _BaseXapp)
273 class RMRXapp(_BaseXapp):
275 Represents an xapp that is purely driven by RMR messages; i.e., when
276 messages are received, the xapp does something. When run is called,
277 the xapp framework waits for rmr messages, and calls the
278 client-provided consume callback on every one.
281 def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
285 default_handler: function
286 a function with the signature (summary, sbuf) to be called
287 when a message of type message_type is received.
289 the rmr message summary
290 sbuf: ctypes c_void_p
291 Pointer to an rmr message buffer. The user must call free on
293 post_init: function (optional)
294 optionally runs this function after the app initializes and
295 before the run loop; its signature should be post_init(self)
297 For the other parameters, see _BaseXapp
301 rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
305 self._default_handler = default_handler
308 # used for thread control
309 self._keep_going = True
311 # register a default healthcheck handler
312 # this default checks that rmr is working and SDL is working
313 # the user can override this and register their own handler
314 # if they wish since the "last registered callback wins".
315 def handle_healthcheck(self, summary, sbuf):
316 ok = self.healthcheck()
317 payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
318 self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
321 self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
323 def register_callback(self, handler, message_type):
325 registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
330 a function with the signature (summary, sbuf) to be called
331 when a message of type message_type is received
333 the rmr message summary
334 sbuf: ctypes c_void_p
335 Pointer to an rmr message buffer. The user must call free on this when done.
338 the message type to look for
340 Note if this method is called multiple times for a single message type, the "last one wins".
342 self._dispatch[message_type] = handler
344 def run(self, thread=False):
346 This function should be called when the client xapp is ready to
347 wait for its handlers to be called on received messages.
351 thread: bool (optional)
352 If True, a thread is started to run the queue read/dispatch loop
353 and execution is returned to caller; the thread can be stopped
354 by calling .stop(). If False (the default), execution is not
355 returned and the framework loops forever.
359 while self._keep_going:
361 (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=5)
363 func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
365 func = self._default_handler
366 func(self, summary, sbuf)
372 Thread(target=loop).start()
378 Sets the flag to end the dispatch loop.
381 self.logger.debug("Setting flag to end framework work loop.")
382 self._keep_going = False
385 class Xapp(_BaseXapp):
387 Represents an xapp where the client provides a generic function to
388 call, which is mostly likely a loop-forever loop.
391 def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
396 this function is called when the xapp runs; this is the user code.
397 its signature should be function(self)
399 For the other parameters, see _BaseXapp
402 super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
403 self._entrypoint = entrypoint
407 This function should be called when the client xapp is ready to
410 self._entrypoint(self)
412 # there is no need for stop currently here (base has, and nothing
413 # special to do here)