# limitations under the License.
# ==================================================================================
"""
-This framework for Python Xapps provides classes that Xapp writers should
-instantiate and/or subclass depending on their needs.
+This framework for Python Xapps provides classes that Xapp writers
+should instantiate and/or subclass depending on their needs.
"""
import json
import os
import queue
+import time
from threading import Thread
+from typing import List, Set
+
import inotify_simple
from mdclogpy import Logger
+
from ricxappframe import xapp_rmr
+from ricxappframe.constants import sdl_namespaces
+from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
+from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
-
+import requests
# message-type constants
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101
# environment variable with path to configuration file
CONFIG_FILE_ENV = "CONFIG_FILE"
-
-# Private base class; not for direct client use
+CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
class _BaseXapp:
"""
- This base class initializes RMR by starting a thread that checks for
- incoming messages, and provisions an SDL object.
-
- If environment variable CONFIG_FILE_ENV is defined, and that value is a
- path to an existing file, a watcher is defined to monitor modifications
- (writes) to that file using the Linux kernel's inotify feature, and the
- configuration-change handler function is invoked. The watcher can be
- polled by calling method config_check().
+ This class initializes RMR, starts a thread that checks for incoming
+ messages, provisions an SDL object and optionally creates a
+ config-file watcher. This private base class should not be
+ instantiated by clients directly, but it defines many public methods
+ that may be used by clients.
+
+ If environment variable CONFIG_FILE is defined, and that variable
+ contains a path to an existing file, a watcher is defined to monitor
+ modifications (writes) to that file using the Linux kernel's inotify
+ feature. The watcher must be polled by calling method
+ config_check().
Parameters
----------
- rmr_port: int
- port to listen on
-
- rmr_wait_for_ready: bool (optional)
- If this is True, then init waits until rmr is ready to send, which
- includes having a valid routing file. This can be set to
- False if the client only wants to *receive only*.
-
- use_fake_sdl: bool (optional)
- if this is True, it uses the dbaas "fake dict backend" instead
- of Redis or other backends. Set this to true when developing
- an xapp or during unit testing to eliminate the need for DBAAS.
-
- post_init: function (optional)
- Runs this user-provided function after the base xapp is
- initialized; its signature should be post_init(self)
+ rmr_port: int (optional, default is 4562)
+ Port on which the RMR library listens for incoming messages.
+
+ rmr_wait_for_ready: bool (optional, default is True)
+ If this is True, then init waits until RMR is ready to send,
+ which includes having a valid routing file. This can be set
+ to False if the client wants to *receive only*.
+
+ use_fake_sdl: bool (optional, default is False)
+ if this is True, it uses the DBaaS "fake dict backend" instead
+ of Redis or other backends. Set this to True when developing
+ an xapp or during unit testing to eliminate the need for DBaaS.
+
+ post_init: function (optional, default is None)
+ Runs this user-provided function at the end of the init method;
+ its signature should be post_init(self)
"""
def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
self._mrc = self._rmr_loop.mrc # for convenience
# SDL
- self._sdl = SDLWrapper(use_fake_sdl)
+ self.sdl = SDLWrapper(use_fake_sdl)
# Config
# The environment variable specifies the path to the Xapp config file
self._inotify = None
self.logger.warning("__init__: NOT watching any config file")
+ # used for thread control of Registration of Xapp
+ self._keep_registration = True
+
+ # configuration data for xapp registration and deregistration
+ self._config_data = None
+ self._configfile_path = os.environ.get(CONFIG_FILE_PATH, None)
+ if self._configfile_path and os.path.isfile(self._configfile_path):
+ with open(self._configfile_path) as json_file:
+ self._config_data = json.load(json_file)
+ else:
+ self._keep_registration = False
+ self.logger.warning("__init__: Cannot Read config file for xapp Registration")
+
+ Thread(target=self.registerXapp).start()
+
# run the optionally provided user post init
if post_init:
post_init(self)
+ def get_service(self, host, service):
+ """
+ To find the url for connecting to the service
+
+ Parameters
+ ----------
+ host: string
+ defines the hostname in the url
+ service: string
+ defines the servicename in the url
+
+ Returns
+ -------
+ string
+ url for the service
+ """
+ app_namespace = self._config_data.get("APP_NAMESPACE")
+ if app_namespace == "":
+ app_namespace = self._config_data.get("DEFAULT_XAPP_NS")
+ svc = service.format(app_namespace.upper(), host.upper())
+ url = svc.replace("-", "_").split("//")
+
+ if len(url) > 1:
+ return url[1]
+ return ""
+
+ def do_post(self, plt_namespace, url, msg):
+ """
+ registration of the xapp using the url and json msg
+
+ Parameters
+ ----------
+ plt_namespace: string
+ platform namespace where the xapp is running
+ url: string
+ url for xapp registration
+ msg: string
+ json msg containing the xapp details
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ try:
+ request_url = url.format(plt_namespace, plt_namespace)
+ resp = requests.post(request_url, json=msg)
+ self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
+ return resp.status_code == 200 or resp.status_code == 201
+ except requests.exceptions.RequestException as err:
+ self.logger.error("Error : {}".format(err))
+ return format(err)
+ except requests.exceptions.HTTPError as errh:
+ self.logger.error("Http Error: {}".format(errh))
+ return errh
+ except requests.exceptions.ConnectionError as errc:
+ self.logger.error("Error Connecting: {}".format(errc))
+ return errc
+ except requests.exceptions.Timeout as errt:
+ self.logger.error("Timeout Error: {}".format(errt))
+ return errt
+
+ def register(self):
+ """
+ function to registers the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ hostname = self._config_data.get("hostname")
+ xappname = self._config_data.get("name")
+ xappversion = self._config_data.get("version")
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ if pltnamespace == "":
+ pltnamespace = self._config_data.get("DEFAULT_PLT_NS")
+ http_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_HTTP"))
+ rmr_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_RMR"))
+ if http_endpoint == "" or rmr_endpoint == "":
+ self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
+ return None
+ try:
+ request_string = {
+ "appName": hostname,
+ "httpEndpoint": http_endpoint,
+ "rmrEndpoint": rmr_endpoint,
+ "appInstanceName": xappname,
+ "appVersion": xappversion,
+ "configPath": self._config_data.get("CONFIG_PATH")
+ }
+ request_body = json.dumps(request_string)
+ except TypeError:
+ self.logger.error("Unable to serialize the object")
+ return "Error searializing the object"
+
+ return self.do_post(pltnamespace, self._config_data.get("REGISTER_PATH"), request_body)
+
+ def registerXapp(self):
+ """
+ registers the xapp
+ """
+ while self._keep_registration:
+ time.sleep(5)
+ # checking for rmr/sdl/xapp health
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
+ continue
+
+ self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(self._config_data.get("name")))
+ self.register()
+ self.logger.debug("Registration done, proceeding with startup ...")
+ break
+
+ def deregister(self):
+ """
+ Deregisters the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.error("RMR or SDL or xapp == Not Healthy")
+ return None
+ if self._config_data is None:
+ return None
+ name = self._config_data.get("hostname")
+ xappname = self._config_data.get("name")
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ if pltnamespace == "":
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ try:
+ request_string = {
+ "appName": name,
+ "appInstanceName": xappname,
+ }
+ request_body = json.dumps(request_string)
+ except TypeError:
+ self.logger.error("Error Serializing the object")
+ return "Error serializing the object"
+
+ return self.do_post(pltnamespace, self._config_data.get("DEREGISTER_PATH"), request_body)
+
+ def xapp_shutdown(self):
+ """
+ Deregisters the xapp while shutting down
+ """
+ self.deregister()
+ self.logger.debug("Wait for xapp to get unregistered")
+ time.sleep(10)
+
# Public rmr methods
def rmr_get_messages(self):
New payload to set
new_mtype: int (optional)
New message type (replaces the received message)
- retries: int (optional)
- Number of times to retry at the application level before
- throwing exception RMRFailure
+ retries: int (optional, default 100)
+ Number of times to retry at the application level
Returns
-------
# Convenience (pass-thru) function for invoking SDL.
- def sdl_set(self, ns, key, value, usemsgpack=True):
+ def sdl_set(self, namespace, key, value, usemsgpack=True):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Stores a key-value pair to SDL, optionally serializing the value
to bytes using msgpack.
Parameters
----------
- ns: string
+ namespace: string
SDL namespace
key: string
SDL key
that is serializable by msgpack.
If usemsgpack is False, the value must be bytes.
"""
- self._sdl.set(ns, key, value, usemsgpack)
+ self.sdl.set(namespace, key, value, usemsgpack)
- def sdl_get(self, ns, key, usemsgpack=True):
+ def sdl_get(self, namespace, key, usemsgpack=True):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Gets the value for the specified namespace and key from SDL,
optionally deserializing stored bytes using msgpack.
Parameters
----------
- ns: string
+ namespace: string
SDL namespace
key: string
SDL key
See the usemsgpack parameter for an explanation of the returned value type.
Answers None if the key is not found.
"""
- return self._sdl.get(ns, key, usemsgpack)
+ return self.sdl.get(namespace, key, usemsgpack)
- def sdl_find_and_get(self, ns, prefix, usemsgpack=True):
+ def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Gets all key-value pairs in the specified namespace
with keys that start with the specified prefix,
optionally deserializing stored bytes using msgpack.
Parameters
----------
- ns: string
+ nnamespaces: string
SDL namespace
prefix: string
the key prefix
but is either a Python object or raw bytes as discussed above.
Answers an empty dictionary if no keys matched the prefix.
"""
- return self._sdl.find_and_get(ns, prefix, usemsgpack)
+ return self.sdl.find_and_get(namespace, prefix, usemsgpack)
- def sdl_delete(self, ns, key):
+ def sdl_delete(self, namespace, key):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Deletes the key-value pair with the specified key in the specified namespace.
Parameters
----------
- ns: string
+ namespace: string
SDL namespace
key: string
SDL key
"""
- self._sdl.delete(ns, key)
+ self.sdl.delete(namespace, key)
+
+ def _get_rnib_info(self, node_type):
+ """
+ Since the difference between get_list_gnb_ids and get_list_enb_ids is only note-type,
+ this function extracted from the duplicated logic.
+
+ Parameters
+ ----------
+ node_type: string
+ Type of node. This is EnumDescriptor.
+ Available node types
+ - UNKNOWN
+ - ENG
+ - GNB
+
+ Returns
+ -------
+ List: (NbIdentity)
+
+ Raises
+ -------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
+ ret: List[NbIdentity] = []
+ for nbid_string in nbid_strings:
+ nbid = NbIdentity()
+ nbid.ParseFromString(nbid_string)
+ ret.append(nbid)
+ return ret
+
+ def get_list_gnb_ids(self):
+ """
+ Retrieves the list of gNodeb identity entities
+
+ gNodeb information is stored in SDL by E2Manager. Therefore, gNode information
+ is stored in SDL's `e2Manager` namespace as protobuf serialized.
+
+ Returns
+ -------
+ List: (NbIdentity)
+
+ Raises
+ -------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ return self._get_rnib_info(Node.Type.Name(Node.Type.GNB))
+
+ def get_list_enb_ids(self):
+ """
+ Retrieves the list of eNodeb identity entities
+
+ eNodeb information is stored in SDL by E2Manager. Therefore, eNode information
+ is stored in SDL's `e2Manager` namespace as protobuf serialized.
+
+ Returns
+ -------
+ List: (NbIdentity)
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ return self._get_rnib_info(Node.Type.Name(Node.Type.ENB))
# Health
"""
this needs to be understood how this is supposed to work
"""
- return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
+ return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
# Convenience function for discovering config change events
TODO: can we register a ctrl-c handler so this gets called on
ctrl-c? Because currently two ctrl-c are needed to stop.
"""
+
+ self.xapp_shutdown()
+
self._rmr_loop.stop()
class RMRXapp(_BaseXapp):
"""
- Represents an Xapp that reacts only to RMR messages; i.e., when
- messages are received, the Xapp does something. When run is called,
- the xapp framework waits for RMR messages, and calls the appropriate
- client-registered consume callback on each.
-
- If environment variable CONFIG_FILE_ENV is defined, and that value is a
- path to an existing file, the configuration-change handler is invoked at
- startup and on each configuration-file write event. If no handler is
- supplied, this class defines a default handler that logs each invocation.
+ Represents an Xapp that reacts only to RMR messages; i.e., the Xapp
+ only performs an action when a message is received. Clients should
+ invoke the run method, which has a loop that waits for RMR messages
+ and calls the appropriate client-registered consume callback on each.
+
+ If environment variable CONFIG_FILE is defined, and that variable
+ contains a path to an existing file, this class polls a watcher
+ defined on that file to detect file-write events, and invokes a
+ configuration-change handler on each event. The handler is also
+ invoked at startup. If no handler function is supplied to the
+ constructor, this class defines a default handler that only logs a
+ message.
Parameters
----------
default_handler: function
- A function with the signature (summary, sbuf) to be called
- when a message type is received for which no other handler is registered.
+ A function with the signature (summary, sbuf) to be called when a
+ message type is received for which no other handler is registered.
default_handler argument summary: dict
The RMR message summary, a dict of key-value pairs
default_handler argument sbuf: ctypes c_void_p
# the user can override this and register their own handler
# if they wish since the "last registered callback wins".
def handle_healthcheck(self, summary, sbuf):
- ok = self.healthcheck()
- payload = b"OK\n" if ok else b"ERROR [RMR or SDL is unhealthy]\n"
+ healthy = self.healthcheck()
+ payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
self.rmr_free(sbuf)