2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
5 # ==================================================================================
6 # Copyright (c) 2020 Nokia
7 # Copyright (c) 2020 AT&T Intellectual Property.
9 # Licensed under the Apache License, Version 2.0 (the "License");
10 # you may not use this file except in compliance with the License.
11 # You may obtain a copy of the License at
13 # http://www.apache.org/licenses/LICENSE-2.0
15 # Unless required by applicable law or agreed to in writing, software
16 # distributed under the License is distributed on an "AS IS" BASIS,
17 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 # See the License for the specific language governing permissions and
19 # limitations under the License.
20 # ==================================================================================
23 from ricxappframe import xapp_rmr
24 from ricxappframe.xapp_sdl import SDLWrapper
26 from mdclogpy import Logger
29 mdc_logger = Logger(name=__name__)
32 # Private base class; not for direct client use
37 Base xapp; not for client use directly
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
    """
    Create the rmr receive loop and the SDL wrapper used by the xapp.

    Parameters
    ----------
    rmr_port: int (optional)
        port handed to ``xapp_rmr.RmrLoop``; defaults to 4562

    rmr_wait_for_ready: bool (optional)
        if True, init waits until rmr is ready to send, which includes
        having a valid routing file. It can be set to False when the
        client only wants to receive.

    use_fake_sdl: bool (optional)
        if True, dbaas' "fake dict backend" is used instead of Redis or
        other backends. Set this while developing an xapp or during unit
        testing to avoid needing a running dbaas or any network at all.
    """
    # The rmr loop object owns the receive side; keep a handle on it.
    rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
    self._rmr_loop = rmr_loop

    # Shortcut to the loop's rmr context, used by the send/rts helpers.
    self._mrc = rmr_loop.mrc

    # SDL access goes through the shared wrapper.
    self._sdl = SDLWrapper(use_fake_sdl)
def rmr_get_messages(self):
    """
    Return a generator over all messages currently in the receive queue
    that have not yet been read by the client xapp.

    Each item is removed from ``self._rmr_loop.rcv_queue`` as it is
    produced. Iteration ends as soon as the queue reports empty.

    Yields
    ------
    tuple
        (summary, sbuf) pairs exactly as they were placed on the queue
    """
    while not self._rmr_loop.rcv_queue.empty():
        # Unpack so that a malformed queue entry fails loudly here.
        summary, sbuf = self._rmr_loop.rcv_queue.get()
        yield summary, sbuf
def rmr_send(self, payload, mtype, retries=100):
    """
    Allocates a buffer, sets payload and mtype, and sends.

    The buffer is allocated inside this call and is never handed to the
    caller, so it is always released before returning.

    Parameters
    ----------
    payload: bytes
        payload to set; its length is used as the buffer size
    mtype: int
        message type to set on the buffer
    retries: int (optional)
        Number of send attempts made at the application level before
        giving up

    Returns
    -------
    bool
        whether or not the send worked within ``retries`` attempts
    """
    sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)

    try:
        for _ in range(retries):
            # rmr_send_msg hands back a buffer; keep tracking the latest one.
            sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
            if sbuf.contents.state == 0:  # state 0 is treated as success
                return True
        return False
    finally:
        # Release the buffer on success, on exhaustion and on error alike.
        self.rmr_free(sbuf)
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
    """
    Allows the xapp to return to sender, possibly adjusting the payload
    and message type before doing so.

    This does NOT free the sbuf for the caller, as the caller may wish to
    perform multiple rts per buffer. The client needs to free.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    new_payload: bytes (optional)
        Payload forwarded to ``rmr_rts_msg``; None is passed through as-is
    new_mtype: int (optional)
        New message type (replaces the received message)
    retries: int (optional)
        Number of rts attempts made at the application level before
        giving up

    Returns
    -------
    bool
        whether or not the send worked within ``retries`` attempts
    """
    for _ in range(retries):
        sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
        if sbuf.contents.state == 0:  # state 0 is treated as success
            return True

    return False
