1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
18 This framework for Python Xapps provides classes that Xapp writers
19 should instantiate and/or subclass depending on their needs.
26 from threading import Thread
27 from typing import List, Set
30 from mdclogpy import Logger
32 from ricxappframe import xapp_rmr
33 from ricxappframe.constants import sdl_namespaces
34 from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
35 from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
36 from ricxappframe.rmr import rmr
37 from ricxappframe.xapp_sdl import SDLWrapper
# message-type constants
# RMR message type of a health-check request; RMRXapp registers a default
# handler for this type.
RIC_HEALTH_CHECK_REQ = 100
# RMR message type that the default health-check handler sets on its reply.
RIC_HEALTH_CHECK_RESP = 101

# environment variable with path to configuration file
# (the file that is watched for modifications and fed to the config handler)
CONFIG_FILE_ENV = "CONFIG_FILE"
# environment variable with path to the JSON file that holds the data used
# for xapp registration and deregistration
CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
50 This class initializes RMR, starts a thread that checks for incoming
51 messages, provisions an SDL object and optionally creates a
52 config-file watcher. This private base class should not be
53 instantiated by clients directly, but it defines many public methods
54 that may be used by clients.
56 If environment variable CONFIG_FILE is defined, and that variable
57 contains a path to an existing file, a watcher is defined to monitor
58 modifications (writes) to that file using the Linux kernel's inotify
59 feature. The watcher must be polled by calling method
64 rmr_port: int (optional, default is 4562)
65 Port on which the RMR library listens for incoming messages.
67 rmr_wait_for_ready: bool (optional, default is True)
68 If this is True, then init waits until RMR is ready to send,
69 which includes having a valid routing file. This can be set
70 to False if the client wants to *receive only*.
72 use_fake_sdl: bool (optional, default is False)
73 if this is True, it uses the DBaaS "fake dict backend" instead
74 of Redis or other backends. Set this to True when developing
75 an xapp or during unit testing to eliminate the need for DBaaS.
77 post_init: function (optional, default is None)
78 Runs this user-provided function at the end of the init method;
79 its signature should be post_init(self)
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
    """
    Set up logging, RMR, SDL, the optional config-file watcher and the
    registration thread.

    Parameters
    ----------
    rmr_port: int (optional, default is 4562)
        Port handed to the RMR loop.
    rmr_wait_for_ready: bool (optional, default is True)
        Handed to the RMR loop as its wait_for_ready flag.
    use_fake_sdl: bool (optional, default is False)
        Handed to SDLWrapper.
    post_init: function (optional, default is None)
        When given, called as post_init(self) as the very last step.
    """
    # PUBLIC attribute: xapps may log through self.logger
    self.logger = Logger(name=__name__)

    # RMR: start the rmr rcv thread and keep its context for convenience
    rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
    self._rmr_loop = rmr_loop
    self._mrc = rmr_loop.mrc

    # SDL
    self.sdl = SDLWrapper(use_fake_sdl)

    # Config watcher: only created when CONFIG_FILE names an existing file;
    # otherwise self._inotify stays None so config_check() can tell.
    watch_path = os.environ.get(CONFIG_FILE_ENV, None)
    self._config_path = watch_path
    if not (watch_path and os.path.isfile(watch_path)):
        self._inotify = None
        self.logger.warning("__init__: NOT watching any config file")
    else:
        self._inotify = inotify_simple.INotify()
        self._inotify.add_watch(watch_path, inotify_simple.flags.MODIFY)
        self.logger.debug("__init__: watching config file {}".format(watch_path))

    # Registration: the data comes from the JSON file named by
    # CONFIG_FILE_PATH; without that file the registration loop is disabled.
    self._keep_registration = True
    self._config_data = None
    registration_path = os.environ.get(CONFIG_FILE_PATH, None)
    self._configfile_path = registration_path
    if registration_path and os.path.isfile(registration_path):
        with open(registration_path) as json_file:
            self._config_data = json.load(json_file)
    else:
        self._keep_registration = False
        self.logger.warning("__init__: Cannot Read config file for xapp Registration")

    # the registration loop runs in its own thread
    Thread(target=self.registerXapp).start()

    # run the optionally provided user post init
    if post_init:
        post_init(self)
