e977e27cb5a7ef8e40cb9294d9b4483db616a1b7
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 """
2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
4 """
5 # ==================================================================================
6 #       Copyright (c) 2020 Nokia
7 #       Copyright (c) 2020 AT&T Intellectual Property.
8 #
9 #   Licensed under the Apache License, Version 2.0 (the "License");
10 #   you may not use this file except in compliance with the License.
11 #   You may obtain a copy of the License at
12 #
13 #          http://www.apache.org/licenses/LICENSE-2.0
14 #
15 #   Unless required by applicable law or agreed to in writing, software
16 #   distributed under the License is distributed on an "AS IS" BASIS,
17 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 #   See the License for the specific language governing permissions and
19 #   limitations under the License.
20 # ==================================================================================
21 from threading import Thread
22 from ricxappframe import xapp_rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from rmr import rmr
25 from mdclogpy import Logger
26
27
28 # Private base class; not for direct client use
29
30
31 class _BaseXapp:
32     """
33     Base xapp; not for client use directly
34     """
35
36     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
37         """
38         Init
39
40         Parameters
41         ----------
42         rmr_port: int
43             port to listen on
44
45         rmr_wait_for_ready: bool (optional)
46             if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
47             this can be set to False if the client only wants to *receive only*
48
49         use_fake_sdl: bool (optional)
50             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
51             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
52
53         post_init: function (optional)
54             runs this user provided function after the base xapp is initialized
55             it's signature should be post_init(self)
56         """
57         # PUBLIC, can be used by xapps using self.(name):
58         self.logger = Logger(name=__name__)
59
60         # Start rmr rcv thread
61         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
62         self._mrc = self._rmr_loop.mrc  # for convenience
63
64         # SDL
65         self._sdl = SDLWrapper(use_fake_sdl)
66
67         # run the optionally provided user post init
68         if post_init:
69             post_init(self)
70
71     # Public rmr methods
72
73     def rmr_get_messages(self):
74         """
75         returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
76         """
77         while not self._rmr_loop.rcv_queue.empty():
78             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
79             yield (summary, sbuf)
80
81     def rmr_send(self, payload, mtype, retries=100):
82         """
83         Allocates a buffer, sets payload and mtype, and sends
84
85         Parameters
86         ----------
87         payload: bytes
88             payload to set
89         mtype: int
90             message type
91         retries: int (optional)
92             Number of times to retry at the application level before excepting RMRFailure
93
94         Returns
95         -------
96         bool
97             whether or not the send worked after retries attempts
98         """
99         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
100
101         for _ in range(retries):
102             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
103             if sbuf.contents.state == 0:
104                 self.rmr_free(sbuf)
105                 return True
106
107         self.rmr_free(sbuf)
108         return False
109
110     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
111         """
112         Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
113
114         This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
115         The client needs to free.
116
117         Parameters
118         ----------
119         sbuf: ctypes c_void_p
120              Pointer to an rmr message buffer
121         new_payload: bytes (optional)
122             New payload to set
123         new_mtype: int (optional)
124             New message type (replaces the received message)
125         retries: int (optional)
126             Number of times to retry at the application level before excepting RMRFailure
127
128         Returns
129         -------
130         bool
131             whether or not the send worked after retries attempts
132         """
133         for _ in range(retries):
134             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
135             if sbuf.contents.state == 0:
136                 return True
137
138         return False
139
140     def rmr_free(self, sbuf):
141         """
142         Free an rmr message buffer after use
143
144         Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
145         Parameters
146         ----------
147         sbuf: ctypes c_void_p
148              Pointer to an rmr message buffer
149         """
150         rmr.rmr_free_msg(sbuf)
151
152     # SDL
153     # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
154     # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
155
156     def sdl_set(self, ns, key, value, usemsgpack=True):
157         """
158         set a key
159
160         Parameters
161         ----------
162         ns: string
163            the sdl namespace
164         key: string
165             the sdl key
166         value:
167             if usemsgpack is True, value can be anything serializable by msgpack
168             if usemsgpack is False, value must be bytes
169         usemsgpack: boolean (optional)
170             determines whether the value is serialized using msgpack
171         """
172         self._sdl.set(ns, key, value, usemsgpack)
173
174     def sdl_get(self, ns, key, usemsgpack=True):
175         """
176         get a key
177
178         Parameters
179         ----------
180         ns: string
181            the sdl namespace
182         key: string
183             the sdl key
184         usemsgpack: boolean (optional)
185             if usemsgpack is True, the value is deserialized using msgpack
186             if usemsgpack is False, the value is returned as raw bytes
187
188         Returns
189         -------
190         None (if not exist) or see above; depends on usemsgpack
191         """
192         return self._sdl.get(ns, key, usemsgpack)
193
194     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
195         """
196         get all k v pairs that start with prefix
197
198         Parameters
199         ----------
200         ns: string
201            the sdl namespace
202         key: string
203             the sdl key
204         prefix: string
205             the prefix
206         usemsgpack: boolean (optional)
207             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
208             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
209
210         Returns
211         -------
212         {} (if no keys match) or see above; depends on usemsgpack
213         """
214         return self._sdl.find_and_get(ns, prefix, usemsgpack)
215
216     def sdl_delete(self, ns, key):
217         """
218         delete a key
219
220         Parameters
221         ----------
222         ns: string
223            the sdl namespace
224         key: string
225             the sdl key
226         """
227         self._sdl.delete(ns, key)
228
229     # Health
230
231     def healthcheck(self):
232         """
233         this needs to be understood how this is supposed to work
234         """
235         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
236
237     def stop(self):
238         """
239         cleans up and stops the xapp rmr thread (currently)
240         This is critical for unit testing as pytest will never return if the thread is running.
241
242         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
243         """
244         self._rmr_loop.stop()
245
246
247 # Public Classes to subclass (these subclass _BaseXapp)
248
249
250 class RMRXapp(_BaseXapp):
251     """
252     Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
253     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
254     """
255
256     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
257         """
258         Parameters
259         ----------
260         default_handler: function
261             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
262             summary: dict
263                 the rmr message summary
264             sbuf: ctypes c_void_p
265                 Pointer to an rmr message buffer. The user must call free on this when done.
266
267         post_init: function (optional)
268             optionally runs this function after the app initializes and before the run loop
269             it's signature should be post_init(self)
270
271         For the other parameters, see _BaseXapp
272         """
273         # init base
274         super().__init__(
275             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
276         )
277
278         # setup callbacks
279         self._default_handler = default_handler
280         self._dispatch = {}
281
282         # used for thread control
283         self._keep_going = True
284
285     def register_callback(self, handler, message_type):
286         """
287         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
288
289         Parameters
290         ----------
291         handler: function
292             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
293             summary: dict
294                 the rmr message summary
295             sbuf: ctypes c_void_p
296                 Pointer to an rmr message buffer. The user must call free on this when done.
297
298         message:type: int
299             the message type to look for
300
301         Note if this method is called multiple times for a single message type, the "last one wins".
302         """
303         self._dispatch[message_type] = handler
304
305     def run(self, thread=False):
306         """
307         This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
308
309         Parameters
310         ----------
311         thread: bool (optional)
312             if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
313             The thread can be stopped using .stop()
314             if False, execution is not returned and the framework loops
315         """
316
317         def loop():
318             while self._keep_going:
319                 if not self._rmr_loop.rcv_queue.empty():
320                     (summary, sbuf) = self._rmr_loop.rcv_queue.get()
321                     # dispatch
322                     func = self._dispatch.get(summary["message type"], None)
323                     if not func:
324                         func = self._default_handler
325                     func(self, summary, sbuf)
326
327         if thread:
328             Thread(target=loop).start()
329         else:
330             loop()
331
332     def stop(self):
333         """
334         stops the rmr xapp completely.
335         """
336         super().stop()
337         self.logger.debug("Stopping queue reading thread..")
338         self._keep_going = False
339
340
341 class Xapp(_BaseXapp):
342     """
343     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
344     """
345
346     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
347         """
348         Parameters
349         ----------
350         entrypoint: function
351             this function is called when the xapp runs; this is the user code
352             it's signature should be function(self)
353
354         For the other parameters, see _BaseXapp
355         """
356         # init base
357         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
358         self._entrypoint = entrypoint
359
360     def run(self):
361         """
362         This function should be called when the client xapp is ready to start their code
363         """
364         self._entrypoint(self)
365
366     # there is no need for stop currently here (base has, and nothing special to do here)