Fixes and enhancements:
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 """
2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
4 """
5 # ==================================================================================
6 #       Copyright (c) 2020 Nokia
7 #       Copyright (c) 2020 AT&T Intellectual Property.
8 #
9 #   Licensed under the Apache License, Version 2.0 (the "License");
10 #   you may not use this file except in compliance with the License.
11 #   You may obtain a copy of the License at
12 #
13 #          http://www.apache.org/licenses/LICENSE-2.0
14 #
15 #   Unless required by applicable law or agreed to in writing, software
16 #   distributed under the License is distributed on an "AS IS" BASIS,
17 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 #   See the License for the specific language governing permissions and
19 #   limitations under the License.
20 # ==================================================================================
21
22
23 from ricxappframe import xapp_rmr
24 from ricxappframe.xapp_sdl import SDLWrapper
25 from rmr import rmr
26 from mdclogpy import Logger
27
28
29 mdc_logger = Logger(name=__name__)
30
31
32 # Private base class; not for direct client use
33
34
35 class _BaseXapp:
36     """
37     Base xapp; not for client use directly
38     """
39
40     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
41         """
42         Init
43
44         Parameters
45         ----------
46         rmr_port: int
47             port to listen on
48
49         rmr_wait_for_ready: bool (optional)
50             if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
51             this can be set to False if the client only wants to *receive only*
52
53         use_fake_sdl: bool (optional)
54             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
55             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
56         """
57
58         # Start rmr rcv thread
59         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
60         self._mrc = self._rmr_loop.mrc  # for convenience
61
62         # SDL
63         self._sdl = SDLWrapper(use_fake_sdl)
64
65         # run the optionally provided user post init
66         self.post_init()
67
68     # Public methods to be implemented by the client
69     def post_init(self):
70         """
71         this method can optionally be implemented by the client to run code immediately after the xall initialized (but before the xapp starts it's processing loop)
72         the base method here does nothing (ie nothing is executed prior to starting if the client does not implement this)
73         """
74         pass
75
76     # Public rmr methods
77
78     def rmr_get_messages(self):
79         """
80         returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
81         """
82         while not self._rmr_loop.rcv_queue.empty():
83             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
84             yield (summary, sbuf)
85
86     def rmr_send(self, payload, mtype, retries=100):
87         """
88         Allocates a buffer, sets payload and mtype, and sends
89
90         Parameters
91         ----------
92         payload: bytes
93             payload to set
94         mtype: int
95             message type
96         retries: int (optional)
97             Number of times to retry at the application level before excepting RMRFailure
98
99         Returns
100         -------
101         bool
102             whether or not the send worked after retries attempts
103         """
104         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
105
106         for _ in range(retries):
107             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
108             if sbuf.contents.state == 0:
109                 self.rmr_free(sbuf)
110                 return True
111
112         self.rmr_free(sbuf)
113         return False
114
115     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
116         """
117         Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
118
119         This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
120         The client needs to free.
121
122         Parameters
123         ----------
124         sbuf: ctypes c_void_p
125              Pointer to an rmr message buffer
126         new_payload: bytes (optional)
127             New payload to set
128         new_mtype: int (optional)
129             New message type (replaces the received message)
130         retries: int (optional)
131             Number of times to retry at the application level before excepting RMRFailure
132
133         Returns
134         -------
135         bool
136             whether or not the send worked after retries attempts
137         """
138         for _ in range(retries):
139             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
140             if sbuf.contents.state == 0:
141                 return True
142
143         return False
144
145     def rmr_free(self, sbuf):
146         """
147         Free an rmr message buffer after use
148
149         Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
150         Parameters
151         ----------
152         sbuf: ctypes c_void_p
153              Pointer to an rmr message buffer
154         """
155         rmr.rmr_free_msg(sbuf)
156
157     # SDL
158     # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
159     # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
160
161     def sdl_set(self, ns, key, value, usemsgpack=True):
162         """
163         set a key
164
165         Parameters
166         ----------
167         ns: string
168            the sdl namespace
169         key: string
170             the sdl key
171         value:
172             if usemsgpack is True, value can be anything serializable by msgpack
173             if usemsgpack is False, value must be bytes
174         usemsgpack: boolean (optional)
175             determines whether the value is serialized using msgpack
176         """
177         self._sdl.set(ns, key, value, usemsgpack)
178
179     def sdl_get(self, ns, key, usemsgpack=True):
180         """
181         get a key
182
183         Parameters
184         ----------
185         ns: string
186            the sdl namespace
187         key: string
188             the sdl key
189         usemsgpack: boolean (optional)
190             if usemsgpack is True, the value is deserialized using msgpack
191             if usemsgpack is False, the value is returned as raw bytes
192
193         Returns
194         -------
195         None (if not exist) or see above; depends on usemsgpack
196         """
197         return self._sdl.get(ns, key, usemsgpack)
198
199     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
200         """
201         get all k v pairs that start with prefix
202
203         Parameters
204         ----------
205         ns: string
206            the sdl namespace
207         key: string
208             the sdl key
209         prefix: string
210             the prefix
211         usemsgpack: boolean (optional)
212             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
213             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
214
215         Returns
216         -------
217         {} (if no keys match) or see above; depends on usemsgpack
218         """
219         return self._sdl.find_and_get(ns, prefix, usemsgpack)
220
221     def sdl_delete(self, ns, key):
222         """
223         delete a key
224
225         Parameters
226         ----------
227         ns: string
228            the sdl namespace
229         key: string
230             the sdl key
231         """
232         self._sdl.delete(ns, key)
233
234     # Health
235
236     def healthcheck(self):
237         """
238         this needs to be understood how this is supposed to work
239         """
240         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
241
242     def stop(self):
243         """
244         cleans up and stops the xapp.
245         Currently this only stops the rmr thread
246         This is critical for unit testing as pytest will never return if the thread is running.
247
248         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
249         """
250         self._rmr_loop.stop()
251
252
253 # Public Classes to subclass (these subclass _BaseXapp)
254
255
256 class RMRXapp(_BaseXapp):
257     """
258     Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
259     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
260     """
261
262     def consume(self, summary, sbuf):
263         """
264         This function is to be implemented by the client and is called whenever a new rmr message is received.
265         It is expected to take two parameters (besides self):
266
267         Parameters
268         ----------
269         summary: dict
270             the rmr message summary
271         sbuf: ctypes c_void_p
272             Pointer to an rmr message buffer. The user must call free on this when done.
273         """
274         self.stop()
275         raise NotImplementedError()
276
277     def run(self):
278         """
279         This function should be called when the client xapp is ready to wait for consume to be called on received messages
280
281         TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
282         Running the below in a thread probably makes the most sense.
283         """
284         while True:
285             if not self._rmr_loop.rcv_queue.empty():
286                 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
287                 self.consume(summary, sbuf)
288
289
290 class Xapp(_BaseXapp):
291     """
292     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
293     """
294
295     def entrypoint(self):
296         """
297         This function is to be implemented by the client and is called after post_init
298         """
299         self.stop()
300         raise NotImplementedError()
301
302     def run(self):
303         """
304         This function should be called when the client xapp is ready to start their code
305         """
306         self.entrypoint()