def get_service(self, host, service):
    """
    To find the url for connecting to the service

    The service template is formatted with the upper-cased application
    namespace and host name, every "-" is replaced by "_", and the part
    after the first "//" is returned.

    Parameters
    ----------
    host: string
        defines the hostname in the url
    service: string
        defines the servicename in the url; used as a format template that
        receives (namespace, host)

    Returns
    -------
    string
        url for the service, or "" when it cannot be resolved
    """
    config = self._config_data or {}
    # An unset namespace (missing key as well as empty string) falls back to
    # the default one instead of failing on None.upper().
    app_namespace = config.get("APP_NAMESPACE") or config.get("DEFAULT_XAPP_NS")
    if not (app_namespace and host and service):
        return ""

    svc = service.format(app_namespace.upper(), host.upper())
    pieces = svc.replace("-", "_").split("//")
    # callers compare the result with "" to detect an unresolved endpoint
    return pieces[1] if len(pieces) > 1 else ""
def do_post(self, plt_namespace, url, msg):
    """
    Posts a json msg to the given url; used for registration and
    deregistration of the xapp.

    Parameters
    ----------
    plt_namespace: string
        platform namespace where the xapp is running
    url: string
        url for xapp registration; formatted with the platform namespace
    msg: string
        json msg containing the xapp details

    Returns
    -------
    bool
        True when the response status is 200 or 201; False otherwise,
        including when the request could not be made
    """
    import json

    if not url:
        self.logger.error("Error : {}".format("no url configured for the request"))
        return False

    # msg arrives as already-serialized JSON text. Passing a str through the
    # `json=` argument would encode it a second time, so decode it first.
    body = msg
    if isinstance(msg, (str, bytes, bytearray)):
        try:
            body = json.loads(msg)
        except ValueError:
            body = msg  # not JSON text: send it unchanged

    try:
        request_url = url.format(plt_namespace, plt_namespace)
        resp = requests.post(request_url, json=body)
        self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
        return resp.status_code == 200 or resp.status_code == 201
    # The specific exceptions derive from RequestException, so they must be
    # listed before it or their handlers can never run.
    except requests.exceptions.HTTPError as errh:
        self.logger.error("Http Error: {}".format(errh))
    except requests.exceptions.ConnectionError as errc:
        self.logger.error("Error Connecting: {}".format(errc))
    except requests.exceptions.Timeout as errt:
        self.logger.error("Timeout Error: {}".format(errt))
    except requests.exceptions.RequestException as err:
        self.logger.error("Error : {}".format(err))
    # NOTE(review): the original failure-path return statements were not
    # visible in the reviewed excerpt; False follows the documented bool
    # contract - confirm no caller relies on an error object/string.
    return False
def register(self):
    """
    function to register the xapp

    Builds the registration message from the configuration data and posts
    it to the configured REGISTER_PATH.

    Returns
    -------
    bool
        whether or not the xapp is registered
    """
    config = self._config_data
    if config is None:
        # no registration data was loaded (same guard as deregister)
        return False

    hostname = config.get("hostname")
    xappname = config.get("name")
    xappversion = config.get("version")
    # a missing or empty platform namespace falls back to the default one
    pltnamespace = config.get("PLT_NAMESPACE") or config.get("DEFAULT_PLT_NS")

    http_endpoint = self.get_service(hostname, config.get("SERVICE_HTTP"))
    rmr_endpoint = self.get_service(hostname, config.get("SERVICE_RMR"))
    if not http_endpoint or not rmr_endpoint:
        self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
        return False

    request_string = {
        # NOTE(review): the "appName" entry was not visible in the reviewed
        # excerpt; it is reconstructed from the hostname read above - confirm.
        "appName": hostname,
        "httpEndpoint": http_endpoint,
        "rmrEndpoint": rmr_endpoint,
        "appInstanceName": xappname,
        "appVersion": xappversion,
        "configPath": config.get("CONFIG_PATH"),
    }
    try:
        request_body = json.dumps(request_string)
    except TypeError:
        self.logger.error("Unable to serialize the object")
        # a truthy error string here would read as "registered" to callers
        return False

    return self.do_post(pltnamespace, config.get("REGISTER_PATH"), request_body)
