import json
import os
import queue
+import time
from threading import Thread
+from typing import List, Set
+
import inotify_simple
from mdclogpy import Logger
+
from ricxappframe import xapp_rmr
-from ricxappframe.rmr import rmr
-from ricxappframe.xapp_sdl import SDLWrapper
+from ricxappframe.constants import sdl_namespaces
-# message-type constants
-RIC_HEALTH_CHECK_REQ = 100
-RIC_HEALTH_CHECK_RESP = 101
+import ricxappframe.entities.rnib.nodeb_info_pb2 as pb_nbi
+import ricxappframe.entities.rnib.cell_pb2 as pb_cell
+from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
+from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
-# environment variable with path to configuration file
-CONFIG_FILE_ENV = "CONFIG_FILE"
+from ricxappframe.rmr import rmr
+from ricxappframe.util.constants import Constants
+from ricxappframe.xapp_sdl import SDLWrapper
+import requests
class _BaseXapp:
"""
# PUBLIC, can be used by xapps using self.(name):
self.logger = Logger(name=__name__)
+ self._appthread = None
# Start rmr rcv thread
self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready)
# Config
# The environment variable specifies the path to the Xapp config file
- self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
+ self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None)
if self._config_path and os.path.isfile(self._config_path):
self._inotify = inotify_simple.INotify()
self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
self._inotify = None
self.logger.warning("__init__: NOT watching any config file")
+ # used for thread control of Registration of Xapp
+ self._keep_registration = True
+
+ # configuration data for xapp registration and deregistration
+ self._config_data = None
+ if self._config_path and os.path.isfile(self._config_path):
+ with open(self._config_path) as json_file:
+ self._config_data = json.load(json_file)
+ else:
+ self._keep_registration = False
+ self.logger.error("__init__: Cannot Read config file for xapp Registration")
+ self._config_data = {}
+
+ self._appthread = Thread(target=self.registerXapp).start()
+
# run the optionally provided user post init
if post_init:
post_init(self)
+ def get_service(self, host, service):
+ """
+ To find the url for connecting to the service
+
+ Parameters
+ ----------
+ host: string
+ defines the hostname in the url
+ service: string
+ defines the servicename in the url
+
+ Returns
+ -------
+ string
+ url for the service
+ """
+ app_namespace = self._config_data.get("APP_NAMESPACE")
+ if app_namespace is None:
+ app_namespace = Constants.DEFAULT_XAPP_NS
+ self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace))
+ if app_namespace is not None and host is not None:
+ svc = service.format(app_namespace.upper(), host.upper())
+ urlkey = svc.replace("-", "_")
+ url = os.environ.get(urlkey).split("//")
+ self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url))
+ if len(url) > 1:
+ return url[1]
+ return ""
+
+ def do_post(self, plt_namespace, url, msg):
+ """
+ registration of the xapp using the url and json msg
+
+ Parameters
+ ----------
+ plt_namespace: string
+ platform namespace where the xapp is running
+ url: string
+ url for xapp registration
+ msg: string
+ json msg containing the xapp details
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ if url is None:
+ self.logger.error("url is empty ")
+ return False
+ if plt_namespace is None:
+ self.logger.error("plt_namespace is empty")
+ return False
+ try:
+ request_url = url.format(plt_namespace, plt_namespace)
+ resp = requests.post(request_url, json=msg)
+ self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
+ self.logger.debug("Response Text : {}".format(resp.text))
+ return resp.status_code == 200 or resp.status_code == 201
+ except requests.exceptions.RequestException as err:
+ self.logger.error("Error : {}".format(err))
+ return format(err)
+ except requests.exceptions.HTTPError as errh:
+ self.logger.error("Http Error: {}".format(errh))
+ return errh
+ except requests.exceptions.ConnectionError as errc:
+ self.logger.error("Error Connecting: {}".format(errc))
+ return errc
+ except requests.exceptions.Timeout as errt:
+ self.logger.error("Timeout Error: {}".format(errt))
+ return errt
+
+ def register(self):
+ """
+ function to registers the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ hostname = os.environ.get("HOSTNAME")
+ xappname = self._config_data.get("name")
+ xappversion = self._config_data.get("version")
+ pltnamespace = os.environ.get("PLT_NAMESPACE")
+ if pltnamespace is None:
+ pltnamespace = Constants.DEFAULT_PLT_NS
+ self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format(
+ hostname, xappname, xappversion, pltnamespace))
+
+ http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP)
+ rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR)
+ if http_endpoint == "" or rmr_endpoint == "":
+ self.logger.error(
+ "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint,
+ rmr_endpoint))
+ return False
+ self.logger.debug(
+ "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint "
+ ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint,
+ self._config_data.get("CONFIG_PATH")))
+ request_string = {
+ "appName": hostname,
+ "appVersion": xappversion,
+ "configPath": "",
+ "appInstanceName": xappname,
+ "httpEndpoint": http_endpoint,
+ "rmrEndpoint": rmr_endpoint,
+ "config": json.dumps(self._config_data)
+ }
+ self.logger.info("REQUEST STRING :{}".format(request_string))
+ return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string)
+
+ def registerXapp(self):
+ """
+ registers the xapp
+ """
+ retries = 5
+ while self._keep_registration and retries > 0:
+ time.sleep(2)
+ retries = retries-1
+ # checking for rmr/sdl/xapp health
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.warning(
+ "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
+ continue
+
+ self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(
+ self._config_data.get("name")))
+ if self.register():
+ self.logger.debug("Registration done, proceeding with startup ...")
+ break
+
+ def deregister(self):
+ """
+ Deregisters the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.error("RMR or SDL or xapp == Not Healthy")
+ return None
+ if self._config_data is None:
+ return None
+ name = os.environ.get("HOSTNAME")
+ xappname = self._config_data.get("name")
+ pltnamespace = os.environ.get("PLT_NAMESPACE")
+ if pltnamespace is None:
+ pltnamespace = Constants.DEFAULT_PLT_NS
+ request_string = {
+ "appName": name,
+ "appInstanceName": xappname,
+ }
+
+ return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string)
+
+ def xapp_shutdown(self):
+ """
+ Deregisters the xapp while shutting down
+ """
+ self.deregister()
+ self.logger.debug("Wait for xapp to get unregistered")
+ time.sleep(10)
+
# Public rmr methods
def rmr_get_messages(self):
bool
whether or not the send worked after retries attempts
"""
- sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
+ sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True,
+ mtype=mtype)
for _ in range(retries):
sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
"""
self.sdl.delete(namespace, key)
- # Health
+ def _get_rnib_info(self, node_type):
+ """
+ Since the difference between get_list_gnb_ids and get_list_enb_ids is only node-type,
+ this function extracted from the duplicated logic.
+
+ Parameters
+ ----------
+ node_type: string
+ Type of node. This is EnumDescriptor.
+ Available node types
+ - UNKNOWN
+ - ENB
+ - GNB
+
+ Returns
+ -------
+ List: (NbIdentity)
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False)
+ ret: List[NbIdentity] = []
+ for nbid_string in nbid_strings:
+ nbid = NbIdentity()
+ nbid.ParseFromString(nbid_string)
+ ret.append(nbid)
+ return ret
+
+ def get_list_gnb_ids(self):
+ """
+ Retrieves the list of gNodeb identity entities
+
+ gNodeb information is stored in SDL by E2Manager. Therefore, gNode information
+ is stored in SDL's `e2Manager` namespace as protobuf serialized.
+
+ Returns
+ -------
+ List: (NbIdentity)
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ return self._get_rnib_info(Node.Type.Name(Node.GNB))
+
+ def get_list_enb_ids(self):
+ """
+ Retrieves the list of eNodeb identity entities
+
+ eNodeb information is stored in SDL by E2Manager. Therefore, eNode information
+ is stored in SDL's `e2Manager` namespace as protobuf serialized.
+
+ Returns
+ -------
+ List: (NbIdentity)
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ return self._get_rnib_info(Node.Type.Name(Node.ENB))
+
+ """
+ Following RNIB methods are made to be inline of the go-lang based RNIB methods.
+ Method names are same as in repository:
+ gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/rnib
+ """
+ def GetNodeb(self, inventoryName):
+ """
+ Returns nodeb info
+ In RNIB SDL key is defined following way: RAN:<inventoryName>
+
+ Parameters
+ ----------
+ inventoryName: string
+
+ Returns
+ -------
+ NodebInfo()
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ nbid_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, 'RAN:' + inventoryName, usemsgpack=False)
+ if nbid_string is not None:
+ nbinfo = pb_nbi.NodebInfo()
+ nbinfo.ParseFromString(nbid_string)
+ return nbinfo
+ return None
+
+ def GetNodebByGlobalNbId(self, nodeType, plmnId, nbId):
+ """
+ Returns nodeb identity based on type, plmn id and node id
+ In RNIB SDL key is defined following way: <nodeType>:<plmnId>:<nbId>
+
+ Parameters
+ ----------
+ nodeType: string
+ plmnId: string
+ nbId: string
+
+ Returns
+ -------
+ NbIdentity()
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ nbid_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, nodeType + ':' + plmnId + ':' + nbId, usemsgpack=False)
+ if nbid_string is not None:
+ nbid = NbIdentity()
+ nbid.ParseFromString(nbid_string)
+ return nbid
+ return None
+
+ def GetCellList(self, inventoryName):
+ """
+ Returns nodeb served cell list from the saved node data
+ In RNIB SDL key is defined following way: RAN:<inventoryName>
+
+ Parameters
+ ----------
+ nodeType: string
+ plmnId: string
+ nbId: string
+
+ Returns
+ -------
+ ServedCellInfo() in case of ENB
+ ServedNRCell() in case of GNB
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ nodeb = self.GetNodeb(inventoryName)
+ if nodeb is not None:
+ if nodeb.HasField('enb'):
+ return nodeb.enb.served_cells
+ elif nodeb.HasField('gnb'):
+ return nodeb.gnb.served_nr_cells
+ return None
+
+ def GetCellById(self, cell_type, cell_id):
+ """
+ Returns cell info by cell type and id.
+ In RNIB SDL keys are defined based on the cell type:
+ ENB type CELL:<cell_id>
+ GNB type NRCELL:<cell_id>
+
+ Parameters
+ ----------
+ cell_type: string
+ Available cell types
+ - ENB
+ - GNB
+
+ Returns
+ -------
+ Cell()
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ cellstr = None
+ if cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.LTE_CELL):
+ cellstr = 'CELL'
+ elif cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.NR_CELL):
+ cellstr = 'NRCELL'
+ if cellstr is not None:
+ cell_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, cellstr + ':' + cell_id, usemsgpack=False)
+ if cell_string is not None:
+ cell = pb_cell.Cell()
+ cell.ParseFromString(cell_string)
+ return cell
+ return None
+
+ def GetListNodebIds(self):
+ """
+ Returns both enb and gnb NbIdentity list
+
+ Returns
+ -------
+ List: (NbIdentity)
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ nlist1 = self._get_rnib_info(Node.Type.Name(Node.ENB))
+ nlist2 = self._get_rnib_info(Node.Type.Name(Node.GNB))
+
+ for n in nlist2:
+ nlist1.append(n)
+ return nlist1
+
+ def GetCell(self, inventoryName, pci):
+ """
+ Returns cell info using pci
+ In RNIB SDL key is defined following way: PCI:<inventoryName>:<pci hex val>
+
+ Parameters
+ ----------
+ inventoryName: string
+ pci: int
+
+ Returns
+ -------
+ Cell()
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ cell_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, 'PCI:{0:s}:{1:02x}'.format(inventoryName, pci), usemsgpack=False)
+ if cell_string is not None:
+ cell = pb_cell.Cell()
+ cell.ParseFromString(cell_string)
+ return cell
+ return None
+
+ def GetRanFunctionDefinition(self, inventoryName, ran_function_oid):
+ """
+ Returns GNB ran function definition list based on the ran_function_oid
+ In RNIB SDL key is defined following way: RAN:<inventoryName>
+
+ Parameters
+ ----------
+ inventoryName: string
+ ran_function_oid: int
+
+ Returns
+ -------
+ array of ran_function_definition matching to ran_function_oid
+
+ Raises
+ ------
+ SdlTypeError: If function's argument is of an inappropriate type.
+ NotConnected: If SDL is not connected to the backend data storage.
+ RejectedByBackend: If backend data storage rejects the request.
+ BackendError: If the backend data storage fails to process the request.
+ """
+ nodeb = self.GetNodeb(inventoryName)
+ if nodeb is not None:
+ if nodeb.HasField('gnb') and nodeb.gnb.ran_functions is not None:
+ ranFDList = []
+ for rf in nodeb.gnb.ran_functions:
+ if rf.ran_function_oid == ran_function_oid:
+ ranFDList.append(rf.ran_function_definition)
+ return ranFDList
+ return None
def healthcheck(self):
"""
TODO: can we register a ctrl-c handler so this gets called on
ctrl-c? Because currently two ctrl-c are needed to stop.
"""
+ if self._appthread is not None:
+ self._appthread.join()
+
+ self.xapp_shutdown()
+
self._rmr_loop.stop()
its signature should be post_init(self)
"""
- def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
+ def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False,
+ post_init=None):
"""
Also see _BaseXapp
"""
def handle_healthcheck(self, summary, sbuf):
healthy = self.healthcheck()
payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
- self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
+ self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP)
self.rmr_free(sbuf)
- self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
+ self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ)
# define a default configuration-change handler if none was provided.
if not config_handler:
def handle_config_change(self, config):
self.logger.debug("xapp_frame: default config handler invoked")
+
self._config_handler = handle_config_change
# call the config handler at startup if prereqs were met