Initial pass of the py xapp frame
[ric-plt/xapp-frame-py.git] / ricxappframe / xapp_frame.py
1 """
2 Framework for python xapps
3 Framework here means Xapp classes that can be subclassed
4 """
5 # ==================================================================================
6 #       Copyright (c) 2020 Nokia
7 #       Copyright (c) 2020 AT&T Intellectual Property.
8 #
9 #   Licensed under the Apache License, Version 2.0 (the "License");
10 #   you may not use this file except in compliance with the License.
11 #   You may obtain a copy of the License at
12 #
13 #          http://www.apache.org/licenses/LICENSE-2.0
14 #
15 #   Unless required by applicable law or agreed to in writing, software
16 #   distributed under the License is distributed on an "AS IS" BASIS,
17 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
18 #   See the License for the specific language governing permissions and
19 #   limitations under the License.
20 # ==================================================================================
21
22
23 from ricxappframe import xapp_rmr
24 from ricxappframe.xapp_sdl import SDLWrapper
25 from rmr import rmr
26 from mdclogpy import Logger
27
28
29 mdc_logger = Logger(name=__name__)
30
31
32 # Private base class; not for direct client use
33
34
35 class _BaseXapp:
36     """
37     Base xapp; not for client use directly
38     """
39
40     def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
41         """
42         Init
43
44         Parameters
45         ----------
46         rmr_port: int
47             port to listen on
48
49         rmr_wait_for_ready: bool (optional)
50             if this is True, then init waits until rmr is ready to send, which includes having a valid routing file.
51             this can be set to False if the client only wants to *receive only*
52
53         use_fake_sdl: bool (optional)
54             if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends.
55             Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all
56         """
57
58         # Start rmr rcv thread
59         self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
60         self._mrc = self._rmr_loop.mrc  # for convenience
61
62         # SDL
63         self._sdl = SDLWrapper(use_fake_sdl)
64
65     def rmr_get_messages(self):
66         """
67         returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp
68         """
69         while not self._rmr_loop.rcv_queue.empty():
70             (summary, sbuf) = self._rmr_loop.rcv_queue.get()
71             yield (summary, sbuf)
72
73     def rmr_send(self, payload, mtype, retries=100):
74         """
75         Allocates a buffer, sets payload and mtype, and sends
76
77         Parameters
78         ----------
79         payload: bytes
80             payload to set
81         mtype: int
82             message type
83         retries: int (optional)
84             Number of times to retry at the application level before excepting RMRFailure
85
86         Returns
87         -------
88         bool
89             whether or not the send worked after retries attempts
90         """
91         sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
92
93         for _ in range(retries):
94             sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
95             if sbuf.contents.state == 0:
96                 self.rmr_free(sbuf)
97                 return True
98
99         self.rmr_free(sbuf)
100         return False
101
102     def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
103         """
104         Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so
105
106         This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer.
107         The client needs to free.
108
109         Parameters
110         ----------
111         sbuf: ctypes c_void_p
112              Pointer to an rmr message buffer
113         new_payload: bytes (optional)
114             New payload to set
115         new_mtype: int (optional)
116             New message type (replaces the received message)
117         retries: int (optional)
118             Number of times to retry at the application level before excepting RMRFailure
119
120         Returns
121         -------
122         bool
123             whether or not the send worked after retries attempts
124         """
125         for _ in range(retries):
126             sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
127             if sbuf.contents.state == 0:
128                 return True
129
130         return False
131
132     def rmr_free(self, sbuf):
133         """
134         Free an rmr message buffer after use
135
136         Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it.
137         Parameters
138         ----------
139         sbuf: ctypes c_void_p
140              Pointer to an rmr message buffer
141         """
142         rmr.rmr_free_msg(sbuf)
143
144     # SDL
145     # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1.
146     # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own.
147
148     def sdl_set(self, ns, key, value, usemsgpack=True):
149         """
150         set a key
151
152         Parameters
153         ----------
154         ns: string
155            the sdl namespace
156         key: string
157             the sdl key
158         value:
159             if usemsgpack is True, value can be anything serializable by msgpack
160             if usemsgpack is False, value must be bytes
161         usemsgpack: boolean (optional)
162             determines whether the value is serialized using msgpack
163         """
164         self._sdl.set(ns, key, value, usemsgpack)
165
166     def sdl_get(self, ns, key, usemsgpack=True):
167         """
168         get a key
169
170         Parameters
171         ----------
172         ns: string
173            the sdl namespace
174         key: string
175             the sdl key
176         usemsgpack: boolean (optional)
177             if usemsgpack is True, the value is deserialized using msgpack
178             if usemsgpack is False, the value is returned as raw bytes
179
180         Returns
181         -------
182         None (if not exist) or see above; depends on usemsgpack
183         """
184         return self._sdl.get(ns, key, usemsgpack)
185
186     def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
187         """
188         get all k v pairs that start with prefix
189
190         Parameters
191         ----------
192         ns: string
193            the sdl namespace
194         key: string
195             the sdl key
196         prefix: string
197             the prefix
198         usemsgpack: boolean (optional)
199             if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack
200             if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes
201
202         Returns
203         -------
204         {} (if no keys match) or see above; depends on usemsgpack
205         """
206         return self._sdl.find_and_get(ns, prefix, usemsgpack)
207
208     def sdl_delete(self, ns, key):
209         """
210         delete a key
211
212         Parameters
213         ----------
214         ns: string
215            the sdl namespace
216         key: string
217             the sdl key
218         """
219         self._sdl.delete(ns, key)
220
221     # Health
222
223     def healthcheck(self):
224         """
225         this needs to be understood how this is supposed to work
226         """
227         return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
228
229     def stop(self):
230         """
231         cleans up and stops the xapp.
232         Currently this only stops the rmr thread
233         This is critical for unit testing as pytest will never return if the thread is running.
234
235         TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop
236         """
237         self._rmr_loop.stop()
238
239
240 # Public Classes to subclass (these subclass _BaseXapp)
241
242
243 class RMRXapp(_BaseXapp):
244     """
245     Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something
246     When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one
247     """
248
249     def consume(self, summary, sbuf):
250         """
251         This function is to be implemented by the client and is called whenever a new rmr message is received.
252         It is expected to take two parameters (besides self):
253
254         Parameters
255         ----------
256         summary: dict
257             the rmr message summary
258         sbuf: ctypes c_void_p
259             Pointer to an rmr message buffer. The user must call free on this when done.
260         """
261         raise NotImplementedError()
262
263     def run(self):
264         """
265         This function should be called when the client xapp is ready to wait for consume to be called on received messages
266
267         TODO: should we run this in a thread too? We can't currently call "stop" on rmr xapps at an arbitrary time because this doesn't return control
268         Running the below in a thread probably makes the most sense.
269         """
270         while True:
271             if not self._rmr_loop.rcv_queue.empty():
272                 (summary, sbuf) = self._rmr_loop.rcv_queue.get()
273                 self.consume(summary, sbuf)
274
275
276 class Xapp(_BaseXapp):
277     """
278     Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop
279     """
280
281     def loop(self):
282         """
283         This function is to be implemented by the client and is called
284         """
285         raise NotImplementedError()
286
287     def run(self):
288         """
289         This function should be called when the client xapp is ready to start their loop
290         This is simple and the client could just call self.loop(), however this gives a consistent interface as the other xapps
291         """
292         self.loop()