def registerXapp(self):
    """
    Registration loop; __init__ runs it in a separate thread.

    While registration is enabled, waits until healthcheck() reports
    healthy and then registers the xapp.
    """
    while self._keep_registration:
        # NOTE(review): the pause, the register() call and the loop exit
        # were not visible in the reviewed excerpt; they are reconstructed
        # here - confirm the interval and the exit condition.
        time.sleep(5)
        # checking for rmr/sdl/xapp health
        if self.healthcheck():
            self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(self._config_data.get("name")))
            self.register()
            self.logger.debug("Registration done, proceeding with startup ...")
            break
        self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
def deregister(self):
    """
    Deregisters the xapp

    Builds the deregistration message from the configuration data and
    posts it to the configured DEREGISTER_PATH. Nothing is sent when the
    xapp is unhealthy or no configuration data was loaded.

    Returns
    -------
    bool
        whether or not the deregistration request succeeded
    """
    if not self.healthcheck():
        self.logger.error("RMR or SDL or xapp == Not Healthy")
        return False

    config = self._config_data
    if config is None:
        return False

    name = config.get("hostname")
    xappname = config.get("name")
    # Fall back to the default platform namespace, as register() does; the
    # previous fallback re-read "PLT_NAMESPACE" and so never changed anything.
    pltnamespace = config.get("PLT_NAMESPACE") or config.get("DEFAULT_PLT_NS")

    request_string = {
        # NOTE(review): the "appName" entry was not visible in the reviewed
        # excerpt; it is reconstructed from the hostname read above - confirm.
        "appName": name,
        "appInstanceName": xappname,
    }
    try:
        request_body = json.dumps(request_string)
    except TypeError:
        self.logger.error("Error Serializing the object")
        # a truthy error string here would read as success to callers
        return False

    return self.do_post(pltnamespace, config.get("DEREGISTER_PATH"), request_body)
def xapp_shutdown(self):
    """
    Deregisters the xapp while shutting down
    """
    # NOTE(review): the deregister() call and the pause were not visible in
    # the reviewed excerpt; they are reconstructed from the docstring and
    # the log message - confirm the wait duration.
    self.deregister()
    self.logger.debug("Wait for xapp to get unregistered")
    time.sleep(10)
def rmr_get_messages(self):
    """
    Generator over the items currently waiting in the receive queue.

    Each item is yielded as a tuple (S, sbuf) where S is a message summary
    dict and sbuf is the raw message. Iteration ends as soon as the queue
    reports empty. The caller MUST call rmr.rmr_free_msg(sbuf) when
    finished with each sbuf to prevent memory leaks!
    """
    pending = self._rmr_loop.rcv_queue
    while True:
        if pending.empty():
            return
        summary, sbuf = pending.get()
        yield (summary, sbuf)