def rmr_free(self, sbuf):
    """
    Release an rmr message buffer once the caller is done with it.

    ``self`` is not used; the function lives on the class only so that
    it has a home next to the other rmr helpers.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    rmr.rmr_free_msg(sbuf)
145 # NOTE, even though these are passthroughs, the separate SDL wrapper is useful for other applications like A1.
146 # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on its own.
def sdl_set(self, ns, key, value, usemsgpack=True):
    """
    Store a value through the SDL wrapper (pure passthrough).

    Parameters
    ----------
    ns:
        namespace, forwarded unchanged to the SDL wrapper
    key:
        key, forwarded unchanged to the SDL wrapper
    value:
        if usemsgpack is True, value can be anything serializable by msgpack
        if usemsgpack is False, value must be bytes
    usemsgpack: boolean (optional)
        determines whether the value is serialized using msgpack
    """
    store = self._sdl
    store.set(ns, key, value, usemsgpack)
def sdl_get(self, ns, key, usemsgpack=True):
    """
    Fetch a value through the SDL wrapper (pure passthrough).

    Parameters
    ----------
    ns:
        namespace, forwarded unchanged to the SDL wrapper
    key:
        key, forwarded unchanged to the SDL wrapper
    usemsgpack: boolean (optional)
        if usemsgpack is True, the value is deserialized using msgpack
        if usemsgpack is False, the value is returned as raw bytes

    Returns
    -------
    None (if not exist) or see above; depends on usemsgpack
    """
    store = self._sdl
    return store.get(ns, key, usemsgpack)
def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
    """
    Get all key/value pairs whose key starts with prefix (passthrough to
    the SDL wrapper).

    Parameters
    ----------
    ns:
        namespace, forwarded unchanged to the SDL wrapper
    prefix:
        key prefix, forwarded unchanged to the SDL wrapper
    usemsgpack: boolean (optional)
        if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
        if usemsgpack is False, the value returned is a dict mapping keys to raw bytes

    Returns
    -------
    {} (if no keys match) or see above; depends on usemsgpack
    """
    store = self._sdl
    return store.find_and_get(ns, prefix, usemsgpack)
def sdl_delete(self, ns, key):
    """
    Delete a key through the SDL wrapper (pure passthrough).

    Parameters
    ----------
    ns:
        namespace, forwarded unchanged to the SDL wrapper
    key:
        key, forwarded unchanged to the SDL wrapper
    """
    store = self._sdl
    store.delete(ns, key)
def healthcheck(self):
    """
    Combine the health of the rmr loop and of the SDL wrapper.

    The SDL check is only consulted when the rmr loop check is truthy;
    otherwise the rmr loop's own result is returned.

    NOTE: how this is supposed to be used still needs to be understood.
    """
    rmr_status = self._rmr_loop.healthcheck()
    if not rmr_status:
        return rmr_status
    return self._sdl.healthcheck()
def stop(self):
    """
    Cleans up and stops the xapp.

    Currently this only stops the rmr thread. This is critical for unit
    testing, as pytest will never return if the thread is running.

    TODO: can we register a ctrl-c handler so this gets called on ctrl-c?
    Because currently two ctrl-c are needed to stop.
    """
    self._rmr_loop.stop()
240 # Public Classes to subclass (these subclass _BaseXapp)
class RMRXapp(_BaseXapp):
    """
    Represents an xapp that is purely driven by rmr messages (i.e., when
    messages are received, the xapp does something).

    When run is called, the xapp framework waits for rmr messages and
    calls the client provided consume callback on every one.
    """

    def consume(self, summary, sbuf):
        """
        This function is to be implemented by the client and is called
        whenever a new rmr message is received.

        Parameters
        ----------
        summary:
            the rmr message summary
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer. The user must call free on
            this when done.

        Raises
        ------
        NotImplementedError
            always, unless a subclass overrides this method
        """
        raise NotImplementedError()

    def run(self):
        """
        This function should be called when the client xapp is ready to
        wait for consume to be called on received messages.

        It does not return control to the caller.

        TODO: should we run this in a thread too? We can't currently call
        "stop" on rmr xapps at an arbitrary time because this doesn't
        return control. Running the below in a thread probably makes the
        most sense.
        """
        # NOTE(review): the queue is polled without blocking or sleeping
        # between checks - confirm that this is intended.
        while True:
            if not self._rmr_loop.rcv_queue.empty():
                (summary, sbuf) = self._rmr_loop.rcv_queue.get()
                self.consume(summary, sbuf)
276 class Xapp(_BaseXapp):
278 Represents an xapp where the client provides a generic function to call, which is most likely a loop-forever loop
283 This function is to be implemented by the client and is called
285 raise NotImplementedError()
289 This function should be called when the client xapp is ready to start their loop
290 This is simple and the client could just call self.loop(), however this gives a consistent interface as the other xapps