1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
26 from threading import Thread
27 from typing import List, Set
30 from mdclogpy import Logger
32 from ricxappframe import xapp_rmr
33 from ricxappframe.constants import sdl_namespaces
34 from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
35 from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
36 from ricxappframe.rmr import rmr
37 from ricxappframe.util.constants import Constants
38 from ricxappframe.xapp_sdl import SDLWrapper
44 This class initializes RMR, starts a thread that checks for incoming
45 messages, provisions an SDL object and optionally creates a
46 config-file watcher. This private base class should not be
47 instantiated by clients directly, but it defines many public methods
48 that may be used by clients.
50 If environment variable CONFIG_FILE is defined, and that variable
51 contains a path to an existing file, a watcher is defined to monitor
52 modifications (writes) to that file using the Linux kernel's inotify
53 feature. The watcher must be polled by calling method
58 rmr_port: int (optional, default is 4562)
59 Port on which the RMR library listens for incoming messages.
61 rmr_wait_for_ready: bool (optional, default is True)
62 If this is True, then init waits until RMR is ready to send,
63 which includes having a valid routing file. This can be set
64 to False if the client wants to *receive only*.
66 use_fake_sdl: bool (optional, default is False)
67 if this is True, it uses the DBaaS "fake dict backend" instead
68 of Redis or other backends. Set this to True when developing
69 an xapp or during unit testing to eliminate the need for DBaaS.
71 post_init: function (optional, default is None)
72 Runs this user-provided function at the end of the init method;
73 its signature should be post_init(self)
76 def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
78 Documented in the class comment.
80 # PUBLIC, can be used by xapps using self.(name):
81 self.logger = Logger(name=__name__)
83 # Start rmr rcv thread
84 self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
85 self._mrc = self._rmr_loop.mrc # for convenience
88 self.sdl = SDLWrapper(use_fake_sdl)
91 # The environment variable specifies the path to the Xapp config file
92 self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None)
93 if self._config_path and os.path.isfile(self._config_path):
94 self._inotify = inotify_simple.INotify()
95 self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
96 self.logger.debug("__init__: watching config file {}".format(self._config_path))
99 self.logger.warning("__init__: NOT watching any config file")
101 # used for thread control of Registration of Xapp
102 self._keep_registration = True
104 # configuration data for xapp registration and deregistration
105 self._config_data = None
106 if self._config_path and os.path.isfile(self._config_path):
107 with open(self._config_path) as json_file:
108 self._config_data = json.load(json_file)
110 self._keep_registration = False
111 self.logger.error("__init__: Cannot Read config file for xapp Registration")
112 self._config_data = {}
114 Thread(target=self.registerXapp).start()
116 # run the optionally provided user post init
def get_service(self, host, service):
    """
    To find the url for connecting to the service

    The lookup key is built by formatting ``service`` with the upper-cased
    application namespace and host, then replacing every "-" with "_".
    The value of the environment variable with that name is split on "//".

    Parameters
    ----------
    host: string
        defines the hostname in the url
    service: string
        defines the servicename in the url; a format template with two
        positional placeholders (namespace, host)

    Returns
    -------
    string
        The part of the environment value that follows "//", or an empty
        string when the host is None, the variable is not set, or the
        value contains no "//".
    """
    app_namespace = self._config_data.get("APP_NAMESPACE")
    if app_namespace is None:
        app_namespace = Constants.DEFAULT_XAPP_NS
    self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace))
    if app_namespace is None or host is None:
        return ""

    svc = service.format(app_namespace.upper(), host.upper())
    urlkey = svc.replace("-", "_")
    raw_url = os.environ.get(urlkey)
    if raw_url is None:
        # An unset variable used to crash with AttributeError on None.split();
        # report "not resolved" the same way as every other failure path.
        self.logger.warning("Service urlkey : {} is not set in the environment".format(urlkey))
        return ""

    url = raw_url.split("//")
    self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url))
    # NOTE(review): callers compare the result with "" to detect failure;
    # confirm they expect the text after the scheme separator on success.
    return url[1] if len(url) > 1 else ""