def rmr_send(self, payload, mtype, retries=100):
    """
    Allocates a buffer, sets payload and mtype, and sends
    the message via RMR

    Parameters
    ----------
    payload: bytes
        payload to set
    mtype: int
        message type
    retries: int (optional)
        Number of times to try sending at the application level before
        giving up

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
    try:
        for _ in range(retries):
            # the send call hands back the buffer to use from now on
            sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
            if sbuf.contents.state == 0:
                return True
        return False
    finally:
        # The buffer is private to this call, so release it on every exit
        # path, including an exception raised while sending.
        self.rmr_free(sbuf)
def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100):
    """
    Return-to-sender: replies on the given buffer, optionally replacing
    the payload and message type first. This does NOT free the sbuf for
    the caller as the caller may wish to perform multiple rts per buffer.
    The client needs to free.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    new_payload: bytes (optional)
        New payload to set
    new_mtype: int (optional)
        New message type (replaces the received message)
    retries: int (optional, default 100)
        Number of times to retry at the application level

    Returns
    -------
    bool
        whether or not the send worked after retries attempts
    """
    for _attempt in range(retries):
        sbuf = rmr.rmr_rts_msg(self._mrc, sbuf, payload=new_payload, mtype=new_mtype)
        delivered = sbuf.contents.state == 0
        if delivered:
            break
    else:
        # every attempt failed (or retries was 0)
        self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf)))
        return False
    return True
def rmr_free(self, sbuf):
    """
    Frees an rmr message buffer after use

    Note: self is not used, so this could be a plain function; it is kept
    as a method so that it has a home next to the other rmr helpers.

    Parameters
    ----------
    sbuf: ctypes c_void_p
        Pointer to an rmr message buffer
    """
    release = rmr.rmr_free_msg
    release(sbuf)
370 # Convenience (pass-thru) function for invoking SDL.
def sdl_set(self, namespace, key, value, usemsgpack=True):
    """
    ** Deprecation Warning **
    ** Will be removed in a future release **

    Pass-through to self.sdl.set: stores a key-value pair to SDL,
    optionally serializing the value to bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    value:
        Object or byte array to store. See the `usemsgpack` parameter.
    usemsgpack: boolean (optional, default is True)
        Determines whether the value is serialized using msgpack before
        storing. If True, the msgpack function `packb` is invoked on the
        value to yield a byte array that is then sent to SDL, so the value
        can be anything that is serializable by msgpack.
        If False, the value must be bytes.
    """
    store = self.sdl
    store.set(namespace, key, value, usemsgpack)
def sdl_get(self, namespace, key, usemsgpack=True):
    """
    ** Deprecation Warning **
    ** Will be removed in a future release **

    Pass-through to self.sdl.get: gets the value for the specified
    namespace and key from SDL, optionally deserializing stored bytes
    using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    usemsgpack: boolean (optional, default is True)
        If True, the byte array stored by SDL is deserialized using msgpack
        to yield the original object that was stored.
        If False, the byte array stored by SDL is returned without further
        processing.

    Returns
    -------
    Value
        See the usemsgpack parameter for an explanation of the returned
        value type. Answers None if the key is not found.
    """
    store = self.sdl
    found = store.get(namespace, key, usemsgpack)
    return found
def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
    """
    ** Deprecation Warning **
    ** Will be removed in a future release **

    Pass-through to self.sdl.find_and_get: gets all key-value pairs in
    the specified namespace with keys that start with the specified
    prefix, optionally deserializing stored bytes using msgpack.

    Parameters
    ----------
    namespace: string
        SDL namespace
    prefix: string
        the key prefix
    usemsgpack: boolean (optional, default is True)
        If True, the byte array stored by SDL is deserialized using msgpack
        to yield the original value that was stored.
        If False, the byte array stored by SDL is returned without further
        processing.

    Returns
    -------
    Dictionary of key-value pairs
        Each key has the specified prefix.
        The value object (its type) depends on the usemsgpack parameter,
        but is either a Python object or raw bytes as discussed above.
        Answers an empty dictionary if no keys matched the prefix.
    """
    store = self.sdl
    matches = store.find_and_get(namespace, prefix, usemsgpack)
    return matches
def sdl_delete(self, namespace, key):
    """
    ** Deprecation Warning **
    ** Will be removed in a future release **

    Pass-through to self.sdl.delete: deletes the key-value pair with the
    specified key in the specified namespace.

    Parameters
    ----------
    namespace: string
        SDL namespace
    key: string
        SDL key
    """
    store = self.sdl
    store.delete(namespace, key)
def _get_rnib_info(self, node_type):
    """
    Shared implementation of get_list_gnb_ids and get_list_enb_ids, which
    differ only in the node type they ask for.

    Reads the members stored under node_type in the E2 manager namespace
    of SDL (without msgpack decoding) and parses each one as an NbIdentity.

    Parameters
    ----------
    node_type: string
        Type of node, as a name from the Node.Type enum.

    Returns
    -------
    List: (NbIdentity)

    Raises
    -------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    serialized_ids: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)

    def _parse(raw: bytes) -> NbIdentity:
        identity = NbIdentity()
        identity.ParseFromString(raw)
        return identity

    identities: List[NbIdentity] = [_parse(raw) for raw in serialized_ids]
    return identities
