Define message-summary dict keys as constants
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 """
18 Framework for python xapps
19 Framework here means Xapp classes that can be subclassed
20 """
21
22 from threading import Thread
23 from ricxappframe import xapp_rmr
24 from ricxappframe.rmr import rmr
25 from ricxappframe.xapp_sdl import SDLWrapper
26 from mdclogpy import Logger
27
28 # constants
29 RIC_HEALTH_CHECK_REQ = 100
30 RIC_HEALTH_CHECK_RESP = 101
31
32
33 # Private base class; not for direct client use
34
35
36 class _BaseXapp:
37     """
38     Base xapp; not for client use directly
39     """
40
41     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
42         """
43         Init
44
45         Parameters
46         ----------
47         rmr_port: int
48             port to listen on
49
50         rmr_wait_for_ready: bool (optional)
51             if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
52             this can be set to False if the client only wants to *receive only*
53
54         use_fake_sdl: bool (optional)
55             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
56             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
57
58         post_init: function (optional)
59             runs this user provided function after the base xapp is initialized
60             it's signature should be post_init(self)
61         """
62         # PUBLIC, can be used by xapps using self.(name):
63         self.logger = Logger(name=__name__)
64
65         # Start rmr rcv thread
66         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
67         self._mrc = self._rmr_loop.mrc  # for convenience
68
69         # SDL
70         self._sdl = SDLWrapper(use_fake_sdl)
71
72         # run the optionally provided user post init
73         if post_init:
74             post_init(self)
75
76     # Public rmr methods
77
78     def rmr_get_messages(self):
79         """
80         Returns a generator iterable over all items in the queue that have not yet been read by the client xapp.
81         Each item is a tuple (S, sbuf) where S is a message summary dict and sbuf is the raw message.
82         The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent memory leaks!
83         """
84         while not self._rmr_loop.rcv_queue.empty():
85             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
86             yield (summary, sbuf)
87
88     def rmr_send(self, payload, mtype, retries=100):
89         """
90         Allocates a buffer, sets payload and mtype, and sends
91
92         Parameters
93         ----------
94         payload: bytes
95             payload to set
96         mtype: int
97             message type
98         retries: int (optional)
99             Number of times to retry at the application level before excepting RMRFailure
100
101         Returns
102         -------
103         bool
104             whether or not the send worked after retries attempts
105         """
106         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
107
108         for _ in range(retries):
109             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
110             if sbuf.contents.state == 0:
111                 self.rmr_free(sbuf)
112                 return True
113
114         self.rmr_free(sbuf)
115         return False
116
117     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
118         """
119         Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
120
121         This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
122         The client needs to free.
123
124         Parameters
125         ----------
126         sbuf: ctypes c_void_p
127              Pointer to an rmr message buffer
128         new_payload: bytes (optional)
129             New payload to set
130         new_mtype: int (optional)
131             New message type (replaces the received message)
132         retries: int (optional)
133             Number of times to retry at the application level before excepting RMRFailure
134
135         Returns
136         -------
137         bool
138             whether or not the send worked after retries attempts
139         """
140         for _ in range(retries):
141             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
142             if sbuf.contents.state == 0:
143                 return True
144
145         self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
146         return False
147
148     def rmr_free(self, sbuf):
149         """
150         Free an rmr message buffer after use
151
152         Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
153         Parameters
154         ----------
155         sbuf: ctypes c_void_p
156              Pointer to an rmr message buffer
157         """
158         rmr.rmr_free_msg(sbuf)
159
160     # SDL
161     # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
162     # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
163
164     def sdl_set(self, ns, key, value, usemsgpack=True):
165         """
166         set a key
167
168         Parameters
169         ----------
170         ns: string
171            the sdl namespace
172         key: string
173             the sdl key
174         value:
175             if usemsgpack is True, value can be anything serializable by msgpack
176             if usemsgpack is False, value must be bytes
177         usemsgpack: boolean (optional)
178             determines whether the value is serialized using msgpack
179         """
180         self._sdl.set(ns, key, value, usemsgpack)
181
182     def sdl_get(self, ns, key, usemsgpack=True):
183         """
184         get a key
185
186         Parameters
187         ----------
188         ns: string
189            the sdl namespace
190         key: string
191             the sdl key
192         usemsgpack: boolean (optional)
193             if usemsgpack is True, the value is deserialized using msgpack
194             if usemsgpack is False, the value is returned as raw bytes
195
196         Returns
197         -------
198         None (if not exist) or see above; depends on usemsgpack
199         """
200         return self._sdl.get(ns, key, usemsgpack)
201
202     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
203         """
204         get all k v pairs that start with prefix
205
206         Parameters
207         ----------
208         ns: string
209            the sdl namespace
210         key: string
211             the sdl key
212         prefix: string
213             the prefix
214         usemsgpack: boolean (optional)
215             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
216             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
217
218         Returns
219         -------
220         {} (if no keys match) or see above; depends on usemsgpack
221         """
222         return self._sdl.find_and_get(ns, prefix, usemsgpack)
223
224     def sdl_delete(self, ns, key):
225         """
226         delete a key
227
228         Parameters
229         ----------
230         ns: string
231            the sdl namespace
232         key: string
233             the sdl key
234         """
235         self._sdl.delete(ns, key)
236
237     # Health
238
239     def healthcheck(self):
240         """
241         this needs to be understood how this is supposed to work
242         """
243         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
244
245     def stop(self):
246         """
247         cleans up and stops the xapp rmr thread (currently)
248         This is critical for unit testing as pytest will never return if the thread is running.
249
250         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
251         """
252         self._rmr_loop.stop()
253
254
255 # Public Classes to subclass (these subclass _BaseXapp)
256
257
258 class RMRXapp(_BaseXapp):
259     """
260     Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
261     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
262     """
263
264     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
265         """
266         Parameters
267         ----------
268         default_handler: function
269             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
270             summary: dict
271                 the rmr message summary
272             sbuf: ctypes c_void_p
273                 Pointer to an rmr message buffer. The user must call free on this when done.
274
275         post_init: function (optional)
276             optionally runs this function after the app initializes and before the run loop
277             it's signature should be post_init(self)
278
279         For the other parameters, see _BaseXapp
280         """
281         # init base
282         super().__init__(
283             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
284         )
285
286         # setup callbacks
287         self._default_handler = default_handler
288         self._dispatch = {}
289
290         # used for thread control
291         self._keep_going = True
292
293         # register a default healthcheck handler
294         # this default checks that rmr is working and SDL is working
295         # the user can override this and register their own handler if they wish since the "last registered callback wins".
296         def handle_healthcheck(self, summary, sbuf):
297             ok = self.healthcheck()
298             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
299             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
300             self.rmr_free(sbuf)
301
302         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
303
304     def register_callback(self, handler, message_type):
305         """
306         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
307
308         Parameters
309         ----------
310         handler: function
311             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
312             summary: dict
313                 the rmr message summary
314             sbuf: ctypes c_void_p
315                 Pointer to an rmr message buffer. The user must call free on this when done.
316
317         message:type: int
318             the message type to look for
319
320         Note if this method is called multiple times for a single message type, the "last one wins".
321         """
322         self._dispatch[message_type] = handler
323
324     def run(self, thread=False):
325         """
326         This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
327
328         Parameters
329         ----------
330         thread: bool (optional)
331             if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
332             The thread can be stopped using .stop()
333             if False, execution is not returned and the framework loops
334         """
335
336         def loop():
337             while self._keep_going:
338                 if not self._rmr_loop.rcv_queue.empty():
339                     (summary, sbuf) = self._rmr_loop.rcv_queue.get()
340                     # dispatch
341                     func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
342                     if not func:
343                         func = self._default_handler
344                     func(self, summary, sbuf)
345
346         if thread:
347             Thread(target=loop).start()
348         else:
349             loop()
350
351     def stop(self):
352         """
353         stops the rmr xapp completely.
354         """
355         super().stop()
356         self.logger.debug("Stopping queue reading thread..")
357         self._keep_going = False
358
359
360 class Xapp(_BaseXapp):
361     """
362     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
363     """
364
365     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
366         """
367         Parameters
368         ----------
369         entrypoint: function
370             this function is called when the xapp runs; this is the user code
371             it's signature should be function(self)
372
373         For the other parameters, see _BaseXapp
374         """
375         # init base
376         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
377         self._entrypoint = entrypoint
378
379     def run(self):
380         """
381         This function should be called when the client xapp is ready to start their code
382         """
383         self._entrypoint(self)
384
385     # there is no need for stop currently here (base has, and nothing special to do here)