def do_post(self, plt_namespace, url, msg):
    """
    registration of the xapp using the url and json msg

    Parameters
    ----------
    plt_namespace: string
        platform namespace where the xapp is running
    url: string
        url for xapp registration; a format template into which the
        platform namespace is substituted (twice)
    msg: string
        json msg containing the xapp details

    Returns
    -------
    bool
        True when the POST answered with status 200 or 201, False when an
        argument is missing, the request failed, or any other status was
        returned.
    """
    if url is None:
        self.logger.error("url is empty ")
        return False
    if plt_namespace is None:
        self.logger.error("plt_namespace is empty")
        return False

    try:
        request_url = url.format(plt_namespace, plt_namespace)
        resp = requests.post(request_url, json=msg)
        self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
        self.logger.debug("Response Text : {}".format(resp.text))
        return resp.status_code in (200, 201)
    # The specific handlers must precede RequestException: it is their common
    # base class, so listing it first made the others unreachable.
    except requests.exceptions.HTTPError as errh:
        self.logger.error("Http Error: {}".format(errh))
    except requests.exceptions.ConnectionError as errc:
        self.logger.error("Error Connecting: {}".format(errc))
    except requests.exceptions.Timeout as errt:
        self.logger.error("Timeout Error: {}".format(errt))
    except requests.exceptions.RequestException as err:
        self.logger.error("Error : {}".format(err))
    # NOTE(review): every failure path answers False to honour the documented
    # bool contract; confirm no caller relies on a different failure value.
    return False
function to register the xapp
199 whether or not the xapp is registered
201 hostname = os.environ.get("HOSTNAME")
202 xappname = self._config_data.get("name")
203 xappversion = self._config_data.get("version")
204 pltnamespace = os.environ.get("PLT_NAMESPACE")
205 if pltnamespace is None:
206 pltnamespace = Constants.DEFAULT_PLT_NS
207 self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format(
208 hostname, xappname, xappversion, pltnamespace))
210 http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP)
211 rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR)
212 if http_endpoint == "" or rmr_endpoint == "":
214 "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint,
218 "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint "
219 ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint,
220 self._config_data.get("CONFIG_PATH")))
223 "appVersion": xappversion,
225 "appInstanceName": xappname,
226 "httpEndpoint": http_endpoint,
227 "rmrEndpoint": rmr_endpoint,
228 "config": json.dumps(self._config_data)
230 self.logger.info("REQUEST STRING :{}".format(request_string))
231 return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string)
233 def registerXapp(self):
238 while self._keep_registration and retries > 0:
241 # checking for rmr/sdl/xapp health
242 healthy = self.healthcheck()
245 "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
248 self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(
249 self._config_data.get("name")))
251 self.logger.debug("Registration done, proceeding with startup ...")
254 def deregister(self):
whether or not the xapp was deregistered
263 healthy = self.healthcheck()
265 self.logger.error("RMR or SDL or xapp == Not Healthy")
267 if self._config_data is None:
269 name = os.environ.get("HOSTNAME")
270 xappname = self._config_data.get("name")
271 pltnamespace = os.environ.get("PLT_NAMESPACE")
272 if pltnamespace is None:
273 pltnamespace = Constants.DEFAULT_PLT_NS
276 "appInstanceName": xappname,
279 return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string)
281 def xapp_shutdown(self):
283 Deregisters the xapp while shutting down
286 self.logger.debug("Wait for xapp to get unregistered")
def rmr_get_messages(self):
    """
    Generator over the messages currently waiting in the receive queue.

    Every yielded item is a tuple (S, sbuf): S is the message summary
    dict and sbuf is the raw message. Iteration ends as soon as the
    queue reports empty.

    The caller MUST call rmr.rmr_free_msg(sbuf) when finished with each
    sbuf to prevent memory leaks!
    """
    pending = self._rmr_loop.rcv_queue
    while True:
        if pending.empty():
            return
        summary, sbuf = pending.get()
        yield (summary, sbuf)