def get_list_gnb_ids(self):
    """
    Retrieves the list of gNodeb identity entities

    gNodeb information is stored in SDL by E2Manager, in SDL's `e2Manager`
    namespace, as serialized protobuf.

    Returns
    -------
    List: (NbIdentity)

    Raises
    -------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    gnb_type_name = Node.Type.Name(Node.Type.GNB)
    return self._get_rnib_info(gnb_type_name)
def get_list_enb_ids(self):
    """
    Retrieves the list of eNodeb identity entities

    eNodeb information is stored in SDL by E2Manager, in SDL's `e2Manager`
    namespace, as serialized protobuf.

    Returns
    -------
    List: (NbIdentity)

    Raises
    -------
    SdlTypeError: If function's argument is of an inappropriate type.
    NotConnected: If SDL is not connected to the backend data storage.
    RejectedByBackend: If backend data storage rejects the request.
    BackendError: If the backend data storage fails to process the request.
    """
    enb_type_name = Node.Type.Name(Node.Type.ENB)
    return self._get_rnib_info(enb_type_name)
def healthcheck(self):
    """
    Combined health of the RMR loop and SDL.

    SDL is only asked when the RMR loop reports healthy.
    TODO: this needs to be understood how this is supposed to work
    """
    rmr_state = self._rmr_loop.healthcheck()
    if not rmr_state:
        return rmr_state
    return self.sdl.healthcheck()
554 # Convenience function for discovering config change events
def config_check(self, timeout=0):
    """
    Checks the watcher for configuration-file events. The watcher only
    exists when __init__ found an existing file named by the CONFIG_FILE
    environment variable; it watches that file for modifications.

    Parameters
    ----------
    timeout: int (optional)
        How long to wait for a configuration-file event, default 0.
        The value is passed unchanged to inotify_simple's INotify.read(),
        which interprets it in milliseconds (not seconds).

    Returns
    -------
    List of Events, possibly empty
        An event is a tuple with objects wd, mask, cookie and name.
        For example::

            Event(wd=1, mask=1073742080, cookie=0, name='foo')

    """
    watcher = self._inotify
    if not watcher:
        # no config file is being watched
        return []
    # materialize as a list so the documented return type always holds
    return list(watcher.read(timeout=timeout))
def stop(self):
    """
    cleans up and stops the xapp rmr thread (currently). This is
    critical for unit testing as pytest will never return if the
    thread is running.

    TODO: can we register a ctrl-c handler so this gets called on
    ctrl-c? Because currently two ctrl-c are needed to stop.
    """
    # NOTE(review): the statements before the RMR stop were not visible in
    # the reviewed excerpt; the shutdown/deregistration call below is
    # reconstructed - confirm.
    self.xapp_shutdown()

    rmr_loop = self._rmr_loop
    rmr_loop.stop()
595 # Public classes that Xapp writers should instantiate or subclass
596 # to implement an Xapp.
class RMRXapp(_BaseXapp):
    """
    A purely reactive Xapp: it only acts when an RMR message arrives.
    Clients call run(), whose loop waits for RMR messages and invokes the
    consume callback registered for each message type.

    If environment variable CONFIG_FILE is defined, and that variable
    contains a path to an existing file, this class polls a watcher
    defined on that file to detect file-write events, and invokes a
    configuration-change handler on each event. The handler is also
    invoked at startup. If no handler function is supplied to the
    constructor, a default handler is used that only logs a message.

    Parameters
    ----------
    default_handler: function
        A function with the signature (summary, sbuf) to be called when a
        message type is received for which no other handler is registered.
    default_handler argument summary: dict
        The RMR message summary, a dict of key-value pairs
    default_handler argument sbuf: ctypes c_void_p
        Pointer to an RMR message buffer. The user must call free on this when done.
    config_handler: function (optional, default is documented above)
        A function with the signature (json) to be called at startup and each time
        a configuration-file change event is detected. The JSON object is read from
        the configuration file, if the prerequisites are met.
    config_handler argument json: dict
        The contents of the configuration file, parsed as JSON.
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    post_init: function (optional, default None)
        Run this function after the app initializes and before the dispatch loop starts;
        its signature should be post_init(self)
    """

    def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
        """
        Also see _BaseXapp
        """
        # init base
        base_options = dict(
            rmr_port=rmr_port,
            rmr_wait_for_ready=rmr_wait_for_ready,
            use_fake_sdl=use_fake_sdl,
            post_init=post_init,
        )
        super().__init__(**base_options)

        # default configuration-change handler, used if none was provided
        def handle_config_change(self, config):
            self.logger.debug("xapp_frame: default config handler invoked")

        # setup callbacks
        self._default_handler = default_handler
        self._config_handler = config_handler if config_handler else handle_config_change
        self._dispatch = {}

        # used for thread control
        self._keep_going = True

        # Default healthcheck handler: reports whether rmr and SDL are
        # working. The user can replace it by registering another handler
        # for the same message type, since the "last registered callback wins".
        def handle_healthcheck(self, summary, sbuf):
            if self.healthcheck():
                payload = b"OK\n"
            else:
                payload = b"ERROR [RMR or SDL is unhealthy]\n"
            self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
            self.rmr_free(sbuf)

        self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)

        # call the config handler at startup if prereqs were met
        if self._inotify:
            with open(self._config_path) as json_file:
                data = json.load(json_file)
            self.logger.debug("run: invoking config handler at start")
            self._config_handler(self, data)

    def register_callback(self, handler, message_type):
        """
        registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type

        Parameters
        ----------
        handler: function
            a function with the signature (summary, sbuf) to be called
            when a message of type message_type is received
        summary: dict
            the rmr message summary
        sbuf: ctypes c_void_p
            Pointer to an rmr message buffer. The user must call free on this when done.
        message_type: int
            the message type to look for

        Note if this method is called multiple times for a single message type, the "last one wins".
        """
        self._dispatch[message_type] = handler

    def run(self, thread=False, rmr_timeout=5, inotify_timeout=0):
        """
        This function should be called when the reactive Xapp is ready to start.
        After start, the Xapp's handlers will be called on received messages.

        Parameters
        ----------
        thread: bool (optional, default is False)
            If False, execution is not returned and the framework loops forever.
            If True, a thread is started to run the queue read/dispatch loop
            and execution is returned to caller; the thread can be stopped
            by calling the .stop() method.

        rmr_timeout: integer (optional, default is 5 seconds)
            Length of time to wait for an RMR message to arrive.

        inotify_timeout: integer (optional, default is 0)
            Length of time to wait for an inotify event to arrive; passed
            unchanged to config_check().
        """

        def dispatch_next_message():
            # wait for one RMR message and hand it to its handler
            try:
                summary, sbuf = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout)
                msg_type = summary[rmr.RMR_MS_MSG_TYPE]
                # fall back to the default handler for unregistered types
                handler = self._dispatch.get(msg_type, None) or self._default_handler
                self.logger.debug("run: invoking msg handler on type {}".format(msg_type))
                handler(self, summary, sbuf)
            except queue.Empty:
                # the get timed out
                pass

        def poll_config_changes():
            # poll configuration file watcher
            try:
                for event in self.config_check(timeout=inotify_timeout):
                    with open(self._config_path) as json_file:
                        data = json.load(json_file)
                    self.logger.debug("run: invoking config handler on change event {}".format(event))
                    self._config_handler(self, data)
            except Exception as error:
                self.logger.error("run: configuration handler failed: {}".format(error))

        def loop():
            while self._keep_going:
                dispatch_next_message()
                poll_config_changes()

        if not thread:
            loop()
            return
        Thread(target=loop).start()

    def stop(self):
        """
        Stops the base class, then sets the flag to end the dispatch loop.
        """
        super().stop()
        self.logger.debug("Setting flag to end framework work loop.")
        self._keep_going = False
class Xapp(_BaseXapp):
    """
    A generic Xapp: instead of per-message-type callbacks, the client
    supplies a single function that the framework calls at startup time.
    The Xapp writer must implement that function with a loop-forever
    construct similar to the `run` function in the `RMRXapp` class. It
    should poll to retrieve RMR messages and dispatch them appropriately,
    poll for configuration changes, etc.

    Parameters
    ----------
    entrypoint: function
        This function is called when the Xapp class's run method is invoked.
        The function signature must be just function(self)
    rmr_port: integer (optional, default is 4562)
        Initialize RMR to listen on this port
    rmr_wait_for_ready: boolean (optional, default is True)
        Wait for RMR to signal ready before starting the dispatch loop
    use_fake_sdl: boolean (optional, default is False)
        Use an in-memory store instead of the real SDL service
    """

    def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False):
        """
        Parameters
        ----------
        entrypoint: function
            Stored and later called by run() with this object as its only
            argument.

        For the other parameters, see class _BaseXapp.
        """
        # init base
        base_options = dict(
            rmr_port=rmr_port,
            rmr_wait_for_ready=rmr_wait_for_ready,
            use_fake_sdl=use_fake_sdl,
        )
        super().__init__(**base_options)
        self._entrypoint = entrypoint

    def run(self):
        """
        This function should be called when the general Xapp is ready to start.
        """
        start = self._entrypoint
        start(self)

    # there is no need for stop currently here (base has, and nothing
    # special to do here)