1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
26 from threading import Thread
28 from mdclogpy import Logger
29 from ricxappframe import xapp_rmr
30 from ricxappframe.rmr import rmr
31 from ricxappframe.xapp_sdl import SDLWrapper
33 # message-type constants
34 RIC_HEALTH_CHECK_REQ = 100
35 RIC_HEALTH_CHECK_RESP = 101
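# environment variables naming configuration files: CONFIG_FILE is the file
# watched for changes; CONFIG_FILE_PATH is the file read for xapp registration data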
38 CONFIG_FILE_ENV = "CONFIG_FILE"
39 CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
44 This class initializes RMR, starts a thread that checks for incoming
45 messages, provisions an SDL object and optionally creates a
46 config-file watcher. This private base class should not be
47 instantiated by clients directly, but it defines many public methods
48 that may be used by clients.
50 If environment variable CONFIG_FILE is defined, and that variable
51 contains a path to an existing file, a watcher is defined to monitor
52 modifications (writes) to that file using the Linux kernel's inotify
53 feature. The watcher must be polled by calling method
58 rmr_port: int (optional, default is 4562)
59 Port on which the RMR library listens for incoming messages.
61 rmr_wait_for_ready: bool (optional, default is True)
62 If this is True, then init waits until RMR is ready to send,
63 which includes having a valid routing file. This can be set
64 to False if the client wants to *receive only*.
66 use_fake_sdl: bool (optional, default is False)
67 if this is True, it uses the DBaaS "fake dict backend" instead
68 of Redis or other backends. Set this to True when developing
69 an xapp or during unit testing to eliminate the need for DBaaS.
71 post_init: function (optional, default is None)
72 Runs this user-provided function at the end of the init method;
73 its signature should be post_init(self)
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
    """
    Set up logging, the RMR receive loop, SDL, the optional config-file
    watcher and the registration thread.

    Parameters
    ----------
    rmr_port: int (optional, default is 4562)
        Handed to xapp_rmr.RmrLoop as ``port``.
    rmr_wait_for_ready: bool (optional, default is True)
        Handed to xapp_rmr.RmrLoop as ``wait_for_ready``.
    use_fake_sdl: bool (optional, default is False)
        Handed to SDLWrapper.
    post_init: function (optional, default is None)
        When truthy, called as post_init(self) as the very last step.
    """
    # PUBLIC, can be used by xapps using self.(name):
    self.logger = Logger(name=__name__)

    # Start rmr rcv thread
    rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
    self._rmr_loop = rmr_loop
    self._mrc = rmr_loop.mrc  # for convenience

    # SDL
    self.sdl = SDLWrapper(use_fake_sdl)

    # Config: watch the file named by CONFIG_FILE, but only if it exists
    watched_file = os.environ.get(CONFIG_FILE_ENV, None)
    self._config_path = watched_file
    if not (watched_file and os.path.isfile(watched_file)):
        # config_check() relies on this being falsy when nothing is watched
        self._inotify = None
        self.logger.warning("__init__: NOT watching any config file")
    else:
        self._inotify = inotify_simple.INotify()
        self._inotify.add_watch(watched_file, inotify_simple.flags.MODIFY)
        self.logger.debug("__init__: watching config file {}".format(watched_file))

    # used for thread control of Registration of Xapp
    self._keep_registration = True

    # configuration data for xapp registration and deregistration
    self._config_data = None
    registration_file = os.environ.get(CONFIG_FILE_PATH, None)
    self._configfile_path = registration_file
    if registration_file and os.path.isfile(registration_file):
        with open(registration_file) as json_file:
            self._config_data = json.load(json_file)
    else:
        # without registration data the registration loop must not run
        self._keep_registration = False
        self.logger.warning("__init__: Cannot Read config file for xapp Registration")

    Thread(target=self.registerXapp).start()

    # run the optionally provided user post init
    if post_init:
        post_init(self)
def get_service(self, host, service):
    """
    Expand a service template for this xapp and return the part after "//".

    The namespace comes from the registration config key "APP_NAMESPACE",
    falling back to "DEFAULT_XAPP_NS" when it is empty or absent. The
    template is filled with the upper-cased namespace and host, every "-"
    is replaced by "_", and the result is split on "//".

    Parameters
    ----------
    host: string
        defines the hostname in the url
    service: string
        defines the servicename in the url; a format template with two
        positional fields (namespace, host)

    Returns
    -------
    string
        the text following the first "//"; "" when the expanded value
        contains no "//" or when namespace, host or service is missing
    """
    config = self._config_data or {}
    # A missing key (None) is treated like an empty value so that the
    # default namespace is used instead of failing on None.upper().
    namespace = config.get("APP_NAMESPACE") or config.get("DEFAULT_XAPP_NS")
    if not namespace or host is None or service is None:
        return ""
    expanded = service.format(namespace.upper(), host.upper())
    pieces = expanded.replace("-", "_").split("//")
    # NOTE(review): the original return statements were not visible in the
    # reviewed text; callers compare the result against "" — confirm.
    return pieces[1] if len(pieces) > 1 else ""
def do_post(self, plt_namespace, url, msg):
    """
    POST the (de)registration message to the given url.

    Parameters
    ----------
    plt_namespace: string
        platform namespace where the xapp is running; substituted into
        both positional fields of ``url``
    url: string
        url template for xapp registration
    msg: string
        json msg containing the xapp details; forwarded through the
        ``json=`` parameter of requests.post

    Returns
    -------
    bool
        True when the response status is 200 or 201; False for any other
        status or when the request raised a requests exception
    """
    request_url = url.format(plt_namespace, plt_namespace)
    # The specific handlers must come before RequestException: it is their
    # common base class and would otherwise shadow all of them.
    try:
        resp = requests.post(request_url, json=msg)
    except requests.exceptions.HTTPError as errh:
        self.logger.error("Http Error: {}".format(errh))
        return False
    except requests.exceptions.ConnectionError as errc:
        self.logger.error("Error Connecting: {}".format(errc))
        return False
    except requests.exceptions.Timeout as errt:
        self.logger.error("Timeout Error: {}".format(errt))
        return False
    except requests.exceptions.RequestException as err:
        self.logger.error("Error : {}".format(err))
        return False
    self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
    return resp.status_code in (200, 201)
def register(self):
    """
    Register the xapp using the registration config data.

    Builds the registration message from the config ("hostname", "name",
    "version", "CONFIG_PATH") and the endpoints resolved by get_service(),
    then posts it to the "REGISTER_PATH" url via do_post().

    Returns
    -------
    bool
        whether or not the xapp is registered; False when an endpoint
        cannot be resolved or the message cannot be serialized
    """
    config = self._config_data
    hostname = config.get("hostname")
    xappname = config.get("name")
    xappversion = config.get("version")
    # an empty or absent platform namespace falls back to the default one
    pltnamespace = config.get("PLT_NAMESPACE") or config.get("DEFAULT_PLT_NS")
    http_endpoint = self.get_service(hostname, config.get("SERVICE_HTTP"))
    rmr_endpoint = self.get_service(hostname, config.get("SERVICE_RMR"))
    if http_endpoint == "" or rmr_endpoint == "":
        self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
        return False
    request_string = {
        "appName": hostname,
        "httpEndpoint": http_endpoint,
        "rmrEndpoint": rmr_endpoint,
        "appInstanceName": xappname,
        "appVersion": xappversion,
        "configPath": config.get("CONFIG_PATH"),
    }
    try:
        # NOTE(review): the serialized string is later passed to
        # requests.post(json=...) by do_post — confirm the receiver
        # expects a JSON-encoded string rather than an object.
        request_body = json.dumps(request_string)
    except TypeError:
        self.logger.error("Unable to serialize the object")
        return False

    return self.do_post(pltnamespace, config.get("REGISTER_PATH"), request_body)
def registerXapp(self):
    """
    Registration loop; __init__ runs it on its own thread.

    While ``_keep_registration`` is set, waits, checks health and, once
    healthy, registers the xapp and leaves the loop.
    """
    while self._keep_registration:
        # NOTE(review): delay length was not visible in the reviewed text — confirm
        time.sleep(5)
        # checking for rmr/sdl/xapp health
        if self.healthcheck():
            self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(self._config_data.get("name")))
            self.register()
            self.logger.debug("Registration done, proceeding with startup ...")
            break
        self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
def deregister(self):
    """
    Deregister the xapp using the registration config data.

    Builds the deregistration message from the config ("hostname",
    "name") and posts it to the "DEREGISTER_PATH" url via do_post().

    Returns
    -------
    bool
        result of the deregistration post; False when the xapp is not
        healthy, no config data is available or the message cannot be
        serialized
    """
    if not self.healthcheck():
        self.logger.error("RMR or SDL or xapp == Not Healthy")
        return False
    config = self._config_data
    if config is None:
        return False
    name = config.get("hostname")
    xappname = config.get("name")
    # Fall back to the default platform namespace, as register() does; the
    # previous code re-read "PLT_NAMESPACE" and so never left the empty value.
    pltnamespace = config.get("PLT_NAMESPACE") or config.get("DEFAULT_PLT_NS")
    request_string = {
        "appName": name,
        "appInstanceName": xappname,
    }
    try:
        request_body = json.dumps(request_string)
    except TypeError:
        self.logger.error("Error Serializing the object")
        return False

    return self.do_post(pltnamespace, config.get("DEREGISTER_PATH"), request_body)
def xapp_shutdown(self):
    """
    Deregister the xapp as part of shutting down, then pause so the
    deregistration can take effect.
    """
    self.deregister()
    self.logger.debug("Wait for xapp to get unregistered")
    # NOTE(review): pause length was not visible in the reviewed text — confirm
    time.sleep(10)
def rmr_get_messages(self):
    """
    Generate every item currently waiting in the receive queue.

    Each item is yielded as a tuple (S, sbuf) where S is a message
    summary dict and sbuf is the raw message. The caller MUST call
    rmr.rmr_free_msg(sbuf) when finished with each sbuf to prevent
    memory leaks!
    """
    pending = self._rmr_loop.rcv_queue
    while not pending.empty():
        summary, sbuf = pending.get()
        yield summary, sbuf
def rmr_send(self, payload, mtype, retries=100):
    """
    Allocate a buffer, set payload and mtype, and send it.

    Parameters
    ----------
    payload: bytes
        payload to set
    mtype: int
        message type
    retries: int (optional)
        Maximum number of send attempts made at the application level

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)

    delivered = False
    for _ in range(retries):
        sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
        if sbuf.contents.state == 0:
            delivered = True
            break

    # the buffer allocated here is released on success and on failure
    self.rmr_free(sbuf)
    return delivered
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
    """
    Return a received message to its sender, optionally replacing the
    payload and message type first. This does NOT free the sbuf: the
    caller may wish to perform multiple rts per buffer, so the caller
    needs to free it.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    new_payload: bytes (optional)
        Passed to rmr_rts_msg as ``payload``
    new_mtype: int (optional)
        New message type (replaces the received message)
    retries: int (optional, default 100)
        Number of times to retry at the application level

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    for _ in range(retries):
        sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
        if sbuf.contents.state == 0:
            break
    else:
        # every attempt failed (or none was made)
        self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
        return False
    return True
def rmr_free(self, sbuf):
    """
    Release an rmr message buffer once it is no longer needed.

    ``self`` is not used; the method form merely gives the call a home
    on the xapp object.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    rmr.rmr_free_msg(sbuf)
364 # Convenience (pass-thru) function for invoking SDL.
def sdl_set(self, namespace, key, value, usemsgpack=True):
    """
    ** Deprecation warning: this pass-through will be removed in a
    future release; use ``self.sdl`` directly. **

    Stores a key-value pair to SDL, optionally serializing the value
    to bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    value:
        Object or byte array to store. See the `usemsgpack` parameter.
    usemsgpack: boolean (optional, default is True)
        If True, the value is serialized with msgpack before storing, so
        it can be anything msgpack can serialize.
        If False, the value must be bytes.
    """
    store = self.sdl
    store.set(namespace, key, value, usemsgpack)
def sdl_get(self, namespace, key, usemsgpack=True):
    """
    ** Deprecation warning: this pass-through will be removed in a
    future release; use ``self.sdl`` directly. **

    Gets the value for the specified namespace and key from SDL,
    optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    usemsgpack: boolean (optional, default is True)
        If True, the stored byte array is deserialized with msgpack to
        yield the original object. If False, the stored byte array is
        returned without further processing.

    Returns
    -------
    Value
        See the usemsgpack parameter for an explanation of the returned value type.
        Answers None if the key is not found.
    """
    stored = self.sdl.get(namespace, key, usemsgpack)
    return stored
def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
    """
    ** Deprecation warning: this pass-through will be removed in a
    future release; use ``self.sdl`` directly. **

    Gets all key-value pairs in the specified namespace
    with keys that start with the specified prefix,
    optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    prefix: string
        the key prefix
    usemsgpack: boolean (optional, default is True)
        If True, each stored byte array is deserialized with msgpack to
        yield the original value. If False, the stored byte arrays are
        returned without further processing.

    Returns
    -------
    Dictionary of key-value pairs
        Each key has the specified prefix.
        The value object (its type) depends on the usemsgpack parameter,
        but is either a Python object or raw bytes as discussed above.
        Answers an empty dictionary if no keys matched the prefix.
    """
    matches = self.sdl.find_and_get(namespace, prefix, usemsgpack)
    return matches
def sdl_delete(self, namespace, key):
    """
    ** Deprecation warning: this pass-through will be removed in a
    future release; use ``self.sdl`` directly. **

    Deletes the key-value pair with the specified key in the specified namespace.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    """
    store = self.sdl
    store.delete(namespace, key)
def healthcheck(self):
    """
    Report health as the conjunction of the RMR loop's and SDL's own
    healthchecks. SDL is only consulted when the RMR loop reports healthy.
    """
    rmr_status = self._rmr_loop.healthcheck()
    if not rmr_status:
        return rmr_status
    return self.sdl.healthcheck()
475 # Convenience function for discovering config change events
def config_check(self, timeout=0):
    """
    Poll the configuration-file watcher for events. The watcher is only
    created by __init__() when a config file is being watched.

    Parameters
    ----------
    timeout: int (optional)
        Passed to the watcher's read() as ``timeout``, default 0.

    Returns
    -------
    List of Events, possibly empty
        An event is a tuple with objects wd, mask, cookie and name.
        For example::

            Event(wd=1, mask=1073742080, cookie=0, name='foo')

        Always empty when no watcher exists.
    """
    watcher = self._inotify
    if watcher:
        return list(watcher.read(timeout=timeout))
    return []
def stop(self):
    """
    Clean up and stop the xapp rmr thread (currently). This is
    critical for unit testing as pytest will never return if the
    thread is running.

    TODO: can we register a ctrl-c handler so this gets called on
    ctrl-c? Because currently two ctrl-c are needed to stop.
    """
    # NOTE(review): the shutdown call was not visible in the reviewed text — confirm
    self.xapp_shutdown()

    self._rmr_loop.stop()
516 # Public classes that Xapp writers should instantiate or subclass
517 # to implement an Xapp.
class RMRXapp(_BaseXapp):
    """
    An Xapp that reacts only to RMR messages; i.e., it only performs an
    action when a message is received. Clients should invoke the run
    method, which loops waiting for RMR messages and calls the
    client-registered callback for each message type.

    When a config-file watcher exists (see the base class), this class
    polls it for file-write events and invokes a configuration-change
    handler on each event. The handler is also invoked at startup. If no
    handler is supplied to the constructor, a default handler is used
    that only logs a debug message.

    Parameters
    ----------
    default_handler: function
        Called as default_handler(self, summary, sbuf) when a message
        type is received for which no other handler is registered.
        default_handler argument summary: dict
            The RMR message summary, a dict of key-value pairs
        default_handler argument sbuf: ctypes c_void_p
            Pointer to an RMR message buffer. The user must call free on this when done.
    config_handler: function (optional, default is documented above)
        Called as config_handler(self, json) at startup and each time a
        configuration-file change event is detected.
        config_handler argument json: dict
            The contents of the configuration file, parsed as JSON.
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    post_init: function (optional, default None)
        Run this function after the app initializes and before the dispatch loop starts;
        its signature should be post_init(self)
    """

    def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Initialize the base class, then install the default handlers.
        """
        # init base
        super().__init__(
            rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
        )

        # default configuration-change handler, used if none was provided
        def handle_config_change(self, config):
            self.logger.debug("xapp_frame: default config handler invoked")

        # default healthcheck handler: answers with the combined RMR/SDL
        # health; users may register their own handler for this message
        # type, since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            if self.healthcheck():
                payload = b"OK\n"
            else:
                payload = b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
            self.rmr_free(sbuf)

        # setup callbacks
        self._default_handler = default_handler
        self._config_handler = config_handler or handle_config_change
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)

        # call the config handler at startup if prereqs were met
        if self._inotify:
            with open(self._config_path) as json_file:
                startup_config = json.load(json_file)
            self.logger.debug("run: invoking config handler at start")
            self._config_handler(self, startup_config)

    def register_callback(self, handler, message_type):
        """
        Register ``handler`` for rmr messages of type ``message_type``.

        Parameters
        ----------
        handler: function
            called as handler(self, summary, sbuf) when a message of type
            message_type is received
            summary: dict
                the rmr message summary
            sbuf: ctypes c_void_p
                Pointer to an rmr message buffer. The user must call free on this when done.
        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
        """
        Start the reactive Xapp: after this call the registered handlers
        are invoked on received messages.

        Parameters
        ----------
        thread: bool (optional, default is False)
            If False, execution is not returned and the framework loops forever.
            If True, a thread is started to run the queue read/dispatch loop
            and execution is returned to caller; the thread can be stopped
            by calling the .stop() method.

        rmr_timeout: integer (optional, default is 5 seconds)
            Length of time to wait for an RMR message to arrive.

        inotify_timeout: integer (optional, default is 0 seconds)
            Length of time to wait for an inotify event to arrive.
        """

        def loop():
            while self._keep_going:

                # poll RMR
                try:
                    summary, sbuf = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
                    # dispatch, falling back to the default handler
                    handler = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None) or self._default_handler
                    self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
                    handler(self, summary, sbuf)
                except queue.Empty:
                    # the get timed out
                    pass

                # poll configuration file watcher
                try:
                    for event in self.config_check(timeout=inotify_timeout):
                        with open(self._config_path) as json_file:
                            changed_config = json.load(json_file)
                        self.logger.debug("run: invoking config handler on change event {}".format(event))
                        self._config_handler(self, changed_config)
                except Exception as error:
                    self.logger.error("run: configuration handler failed: {}".format(error))

        if not thread:
            loop()
            return
        Thread(target=loop).start()

    def stop(self):
        """
        Stops the base class, then sets the flag to end the dispatch loop.
        """
        super().stop()
        self.logger.debug("Setting flag to end framework work loop.")
        self._keep_going = False
class Xapp(_BaseXapp):
    """
    A generic Xapp where the client provides a single function for the
    framework to call at startup time (instead of providing callback
    functions by message type). The Xapp writer must implement and
    provide a function with a loop-forever construct similar to the
    `run` function in the `RMRXapp` class. That function should poll to
    retrieve RMR messages and dispatch them appropriately, poll for
    configuration changes, etc.

    Parameters
    ----------
    entrypoint: function
        This function is called when the Xapp class's run method is invoked.
        The function signature must be just function(self)
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Initialize the base class and remember the entrypoint.

        Parameters
        ----------
        entrypoint: function
            stored and later called by run() as entrypoint(self)

        For the other parameters, see class _BaseXapp.
        """
        # init base
        base_options = {
            "rmr_port": rmr_port,
            "rmr_wait_for_ready": rmr_wait_for_ready,
            "use_fake_sdl": use_fake_sdl,
        }
        super().__init__(**base_options)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the general Xapp is ready to start.
        It calls the client-provided entrypoint with this Xapp as argument.
        """
        entrypoint = self._entrypoint
        entrypoint(self)

    # there is no need for stop currently here (base has, and nothing
    # special to do here)