303 def rmr_send(self, payload, mtype, retries=100):
305 Allocates a buffer, sets payload and mtype, and sends
313 retries: int (optional)
314 Number of times to retry at the application level before excepting RMRFailure
319 whether or not the send worked after retries attempts
321 sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True,
324 for _ in range(retries):
325 sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
326 if sbuf.contents.state == 0:
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
    """
    Return-to-sender: sends the buffer back to where it came from,
    optionally with a different payload and/or message type.

    The sbuf is NOT freed here, because the caller may wish to perform
    multiple rts per buffer. The client needs to free.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    new_payload: bytes (optional)
        New payload to set
    new_mtype: int (optional)
        New message type (replaces the received message)
    retries: int (optional, default 100)
        Number of times to retry at the application level

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    for _attempt in range(retries):
        # each attempt hands back a buffer whose state tells whether it went out
        sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
        delivered = sbuf.contents.state == 0
        if delivered:
            return True

    # every attempt failed (or none was made)
    self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
    return False
def rmr_free(self, sbuf):
    """
    Releases an rmr message buffer once the caller is done with it.

    Note: self is not used, so this could be a plain function; it is
    kept as a method so that it has a home on the xapp object.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    release = rmr.rmr_free_msg
    release(sbuf)
379 # Convenience (pass-thru) function for invoking SDL.
def sdl_set(self, namespace, key, value, usemsgpack=True):
    """
    ** Deprecation warning: this pass-through will be removed in a future release **

    Writes one key-value pair to SDL, optionally serializing the value
    to bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    value:
        Object or byte array to store. See the `usemsgpack` parameter.
    usemsgpack: boolean (optional, default is True)
        If True, the msgpack function `packb` is invoked on the value to
        yield a byte array that is then sent to SDL, so the value can be
        anything that is serializable by msgpack.
        If False, the value must be bytes.
    """
    backend = self.sdl
    backend.set(namespace, key, value, usemsgpack)
def sdl_get(self, namespace, key, usemsgpack=True):
    """
    ** Deprecation warning: this pass-through will be removed in a future release **

    Reads the value stored under the given namespace and key from SDL,
    optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    usemsgpack: boolean (optional, default is True)
        If True, the byte array stored by SDL is deserialized using
        msgpack to yield the original object that was stored.
        If False, the byte array stored by SDL is returned without
        further processing.

    Returns
    -------
    Value
        See the usemsgpack parameter for an explanation of the returned value type.
        Answers None if the key is not found.
    """
    stored = self.sdl.get(namespace, key, usemsgpack)
    return stored
def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
    """
    ** Deprecation warning: this pass-through will be removed in a future release **

    Fetches every key-value pair in the given namespace whose key starts
    with the given prefix, optionally deserializing stored bytes using
    msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    prefix: string
        the key prefix
    usemsgpack: boolean (optional, default is True)
        If True, the byte array stored by SDL is deserialized using
        msgpack to yield the original value that was stored.
        If False, the byte array stored by SDL is returned without
        further processing.

    Returns
    -------
    Dictionary of key-value pairs
        Each key has the specified prefix.
        The value object (its type) depends on the usemsgpack parameter,
        but is either a Python object or raw bytes as discussed above.
        Answers an empty dictionary if no keys matched the prefix.
    """
    matches = self.sdl.find_and_get(namespace, prefix, usemsgpack)
    return matches
def sdl_delete(self, namespace, key):
    """
    ** Deprecation warning: this pass-through will be removed in a future release **

    Removes the key-value pair stored under the given key in the given
    namespace.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    """
    backend = self.sdl
    backend.delete(namespace, key)
