Move rmr python here.
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 """
2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
4 """
5 # ==================================================================================
6 #       Copyright (c) 2020 Nokia
7 #       Copyright (c) 2020 AT&T Intellectual Property.
8 #
9 #   Licensed under the Apache License, Version 2.0 (the "License");
10 #   you may not use this file except in compliance with the License.
11 #   You may obtain a copy of the License at
12 #
13 #          http://www.apache.org/licenses/LICENSE-2.0
14 #
15 #   Unless required by applicable law or agreed to in writing, software
16 #   distributed under the License is distributed on an "AS IS" BASIS,
17 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 #   See the License for the specific language governing permissions and
19 #   limitations under the License.
20 # ==================================================================================
21 from threading import Thread
22 from ricxappframe import xapp_rmr
23 from ricxappframe.xapp_sdl import SDLWrapper
24 from ricxappframe.rmr import rmr
25 from mdclogpy import Logger
26
27 # constants
28 RIC_HEALTH_CHECK_REQ = 100
29 RIC_HEALTH_CHECK_RESP = 101
30
31
32 # Private base class; not for direct client use
33
34
35 class _BaseXapp:
36     """
37     Base xapp; not for client use directly
38     """
39
40     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
41         """
42         Init
43
44         Parameters
45         ----------
46         rmr_port: int
47             port to listen on
48
49         rmr_wait_for_ready: bool (optional)
50             if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
51             this can be set to False if the client only wants to *receive only*
52
53         use_fake_sdl: bool (optional)
54             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
55             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
56
57         post_init: function (optional)
58             runs this user provided function after the base xapp is initialized
59             it's signature should be post_init(self)
60         """
61         # PUBLIC, can be used by xapps using self.(name):
62         self.logger = Logger(name=__name__)
63
64         # Start rmr rcv thread
65         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
66         self._mrc = self._rmr_loop.mrc  # for convenience
67
68         # SDL
69         self._sdl = SDLWrapper(use_fake_sdl)
70
71         # run the optionally provided user post init
72         if post_init:
73             post_init(self)
74
75     # Public rmr methods
76
77     def rmr_get_messages(self):
78         """
79         returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
80         """
81         while not self._rmr_loop.rcv_queue.empty():
82             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
83             yield (summary, sbuf)
84
85     def rmr_send(self, payload, mtype, retries=100):
86         """
87         Allocates a buffer, sets payload and mtype, and sends
88
89         Parameters
90         ----------
91         payload: bytes
92             payload to set
93         mtype: int
94             message type
95         retries: int (optional)
96             Number of times to retry at the application level before excepting RMRFailure
97
98         Returns
99         -------
100         bool
101             whether or not the send worked after retries attempts
102         """
103         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
104
105         for _ in range(retries):
106             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
107             if sbuf.contents.state == 0:
108                 self.rmr_free(sbuf)
109                 return True
110
111         self.rmr_free(sbuf)
112         return False
113
114     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
115         """
116         Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
117
118         This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
119         The client needs to free.
120
121         Parameters
122         ----------
123         sbuf: ctypes c_void_p
124              Pointer to an rmr message buffer
125         new_payload: bytes (optional)
126             New payload to set
127         new_mtype: int (optional)
128             New message type (replaces the received message)
129         retries: int (optional)
130             Number of times to retry at the application level before excepting RMRFailure
131
132         Returns
133         -------
134         bool
135             whether or not the send worked after retries attempts
136         """
137         for _ in range(retries):
138             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
139             if sbuf.contents.state == 0:
140                 return True
141
142         self.logger.info("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
143         return False
144
145     def rmr_free(self, sbuf):
146         """
147         Free an rmr message buffer after use
148
149         Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
150         Parameters
151         ----------
152         sbuf: ctypes c_void_p
153              Pointer to an rmr message buffer
154         """
155         rmr.rmr_free_msg(sbuf)
156
157     # SDL
158     # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
159     # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
160
161     def sdl_set(self, ns, key, value, usemsgpack=True):
162         """
163         set a key
164
165         Parameters
166         ----------
167         ns: string
168            the sdl namespace
169         key: string
170             the sdl key
171         value:
172             if usemsgpack is True, value can be anything serializable by msgpack
173             if usemsgpack is False, value must be bytes
174         usemsgpack: boolean (optional)
175             determines whether the value is serialized using msgpack
176         """
177         self._sdl.set(ns, key, value, usemsgpack)
178
179     def sdl_get(self, ns, key, usemsgpack=True):
180         """
181         get a key
182
183         Parameters
184         ----------
185         ns: string
186            the sdl namespace
187         key: string
188             the sdl key
189         usemsgpack: boolean (optional)
190             if usemsgpack is True, the value is deserialized using msgpack
191             if usemsgpack is False, the value is returned as raw bytes
192
193         Returns
194         -------
195         None (if not exist) or see above; depends on usemsgpack
196         """
197         return self._sdl.get(ns, key, usemsgpack)
198
199     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
200         """
201         get all k v pairs that start with prefix
202
203         Parameters
204         ----------
205         ns: string
206            the sdl namespace
207         key: string
208             the sdl key
209         prefix: string
210             the prefix
211         usemsgpack: boolean (optional)
212             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
213             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
214
215         Returns
216         -------
217         {} (if no keys match) or see above; depends on usemsgpack
218         """
219         return self._sdl.find_and_get(ns, prefix, usemsgpack)
220
221     def sdl_delete(self, ns, key):
222         """
223         delete a key
224
225         Parameters
226         ----------
227         ns: string
228            the sdl namespace
229         key: string
230             the sdl key
231         """
232         self._sdl.delete(ns, key)
233
234     # Health
235
236     def healthcheck(self):
237         """
238         this needs to be understood how this is supposed to work
239         """
240         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
241
242     def stop(self):
243         """
244         cleans up and stops the xapp rmr thread (currently)
245         This is critical for unit testing as pytest will never return if the thread is running.
246
247         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
248         """
249         self._rmr_loop.stop()
250
251
252 # Public Classes to subclass (these subclass _BaseXapp)
253
254
255 class RMRXapp(_BaseXapp):
256     """
257     Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
258     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
259     """
260
261     def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
262         """
263         Parameters
264         ----------
265         default_handler: function
266             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
267             summary: dict
268                 the rmr message summary
269             sbuf: ctypes c_void_p
270                 Pointer to an rmr message buffer. The user must call free on this when done.
271
272         post_init: function (optional)
273             optionally runs this function after the app initializes and before the run loop
274             it's signature should be post_init(self)
275
276         For the other parameters, see _BaseXapp
277         """
278         # init base
279         super().__init__(
280             rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
281         )
282
283         # setup callbacks
284         self._default_handler = default_handler
285         self._dispatch = {}
286
287         # used for thread control
288         self._keep_going = True
289
290         # register a default healthcheck handler
291         # this default checks that rmr is working and SDL is working
292         # the user can override this and register their own handler if they wish since the "last registered callback wins".
293         def handle_healthcheck(self, summary, sbuf):
294             ok = self.healthcheck()
295             payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
296             self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
297             self.rmr_free(sbuf)
298
299         self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
300
301     def register_callback(self, handler, message_type):
302         """
303         registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type
304
305         Parameters
306         ----------
307         handler: function
308             a function with the signature (summary, sbuf) to be called when a message of type message_type is received
309             summary: dict
310                 the rmr message summary
311             sbuf: ctypes c_void_p
312                 Pointer to an rmr message buffer. The user must call free on this when done.
313
314         message:type: int
315             the message type to look for
316
317         Note if this method is called multiple times for a single message type, the "last one wins".
318         """
319         self._dispatch[message_type] = handler
320
321     def run(self, thread=False):
322         """
323         This function should be called when the client xapp is ready to wait for their handlers to be called on received messages
324
325         Parameters
326         ----------
327         thread: bool (optional)
328             if thread is True, execution is returned to caller and the queue read loop is executed in a thread.
329             The thread can be stopped using .stop()
330             if False, execution is not returned and the framework loops
331         """
332
333         def loop():
334             while self._keep_going:
335                 if not self._rmr_loop.rcv_queue.empty():
336                     (summary, sbuf) = self._rmr_loop.rcv_queue.get()
337                     # dispatch
338                     func = self._dispatch.get(summary["message type"], None)
339                     if not func:
340                         func = self._default_handler
341                     func(self, summary, sbuf)
342
343         if thread:
344             Thread(target=loop).start()
345         else:
346             loop()
347
348     def stop(self):
349         """
350         stops the rmr xapp completely.
351         """
352         super().stop()
353         self.logger.debug("Stopping queue reading thread..")
354         self._keep_going = False
355
356
357 class Xapp(_BaseXapp):
358     """
359     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
360     """
361
362     def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
363         """
364         Parameters
365         ----------
366         entrypoint: function
367             this function is called when the xapp runs; this is the user code
368             it's signature should be function(self)
369
370         For the other parameters, see _BaseXapp
371         """
372         # init base
373         super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl)
374         self._entrypoint = entrypoint
375
376     def run(self):
377         """
378         This function should be called when the client xapp is ready to start their code
379         """
380         self._entrypoint(self)
381
382     # there is no need for stop currently here (base has, and nothing special to do here)