X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fxapp_frame.py;h=feb5d664ef9cc4a1f8a34a2451d2ad078204b369;hb=2650675e23003fa2d2dca319a72ba04e92316634;hp=0d37f288c8fd3eb22299c91fb222470490749b43;hpb=1e92812fca9164e9ccd7e884f36e2411f6b013e1;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/xapp_frame.py b/ricxappframe/xapp_frame.py index 0d37f28..feb5d66 100644 --- a/ricxappframe/xapp_frame.py +++ b/ricxappframe/xapp_frame.py @@ -22,19 +22,21 @@ should instantiate and/or subclass depending on their needs. import json import os import queue +import time from threading import Thread +from typing import List, Set + import inotify_simple from mdclogpy import Logger + from ricxappframe import xapp_rmr +from ricxappframe.constants import sdl_namespaces +from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity +from ricxappframe.entities.rnib.nodeb_info_pb2 import Node from ricxappframe.rmr import rmr +from ricxappframe.util.constants import Constants from ricxappframe.xapp_sdl import SDLWrapper - -# message-type constants -RIC_HEALTH_CHECK_REQ = 100 -RIC_HEALTH_CHECK_RESP = 101 - -# environment variable with path to configuration file -CONFIG_FILE_ENV = "CONFIG_FILE" +import requests class _BaseXapp: @@ -83,11 +85,11 @@ class _BaseXapp: self._mrc = self._rmr_loop.mrc # for convenience # SDL - self._sdl = SDLWrapper(use_fake_sdl) + self.sdl = SDLWrapper(use_fake_sdl) # Config # The environment variable specifies the path to the Xapp config file - self._config_path = os.environ.get(CONFIG_FILE_ENV, None) + self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None) if self._config_path and os.path.isfile(self._config_path): self._inotify = inotify_simple.INotify() self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY) @@ -96,10 +98,194 @@ class _BaseXapp: self._inotify = None self.logger.warning("__init__: NOT watching any config file") + # used for thread control of Registration of Xapp + self._keep_registration = True + + # configuration data for xapp registration and deregistration + self._config_data = None + if self._config_path and os.path.isfile(self._config_path): + with open(self._config_path) as json_file: + self._config_data = json.load(json_file) + else: + self._keep_registration = False + self.logger.error("__init__: Cannot Read config file for xapp Registration") + self._config_data = {} + + Thread(target=self.registerXapp).start() + # run the optionally provided user post init if post_init: post_init(self) + def get_service(self, host, service): + """ + To find the url for connecting to the service + + Parameters + ---------- + host: string + defines the hostname in the url + service: string + defines the servicename in the url + + Returns + ------- + string + url for the service + """ + app_namespace = self._config_data.get("APP_NAMESPACE") + if app_namespace is None: + app_namespace = Constants.DEFAULT_XAPP_NS + self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace)) + if app_namespace is not None and host is not None: + svc = service.format(app_namespace.upper(), host.upper()) + urlkey = svc.replace("-", "_") + url = os.environ.get(urlkey).split("//") + self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url)) + if len(url) > 1: + return url[1] + return "" + + def do_post(self, plt_namespace, url, msg): + """ + registration of the xapp using the url and json msg + + Parameters + ---------- + plt_namespace: string + platform namespace where the xapp is running + url: string + url for xapp registration + msg: string + json msg containing the xapp details + + Returns + ------- + bool + whether or not the xapp is registered + """ + if url is None: + self.logger.error("url is empty ") + return False + if plt_namespace is None: + self.logger.error("plt_namespace is empty") + return False + try: + request_url = url.format(plt_namespace, plt_namespace) + resp = requests.post(request_url, json=msg) + self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code)) + self.logger.debug("Response Text : {}".format(resp.text)) + return resp.status_code == 200 or resp.status_code == 201 + except requests.exceptions.RequestException as err: + self.logger.error("Error : {}".format(err)) + return format(err) + except requests.exceptions.HTTPError as errh: + self.logger.error("Http Error: {}".format(errh)) + return errh + except requests.exceptions.ConnectionError as errc: + self.logger.error("Error Connecting: {}".format(errc)) + return errc + except requests.exceptions.Timeout as errt: + self.logger.error("Timeout Error: {}".format(errt)) + return errt + + def register(self): + """ + function to registers the xapp + + Returns + ------- + bool + whether or not the xapp is registered + """ + hostname = os.environ.get("HOSTNAME") + xappname = self._config_data.get("name") + xappversion = self._config_data.get("version") + pltnamespace = os.environ.get("PLT_NAMESPACE") + if pltnamespace is None: + pltnamespace = Constants.DEFAULT_PLT_NS + self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format( + hostname, xappname, xappversion, pltnamespace)) + + http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP) + rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR) + if http_endpoint == "" or rmr_endpoint == "": + self.logger.error( + "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, + rmr_endpoint)) + return False + self.logger.debug( + "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint " + ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint, + self._config_data.get("CONFIG_PATH"))) + request_string = { + "appName": hostname, + "appVersion": xappversion, + "configPath": "", + "appInstanceName": xappname, + "httpEndpoint": http_endpoint, + "rmrEndpoint": rmr_endpoint, + "config": json.dumps(self._config_data) + } + self.logger.info("REQUEST STRING :{}".format(request_string)) + return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string) + + def registerXapp(self): + """ + registers the xapp + """ + retries = 5 + while self._keep_registration and retries > 0: + time.sleep(2) + retries = retries-1 + # checking for rmr/sdl/xapp health + healthy = self.healthcheck() + if not healthy: + self.logger.warning( + "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name"))) + continue + + self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format( + self._config_data.get("name"))) + if self.register(): + self.logger.debug("Registration done, proceeding with startup ...") + break + + def deregister(self): + """ + Deregisters the xapp + + Returns + ------- + bool + whether or not the xapp is registered + """ + healthy = self.healthcheck() + if not healthy: + self.logger.error("RMR or SDL or xapp == Not Healthy") + return None + if self._config_data is None: + return None + name = os.environ.get("HOSTNAME") + xappname = self._config_data.get("name") + pltnamespace = os.environ.get("PLT_NAMESPACE") + if pltnamespace is None: + pltnamespace = Constants.DEFAULT_PLT_NS + request_string = { + "appName": name, + "appInstanceName": xappname, + } + + return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string) + + def xapp_shutdown(self): + """ + Deregisters the xapp while shutting down + """ + self.deregister() + self.logger.debug("Wait for xapp to get unregistered") + time.sleep(10) + # Public rmr methods def rmr_get_messages(self): @@ -132,7 +318,8 @@ class _BaseXapp: bool whether or not the send worked after retries attempts """ - sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype) + sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, + mtype=mtype) for _ in range(retries): sbuf = rmr.rmr_send_msg(self._mrc, sbuf) @@ -158,9 +345,8 @@ class _BaseXapp: New payload to set new_mtype: int (optional) New message type (replaces the received message) - retries: int (optional) - Number of times to retry at the application level before - throwing exception RMRFailure + retries: int (optional, default 100) + Number of times to retry at the application level Returns ------- @@ -194,6 +380,9 @@ class _BaseXapp: def sdl_set(self, namespace, key, value, usemsgpack=True): """ + ** Deprecate Warning ** + ** Will be removed in a future function ** + Stores a key-value pair to SDL, optionally serializing the value to bytes using msgpack. @@ -213,10 +402,13 @@ class _BaseXapp: that is serializable by msgpack. If usemsgpack is False, the value must be bytes. """ - self._sdl.set(namespace, key, value, usemsgpack) + self.sdl.set(namespace, key, value, usemsgpack) def sdl_get(self, namespace, key, usemsgpack=True): """ + ** Deprecate Warning ** + ** Will be removed in a future function ** + Gets the value for the specified namespace and key from SDL, optionally deserializing stored bytes using msgpack. @@ -238,10 +430,13 @@ class _BaseXapp: See the usemsgpack parameter for an explanation of the returned value type. Answers None if the key is not found. """ - return self._sdl.get(namespace, key, usemsgpack) + return self.sdl.get(namespace, key, usemsgpack) def sdl_find_and_get(self, namespace, prefix, usemsgpack=True): """ + ** Deprecate Warning ** + ** Will be removed in a future function ** + Gets all key-value pairs in the specified namespace with keys that start with the specified prefix, optionally deserializing stored bytes using msgpack. @@ -266,10 +461,13 @@ class _BaseXapp: but is either a Python object or raw bytes as discussed above. Answers an empty dictionary if no keys matched the prefix. """ - return self._sdl.find_and_get(namespace, prefix, usemsgpack) + return self.sdl.find_and_get(namespace, prefix, usemsgpack) def sdl_delete(self, namespace, key): """ + ** Deprecate Warning ** + ** Will be removed in a future function ** + Deletes the key-value pair with the specified key in the specified namespace. Parameters @@ -279,7 +477,80 @@ class _BaseXapp: key: string SDL key """ - self._sdl.delete(namespace, key) + self.sdl.delete(namespace, key) + + def _get_rnib_info(self, node_type): + """ + Since the difference between get_list_gnb_ids and get_list_enb_ids is only note-type, + this function extracted from the duplicated logic. + + Parameters + ---------- + node_type: string + Type of node. This is EnumDescriptor. + Available node types + - UNKNOWN + - ENG + - GNB + + Returns + ------- + List: (NbIdentity) + + Raises + ------- + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False) + ret: List[NbIdentity] = [] + for nbid_string in nbid_strings: + nbid = NbIdentity() + nbid.ParseFromString(nbid_string) + ret.append(nbid) + return ret + + def get_list_gnb_ids(self): + """ + Retrieves the list of gNodeb identity entities + + gNodeb information is stored in SDL by E2Manager. Therefore, gNode information + is stored in SDL's `e2Manager` namespace as protobuf serialized. + + Returns + ------- + List: (NbIdentity) + + Raises + ------- + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + return self._get_rnib_info(Node.Type.Name(Node.Type.GNB)) + + def get_list_enb_ids(self): + """ + Retrieves the list of eNodeb identity entities + + eNodeb information is stored in SDL by E2Manager. Therefore, eNode information + is stored in SDL's `e2Manager` namespace as protobuf serialized. + + Returns + ------- + List: (NbIdentity) + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + return self._get_rnib_info(Node.Type.Name(Node.Type.ENB)) # Health @@ -287,7 +558,7 @@ class _BaseXapp: """ this needs to be understood how this is supposed to work """ - return self._rmr_loop.healthcheck() and self._sdl.healthcheck() + return self._rmr_loop.healthcheck() and self.sdl.healthcheck() # Convenience function for discovering config change events @@ -324,6 +595,9 @@ class _BaseXapp: TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop. """ + + self.xapp_shutdown() + self._rmr_loop.stop() @@ -372,7 +646,8 @@ class RMRXapp(_BaseXapp): its signature should be post_init(self) """ - def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): + def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, + post_init=None): """ Also see _BaseXapp """ @@ -396,15 +671,16 @@ class RMRXapp(_BaseXapp): def handle_healthcheck(self, summary, sbuf): healthy = self.healthcheck() payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n" - self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP) + self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP) self.rmr_free(sbuf) - self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ) + self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ) # define a default configuration-change handler if none was provided. if not config_handler: def handle_config_change(self, config): self.logger.debug("xapp_frame: default config handler invoked") + self._config_handler = handle_config_change # call the config handler at startup if prereqs were met