def _get_rnib_info(self, node_type):
    """
    Shared implementation behind get_list_gnb_ids and get_list_enb_ids,
    which differ only in the node type they ask for.

    Parameters
    ----------
    node_type: string
        Type of node. This is EnumDescriptor.

    Returns
    -------
    List[NbIdentity]
        One identity per member read from SDL's E2 manager namespace,
        each parsed from its serialized form.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    serialized_ids: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
    identities: List[NbIdentity] = []
    for raw_identity in serialized_ids:
        # members are raw bytes (usemsgpack=False); decode each into its own message
        identity = NbIdentity()
        identity.ParseFromString(raw_identity)
        identities.append(identity)
    return identities
def get_list_gnb_ids(self):
    """
    Retrieves the list of gNodeb identity entities

    gNodeb information is stored in SDL by E2Manager, in SDL's
    `e2Manager` namespace, as serialized protobuf.

    Returns
    -------
    List
        Whatever _get_rnib_info answers for the GNB node type.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    node_types = Node.Type
    gnb_type_name = node_types.Name(node_types.GNB)
    return self._get_rnib_info(gnb_type_name)
def get_list_enb_ids(self):
    """
    Retrieves the list of eNodeb identity entities

    eNodeb information is stored in SDL by E2Manager, in SDL's
    `e2Manager` namespace, as serialized protobuf.

    Returns
    -------
    List
        Whatever _get_rnib_info answers for the ENB node type.

    Raises
    ------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    node_types = Node.Type
    enb_type_name = node_types.Name(node_types.ENB)
    return self._get_rnib_info(enb_type_name)
def healthcheck(self):
    """
    Combines the health of the RMR loop and of SDL.

    The RMR loop is asked first; SDL is only consulted when the RMR
    answer is truthy. The answer of whichever check decided the outcome
    is returned as-is.
    """
    rmr_status = self._rmr_loop.healthcheck()
    if not rmr_status:
        return rmr_status
    return self.sdl.healthcheck()
563 # Convenience function for discovering config change events
def config_check(self, timeout=0):
    """
    Polls the configuration-file watcher for events. The watcher
    prerequisites and event mask are documented in __init__().

    Parameters
    ----------
    timeout: int (optional)
        How long to wait for a configuration-file event, default 0.
        The value is handed unchanged to the watcher's read() call.
        NOTE(review): this was documented as seconds, but inotify_simple
        documents read()'s timeout in milliseconds - confirm the unit.

    Returns
    -------
    List of Events, possibly empty
        An event is a tuple with objects wd, mask, cookie and name.
        For example::

            Event(wd=1, mask=1073742080, cookie=0, name='foo')

        An empty list is answered when no watcher was set up.
    """
    watcher = self._inotify
    if watcher:
        return watcher.read(timeout=timeout)
    return []
591 cleans up and stops the xapp rmr thread (currently). This is
592 critical for unit testing as pytest will never return if the
595 TODO: can we register a ctrl-c handler so this gets called on
596 ctrl-c? Because currently two ctrl-c are needed to stop.
601 self._rmr_loop.stop()
604 # Public classes that Xapp writers should instantiate or subclass
605 # to implement an Xapp.
608 class RMRXapp(_BaseXapp):
610 Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
611 only performs an action when a message is received. Clients should
612 invoke the run method, which has a loop that waits for RMR messages
613 and calls the appropriate client-registered consume callback on each.
615 If environment variable CONFIG_FILE is defined, and that variable
616 contains a path to an existing file, this class polls a watcher
617 defined on that file to detect file-write events, and invokes a
618 configuration-change handler on each event. The handler is also
619 invoked at startup. If no handler function is supplied to the
620 constructor, this class defines a default handler that only logs a
625 default_handler: function
626 A function with the signature (summary, sbuf) to be called when a
627 message type is received for which no other handler is registered.
628 default_handler argument summary: dict
629 The RMR message summary, a dict of key-value pairs
630 default_handler argument sbuf: ctypes c_void_p
631 Pointer to an RMR message buffer. The user must call free on this when done.
632 config_handler: function (optional, default is documented above)
633 A function with the signature (json) to be called at startup and each time
634 a configuration-file change event is detected. The JSON object is read from
635 the configuration file, if the prerequisites are met.
636 config_handler argument json: dict
637 The contents of the configuration file, parsed as JSON.
638 rmr_port: integer (optional, default is 4562)
639 Initialize RMR to listen on this port
640 rmr_wait_for_ready: boolean (optional, default is True)
641 Wait for RMR to signal ready before starting the dispatch loop
642 use_fake_sdl: boolean (optional, default is False)
643 Use an in-memory store instead of the real SDL service
644 post_init: function (optional, default None)
645 Run this function after the app initializes and before the dispatch loop starts;
646 its signature should be post_init(self)
649 def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False,
656 rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl, post_init=post_init
660 self._default_handler = default_handler
661 self._config_handler = config_handler
664 # used for thread control
665 self._keep_going = True
667 # register a default healthcheck handler
668 # this default checks that rmr is working and SDL is working
669 # the user can override this and register their own handler
670 # if they wish since the "last registered callback wins".
671 def handle_healthcheck(self, summary, sbuf):
672 healthy = self.healthcheck()
673 payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
674 self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP)
677 self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ)
679 # define a default configuration-change handler if none was provided.
680 if not config_handler:
681 def handle_config_change(self, config):
682 self.logger.debug("xapp_frame: default config handler invoked")
684 self._config_handler = handle_config_change
686 # call the config handler at startup if prereqs were met
688 with open(self._config_path) as json_file:
689 data = json.load(json_file)
690 self.logger.debug("run: invoking config handler at start")
691 self._config_handler(self, data)
def register_callback(self, handler, message_type):
    """
    Associates handler with message_type, so that handler(summary, sbuf)
    is the function looked up when an rmr message of that type arrives.

    Parameters
    ----------
    handler: function
        a function with the signature (summary, sbuf) to be called
        when a message of type message_type is received
        summary: dict
            the rmr message summary
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer. The user must call free on this when done.
    message_type: int
        the message type to look for

    Note if this method is called multiple times for a single message type, the "last one wins".
    """
    self._dispatch.update({message_type: handler})
714 def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
716 This function should be called when the reactive Xapp is ready to start.
717 After start, the Xapp's handlers will be called on received messages.
721 thread: bool (optional, default is False)
722 If False, execution is not returned and the framework loops forever.
723 If True, a thread is started to run the queue read/dispatch loop
724 and execution is returned to caller; the thread can be stopped
725 by calling the .stop() method.
727 rmr_timeout: integer (optional, default is 5 seconds)
728 Length of time to wait for an RMR message to arrive.
730 inotify_timeout: integer (optional, default is 0 seconds)
731 Length of time to wait for an inotify event to arrive.
735 while self._keep_going:
739 (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
741 func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None)
743 func = self._default_handler
744 self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE]))
745 func(self, summary, sbuf)
750 # poll configuration file watcher
752 events = self.config_check(timeout=inotify_timeout)
754 with open(self._config_path) as json_file:
755 data = json.load(json_file)
756 self.logger.debug("run: invoking config handler on change event {}".format(event))
757 self._config_handler(self, data)
758 except Exception as error:
759 self.logger.error("run: configuration handler failed: {}".format(error))
762 Thread(target=loop).start()
768 Sets the flag to end the dispatch loop.
771 self.logger.debug("Setting flag to end framework work loop.")
772 self._keep_going = False
class Xapp(_BaseXapp):
    """
    A generic Xapp: instead of registering callbacks per message type,
    the client supplies one function that the framework calls at startup
    time. The Xapp writer must implement that function with a
    loop-forever construct similar to the `run` function in the
    `RMRXapp` class; it should poll to retrieve RMR messages and dispatch
    them appropriately, poll for configuration changes, etc.

    Parameters
    ----------
    entrypoint: function
        This function is called when the Xapp class's run method is invoked.
        The function signature must be just function(self)
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Stores the entrypoint after initializing the base class.

        Parameters
        ----------
        entrypoint: function
            Called with this object as its only argument by run().

        For the other parameters, see class _BaseXapp.
        """
        base_options = {
            "rmr_port": rmr_port,
            "rmr_wait_for_ready": rmr_wait_for_ready,
            "use_fake_sdl": use_fake_sdl,
        }
        super().__init__(**base_options)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the general Xapp is ready to start.
        It invokes the client-supplied entrypoint with this object.
        """
        start = self._entrypoint
        start(self)
814 # there is no need for stop currently here (base has, and nothing
815 # special to do here)