6b70bb6a944e38f5f9e16c657597ee8b2ea7c75e
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 """
2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
4 """
5 # ==================================================================================
6 #       Copyright (c) 2020 Nokia
7 #       Copyright (c) 2020 AT&T Intellectual Property.
8 #
9 #   Licensed under the Apache License, Version 2.0 (the "License");
10 #   you may not use this file except in compliance with the License.
11 #   You may obtain a copy of the License at
12 #
13 #          http://www.apache.org/licenses/LICENSE-2.0
14 #
15 #   Unless required by applicable law or agreed to in writing, software
16 #   distributed under the License is distributed on an "AS IS" BASIS,
17 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 #   See the License for the specific language governing permissions and
19 #   limitations under the License.
20 # ==================================================================================
21 from threading import Thread
22 from ricxappframe import xapp_rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from rmr import rmr
25 from mdclogpy import Logger
26
27
28 mdc_logger = Logger(name=__name__)
29
30
31 # Private base class; not for direct client use
32
33
34 class _BaseXapp:
35     """
36     Base xapp; not for client use directly
37     """
38
39     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
40         """
41         Init
42
43         Parameters
44         ----------
45         rmr_port: int
46             port to listen on
47
48         rmr_wait_for_ready: bool (optional)
49             if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
50             this can be set to False if the client only wants to *receive only*
51
52         use_fake_sdl: bool (optional)
53             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
54             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
55
56         post_init: function (optional)
57             runs this user provided function after the base xapp is initialized
58             it's signature should be post_init(self)
59         """
60
61         # Start rmr rcv thread
62         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
63         self._mrc = self._rmr_loop.mrc  # for convenience
64
65         # SDL
66         self._sdl = SDLWrapper(use_fake_sdl)
67
68         # run the optionally provided user post init
69         if post_init:
70             post_init(self)
71
72     # Public rmr methods
73
74     def rmr_get_messages(self):
75         """
76         returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
77         """
78         while not self._rmr_loop.rcv_queue.empty():
79             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
80             yield (summary, sbuf)
81
82     def rmr_send(self, payload, mtype, retries=100):
83         """
84         Allocates a buffer, sets payload and mtype, and sends
85
86         Parameters
87         ----------
88         payload: bytes
89             payload to set
90         mtype: int
91             message type
92         retries: int (optional)
93             Number of times to retry at the application level before excepting RMRFailure
94
95         Returns
96         -------
97         bool
98             whether or not the send worked after retries attempts
99         """
100         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
101
102         for _ in range(retries):
103             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
104             if sbuf.contents.state == 0:
105                 self.rmr_free(sbuf)
106                 return True
107
108         self.rmr_free(sbuf)
109         return False
110
111     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
112         """
113         Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
114
115         This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
116         The client needs to free.
117
118         Parameters
119         ----------
120         sbuf: ctypes c_void_p
121              Pointer to an rmr message buffer
122         new_payload: bytes (optional)
123             New payload to set
124         new_mtype: int (optional)
125             New message type (replaces the received message)
126         retries: int (optional)
127             Number of times to retry at the application level before excepting RMRFailure
128
129         Returns
130         -------
131         bool
132             whether or not the send worked after retries attempts
133         """
134         for _ in range(retries):
135             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
136             if sbuf.contents.state == 0:
137                 return True
138
139         return False
140
141     def rmr_free(self, sbuf):
142         """
143         Free an rmr message buffer after use
144
145         Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
146         Parameters
147         ----------
148         sbuf: ctypes c_void_p
149              Pointer to an rmr message buffer
150         """
151         rmr.rmr_free_msg(sbuf)
152
153     # SDL
154     # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
155     # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
156
157     def sdl_set(self, ns, key, value, usemsgpack=True):
158         """
159         set a key
160
161         Parameters
162         ----------
163         ns: string
164            the sdl namespace
165         key: string
166             the sdl key
167         value:
168             if usemsgpack is True, value can be anything serializable by msgpack
169             if usemsgpack is False, value must be bytes
170         usemsgpack: boolean (optional)
171             determines whether the value is serialized using msgpack
172         """
173         self._sdl.set(ns, key, value, usemsgpack)
174
175     def sdl_get(self, ns, key, usemsgpack=True):
176         """
177         get a key
178
179         Parameters
180         ----------
181         ns: string
182            the sdl namespace
183         key: string
184             the sdl key
185         usemsgpack: boolean (optional)
186             if usemsgpack is True, the value is deserialized using msgpack
187             if usemsgpack is False, the value is returned as raw bytes
188
189         Returns
190         -------
191         None (if not exist) or see above; depends on usemsgpack
192         """
193         return self._sdl.get(ns, key, usemsgpack)
194
195     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
196         """
197         get all k v pairs that start with prefix
198
199         Parameters
200         ----------
201         ns: string
202            the sdl namespace
203         key: string
204             the sdl key
205         prefix: string
206             the prefix
207         usemsgpack: boolean (optional)
208             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
209             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
210
211         Returns
212         -------
213         {} (if no keys match) or see above; depends on usemsgpack
214         """
215         return self._sdl.find_and_get(ns, prefix, usemsgpack)
216
217     def sdl_delete(self, ns, key):
218         """
219         delete a key
220
221         Parameters
222         ----------
223         ns: string
224            the sdl namespace
225         key: string
226             the sdl key
227         """
228         self._sdl.delete(ns, key)
229
230     # Health
231
232     def healthcheck(self):
233         """
234         this needs to be understood how this is supposed to work
235         """
236         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
237
238     def stop(self):
239         """
240         cleans up and stops the xapp rmr thread (currently)
241         This is critical for unit testing as pytest will never return if the thread is running.
242
243         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
244         """
245         self._rmr_loop.stop()
246
247
248 # Public Classes to subclass (these subclass _BaseXapp)
249
250
251 class RMRXapp(_BaseXapp):
252     """
253     Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
254     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
255     """
256
257     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
258         """
259         Parameters
260         ----------
261         default_handler: function
262             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
263             summary: dict
264                 the rmr message summary
265             sbuf: ctypes c_void_p
266                 Pointer to an rmr message buffer. The user must call free on this when done.
267
268         post_init: function (optional)
269             optionally runs this function after the app initializes and before the run loop
270             it's signature should be post_init(self)
271
272         For the other parameters, see _BaseXapp
273         """
274         # init base
275         super().__init__(
276             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
277         )
278
279         # setup callbacks
280         self._default_handler = default_handler
281         self._dispatch = {}
282
283         # used for thread control
284         self._keep_going = True
285
286     def register_callback(self, handler, message_type):
287         """
288         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
289
290         Parameters
291         ----------
292         handler: function
293             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
294             summary: dict
295                 the rmr message summary
296             sbuf: ctypes c_void_p
297                 Pointer to an rmr message buffer. The user must call free on this when done.
298
299         message:type: int
300             the message type to look for
301
302         Note if this method is called multiple times for a single message type, the "last one wins".
303         """
304         self._dispatch[message_type] = handler
305
306     def run(self):
307         """
308         This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
309
310         execution is returned to caller
311         """
312
313         def loop():
314             while self._keep_going:
315                 if not self._rmr_loop.rcv_queue.empty():
316                     (summary, sbuf) = self._rmr_loop.rcv_queue.get()
317                     # dispatch
318                     func = self._dispatch.get(summary["message type"], None)
319                     if not func:
320                         func = self._default_handler
321                     func(self, summary, sbuf)
322
323         Thread(target=loop).start()
324
325     def stop(self):
326         """
327         stops the rmr xapp completely.
328         """
329         super().stop()
330         mdc_logger.debug("Stopping queue reading thread..")
331         self._keep_going = False
332
333
334 class Xapp(_BaseXapp):
335     """
336     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
337     """
338
339     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
340         """
341         Parameters
342         ----------
343         entrypoint: function
344             this function is called when the xapp runs; this is the user code
345             it's signature should be function(self)
346
347         For the other parameters, see _BaseXapp
348         """
349         # init base
350         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
351         self._entrypoint = entrypoint
352
353     def run(self):
354         """
355         This function should be called when the client xapp is ready to start their code
356         """
357         self._entrypoint(self)
358
359     # there is no need for stop currently here (base has, and nothing special to do here)