X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=ricxappframe%2Fxapp_frame.py;h=e939b0e5a0ea57577f20bb62b78d170e248d5e98;hb=e0de19a1c51e607b7e7a4a71a326a09486583539;hp=6b70bb6a944e38f5f9e16c657597ee8b2ea7c75e;hpb=f9cd5cc676355485c2d9c8bc2be22ddad4874382;p=ric-plt%2Fxapp-frame-py.git diff --git a/ricxappframe/xapp_frame.py b/ricxappframe/xapp_frame.py index 6b70bb6..e939b0e 100644 --- a/ricxappframe/xapp_frame.py +++ b/ricxappframe/xapp_frame.py @@ -1,7 +1,3 @@ -""" -Framework for python xapps -Framework here means Xapp classes that can be subclassed -""" # ================================================================================== # Copyright (c) 2020 Nokia # Copyright (c) 2020 AT&T Intellectual Property. @@ -18,62 +14,292 @@ Framework here means Xapp classes that can be subclassed # See the License for the specific language governing permissions and # limitations under the License. # ================================================================================== +""" +This framework for Python Xapps provides classes that Xapp writers +should instantiate and/or subclass depending on their needs. +""" + +import json +import os +import queue +import time from threading import Thread -from ricxappframe import xapp_rmr -from ricxappframe.xapp_sdl import SDLWrapper -from rmr import rmr -from mdclogpy import Logger +from typing import List, Set +import inotify_simple +from mdclogpy import Logger -mdc_logger = Logger(name=__name__) +from ricxappframe import xapp_rmr +from ricxappframe.constants import sdl_namespaces +import ricxappframe.entities.rnib.nodeb_info_pb2 as pb_nbi +import ricxappframe.entities.rnib.cell_pb2 as pb_cell +from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity +from ricxappframe.entities.rnib.nodeb_info_pb2 import Node -# Private base class; not for direct client use +from ricxappframe.rmr import rmr +from ricxappframe.util.constants import Constants +from ricxappframe.xapp_sdl import SDLWrapper +import requests class _BaseXapp: """ - Base xapp; not for client use directly + This class initializes RMR, starts a thread that checks for incoming + messages, provisions an SDL object and optionally creates a + config-file watcher. This private base class should not be + instantiated by clients directly, but it defines many public methods + that may be used by clients. + + If environment variable CONFIG_FILE is defined, and that variable + contains a path to an existing file, a watcher is defined to monitor + modifications (writes) to that file using the Linux kernel's inotify + feature. The watcher must be polled by calling method + config_check(). + + Parameters + ---------- + rmr_port: int (optional, default is 4562) + Port on which the RMR library listens for incoming messages. + + rmr_wait_for_ready: bool (optional, default is True) + If this is True, then init waits until RMR is ready to send, + which includes having a valid routing file. This can be set + to False if the client wants to *receive only*. + + use_fake_sdl: bool (optional, default is False) + if this is True, it uses the DBaaS "fake dict backend" instead + of Redis or other backends. Set this to True when developing + an xapp or during unit testing to eliminate the need for DBaaS. + + post_init: function (optional, default is None) + Runs this user-provided function at the end of the init method; + its signature should be post_init(self) """ def __init__(self, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): """ - Init - - Parameters - ---------- - rmr_port: int - port to listen on - - rmr_wait_for_ready: bool (optional) - if this is True, then init waits until rmr is ready to send, which includes having a valid routing file. - this can be set to False if the client only wants to *receive only* - - use_fake_sdl: bool (optional) - if this is True, it uses dbaas' "fake dict backend" instead of Redis or other backends. - Set this to true when developing your xapp or during unit testing to completely avoid needing a dbaas running or any network at all - - post_init: function (optional) - runs this user provided function after the base xapp is initialized - it's signature should be post_init(self) + Documented in the class comment. """ + # PUBLIC, can be used by xapps using self.(name): + self.logger = Logger(name=__name__) + self._appthread = None # Start rmr rcv thread self._rmr_loop = xapp_rmr.RmrLoop(port=rmr_port, wait_for_ready=rmr_wait_for_ready) self._mrc = self._rmr_loop.mrc # for convenience # SDL - self._sdl = SDLWrapper(use_fake_sdl) + self.sdl = SDLWrapper(use_fake_sdl) + + # Config + # The environment variable specifies the path to the Xapp config file + self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None) + if self._config_path and os.path.isfile(self._config_path): + self._inotify = inotify_simple.INotify() + self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY) + self.logger.debug("__init__: watching config file {}".format(self._config_path)) + else: + self._inotify = None + self.logger.warning("__init__: NOT watching any config file") + + # used for thread control of Registration of Xapp + self._keep_registration = True + + # configuration data for xapp registration and deregistration + self._config_data = None + if self._config_path and os.path.isfile(self._config_path): + with open(self._config_path) as json_file: + self._config_data = json.load(json_file) + else: + self._keep_registration = False + self.logger.error("__init__: Cannot Read config file for xapp Registration") + self._config_data = {} + + self._appthread = Thread(target=self.registerXapp).start() # run the optionally provided user post init if post_init: post_init(self) + def get_service(self, host, service): + """ + To find the url for connecting to the service + + Parameters + ---------- + host: string + defines the hostname in the url + service: string + defines the servicename in the url + + Returns + ------- + string + url for the service + """ + app_namespace = self._config_data.get("APP_NAMESPACE") + if app_namespace is None: + app_namespace = Constants.DEFAULT_XAPP_NS + self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace)) + if app_namespace is not None and host is not None: + svc = service.format(app_namespace.upper(), host.upper()) + urlkey = svc.replace("-", "_") + url = os.environ.get(urlkey).split("//") + self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url)) + if len(url) > 1: + return url[1] + return "" + + def do_post(self, plt_namespace, url, msg): + """ + registration of the xapp using the url and json msg + + Parameters + ---------- + plt_namespace: string + platform namespace where the xapp is running + url: string + url for xapp registration + msg: string + json msg containing the xapp details + + Returns + ------- + bool + whether or not the xapp is registered + """ + if url is None: + self.logger.error("url is empty ") + return False + if plt_namespace is None: + self.logger.error("plt_namespace is empty") + return False + try: + request_url = url.format(plt_namespace, plt_namespace) + resp = requests.post(request_url, json=msg) + self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code)) + self.logger.debug("Response Text : {}".format(resp.text)) + return resp.status_code == 200 or resp.status_code == 201 + except requests.exceptions.RequestException as err: + self.logger.error("Error : {}".format(err)) + return format(err) + except requests.exceptions.HTTPError as errh: + self.logger.error("Http Error: {}".format(errh)) + return errh + except requests.exceptions.ConnectionError as errc: + self.logger.error("Error Connecting: {}".format(errc)) + return errc + except requests.exceptions.Timeout as errt: + self.logger.error("Timeout Error: {}".format(errt)) + return errt + + def register(self): + """ + function to registers the xapp + + Returns + ------- + bool + whether or not the xapp is registered + """ + hostname = os.environ.get("HOSTNAME") + xappname = self._config_data.get("name") + xappversion = self._config_data.get("version") + pltnamespace = os.environ.get("PLT_NAMESPACE") + if pltnamespace is None: + pltnamespace = Constants.DEFAULT_PLT_NS + self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format( + hostname, xappname, xappversion, pltnamespace)) + + http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP) + rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR) + if http_endpoint == "" or rmr_endpoint == "": + self.logger.error( + "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, + rmr_endpoint)) + return False + self.logger.debug( + "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint " + ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint, + self._config_data.get("CONFIG_PATH"))) + request_string = { + "appName": hostname, + "appVersion": xappversion, + "configPath": "", + "appInstanceName": xappname, + "httpEndpoint": http_endpoint, + "rmrEndpoint": rmr_endpoint, + "config": json.dumps(self._config_data) + } + self.logger.info("REQUEST STRING :{}".format(request_string)) + return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string) + + def registerXapp(self): + """ + registers the xapp + """ + retries = 5 + while self._keep_registration and retries > 0: + time.sleep(2) + retries = retries-1 + # checking for rmr/sdl/xapp health + healthy = self.healthcheck() + if not healthy: + self.logger.warning( + "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name"))) + continue + + self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format( + self._config_data.get("name"))) + if self.register(): + self.logger.debug("Registration done, proceeding with startup ...") + break + + def deregister(self): + """ + Deregisters the xapp + + Returns + ------- + bool + whether or not the xapp is registered + """ + healthy = self.healthcheck() + if not healthy: + self.logger.error("RMR or SDL or xapp == Not Healthy") + return None + if self._config_data is None: + return None + name = os.environ.get("HOSTNAME") + xappname = self._config_data.get("name") + pltnamespace = os.environ.get("PLT_NAMESPACE") + if pltnamespace is None: + pltnamespace = Constants.DEFAULT_PLT_NS + request_string = { + "appName": name, + "appInstanceName": xappname, + } + + return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string) + + def xapp_shutdown(self): + """ + Deregisters the xapp while shutting down + """ + self.deregister() + self.logger.debug("Wait for xapp to get unregistered") + time.sleep(10) + # Public rmr methods def rmr_get_messages(self): """ - returns a generator iterable over all current messages in the queue that have not yet been read by the client xapp + Returns a generator iterable over all items in the queue that + have not yet been read by the client xapp. Each item is a tuple + (S, sbuf) where S is a message summary dict and sbuf is the raw + message. The caller MUST call rmr.rmr_free_msg(sbuf) when + finished with each sbuf to prevent memory leaks! """ while not self._rmr_loop.rcv_queue.empty(): (summary, sbuf) = self._rmr_loop.rcv_queue.get() @@ -97,7 +323,8 @@ class _BaseXapp: bool whether or not the send worked after retries attempts """ - sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype) + sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, + mtype=mtype) for _ in range(retries): sbuf = rmr.rmr_send_msg(self._mrc, sbuf) @@ -110,10 +337,10 @@ class _BaseXapp: def rmr_rts(self, sbuf, new_payload=None, new_mtype=None, retries=100): """ - Allows the xapp to return to sender, possibly adjusting the payload and message type before doing so - - This does NOT free the sbuf for the caller as the caller may wish to perform multiple rts per buffer. - The client needs to free. + Allows the xapp to return to sender, possibly adjusting the + payload and message type before doing so. This does NOT free + the sbuf for the caller as the caller may wish to perform + multiple rts per buffer. The client needs to free. Parameters ---------- @@ -123,8 +350,8 @@ class _BaseXapp: New payload to set new_mtype: int (optional) New message type (replaces the received message) - retries: int (optional) - Number of times to retry at the application level before excepting RMRFailure + retries: int (optional, default 100) + Number of times to retry at the application level Returns ------- @@ -136,13 +363,17 @@ class _BaseXapp: if sbuf.contents.state == 0: return True + self.logger.warning("RTS Failed! Summary: {}".format(rmr.message_summary(sbuf))) return False def rmr_free(self, sbuf): """ - Free an rmr message buffer after use + Frees an rmr message buffer after use + + Note: this does not need to be a class method, self is not + used. However if we break it out as a function we need a home + for it. - Note: this does not need to be a class method, self is not used. However if we break it out as a function we need a home for it. Parameters ---------- sbuf: ctypes c_void_p @@ -150,126 +381,491 @@ class _BaseXapp: """ rmr.rmr_free_msg(sbuf) - # SDL - # NOTE, even though these are passthroughs, the seperate SDL wrapper is useful for other applications like A1. - # Therefore, we don't embed that SDLWrapper functionality here so that it can be instantiated on it's own. + # Convenience (pass-thru) function for invoking SDL. - def sdl_set(self, ns, key, value, usemsgpack=True): + def sdl_set(self, namespace, key, value, usemsgpack=True): """ - set a key + ** Deprecate Warning ** + ** Will be removed in a future function ** + + Stores a key-value pair to SDL, optionally serializing the value + to bytes using msgpack. Parameters ---------- - ns: string - the sdl namespace + namespace: string + SDL namespace key: string - the sdl key + SDL key value: - if usemsgpack is True, value can be anything serializable by msgpack - if usemsgpack is False, value must be bytes - usemsgpack: boolean (optional) - determines whether the value is serialized using msgpack + Object or byte array to store. See the `usemsgpack` parameter. + usemsgpack: boolean (optional, default is True) + Determines whether the value is serialized using msgpack before storing. + If usemsgpack is True, the msgpack function `packb` is invoked + on the value to yield a byte array that is then sent to SDL. + Stated differently, if usemsgpack is True, the value can be anything + that is serializable by msgpack. + If usemsgpack is False, the value must be bytes. """ - self._sdl.set(ns, key, value, usemsgpack) + self.sdl.set(namespace, key, value, usemsgpack) - def sdl_get(self, ns, key, usemsgpack=True): + def sdl_get(self, namespace, key, usemsgpack=True): """ - get a key + ** Deprecate Warning ** + ** Will be removed in a future function ** + + Gets the value for the specified namespace and key from SDL, + optionally deserializing stored bytes using msgpack. Parameters ---------- - ns: string - the sdl namespace + namespace: string + SDL namespace key: string - the sdl key - usemsgpack: boolean (optional) - if usemsgpack is True, the value is deserialized using msgpack - if usemsgpack is False, the value is returned as raw bytes + SDL key + usemsgpack: boolean (optional, default is True) + If usemsgpack is True, the byte array stored by SDL is deserialized + using msgpack to yield the original object that was stored. + If usemsgpack is False, the byte array stored by SDL is returned + without further processing. Returns ------- - None (if not exist) or see above; depends on usemsgpack + Value + See the usemsgpack parameter for an explanation of the returned value type. + Answers None if the key is not found. """ - return self._sdl.get(ns, key, usemsgpack) + return self.sdl.get(namespace, key, usemsgpack) - def sdl_find_and_get(self, ns, prefix, usemsgpack=True): + def sdl_find_and_get(self, namespace, prefix, usemsgpack=True): """ - get all k v pairs that start with prefix + ** Deprecate Warning ** + ** Will be removed in a future function ** + + Gets all key-value pairs in the specified namespace + with keys that start with the specified prefix, + optionally deserializing stored bytes using msgpack. Parameters ---------- - ns: string - the sdl namespace - key: string - the sdl key + nnamespaces: string + SDL namespace prefix: string - the prefix - usemsgpack: boolean (optional) - if usemsgpack is True, the value returned is a dict where each value has been deserialized using msgpack - if usemsgpack is False, the value returned is as a dict mapping keys to raw bytes + the key prefix + usemsgpack: boolean (optional, default is True) + If usemsgpack is True, the byte array stored by SDL is deserialized + using msgpack to yield the original value that was stored. + If usemsgpack is False, the byte array stored by SDL is returned + without further processing. Returns ------- - {} (if no keys match) or see above; depends on usemsgpack + Dictionary of key-value pairs + Each key has the specified prefix. + The value object (its type) depends on the usemsgpack parameter, + but is either a Python object or raw bytes as discussed above. + Answers an empty dictionary if no keys matched the prefix. """ - return self._sdl.find_and_get(ns, prefix, usemsgpack) + return self.sdl.find_and_get(namespace, prefix, usemsgpack) - def sdl_delete(self, ns, key): + def sdl_delete(self, namespace, key): """ - delete a key + ** Deprecate Warning ** + ** Will be removed in a future function ** + + Deletes the key-value pair with the specified key in the specified namespace. Parameters ---------- - ns: string - the sdl namespace + namespace: string + SDL namespace key: string - the sdl key + SDL key + """ + self.sdl.delete(namespace, key) + + def _get_rnib_info(self, node_type): + """ + Since the difference between get_list_gnb_ids and get_list_enb_ids is only node-type, + this function extracted from the duplicated logic. + + Parameters + ---------- + node_type: string + Type of node. This is EnumDescriptor. + Available node types + - UNKNOWN + - ENB + - GNB + + Returns + ------- + List: (NbIdentity) + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + nbid_strings: Set[bytes] = self.sdl.get_members(sdl_namespaces.E2_MANAGER, node_type, usemsgpack=False) + ret: List[NbIdentity] = [] + for nbid_string in nbid_strings: + nbid = NbIdentity() + nbid.ParseFromString(nbid_string) + ret.append(nbid) + return ret + + def get_list_gnb_ids(self): + """ + Retrieves the list of gNodeb identity entities + + gNodeb information is stored in SDL by E2Manager. Therefore, gNode information + is stored in SDL's `e2Manager` namespace as protobuf serialized. + + Returns + ------- + List: (NbIdentity) + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + return self._get_rnib_info(Node.Type.Name(Node.GNB)) + + def get_list_enb_ids(self): + """ + Retrieves the list of eNodeb identity entities + + eNodeb information is stored in SDL by E2Manager. Therefore, eNode information + is stored in SDL's `e2Manager` namespace as protobuf serialized. + + Returns + ------- + List: (NbIdentity) + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + return self._get_rnib_info(Node.Type.Name(Node.ENB)) + + """ + Following RNIB methods are made to be inline of the go-lang based RNIB methods. + Method names are same as in repository: + gerrit.o-ran-sc.org/r/ric-plt/xapp-frame/pkg/rnib + """ + def GetNodeb(self, inventoryName): + """ + Returns nodeb info + In RNIB SDL key is defined following way: RAN: + + Parameters + ---------- + inventoryName: string + + Returns + ------- + NodebInfo() + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + nbid_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, 'RAN:' + inventoryName, usemsgpack=False) + if nbid_string is not None: + nbinfo = pb_nbi.NodebInfo() + nbinfo.ParseFromString(nbid_string) + return nbinfo + return None + + def GetNodebByGlobalNbId(self, nodeType, plmnId, nbId): + """ + Returns nodeb identity based on type, plmn id and node id + In RNIB SDL key is defined following way: :: + + Parameters + ---------- + nodeType: string + plmnId: string + nbId: string + + Returns + ------- + NbIdentity() + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + nbid_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, nodeType + ':' + plmnId + ':' + nbId, usemsgpack=False) + if nbid_string is not None: + nbid = NbIdentity() + nbid.ParseFromString(nbid_string) + return nbid + return None + + def GetCellList(self, inventoryName): """ - self._sdl.delete(ns, key) + Returns nodeb served cell list from the saved node data + In RNIB SDL key is defined following way: RAN: - # Health + Parameters + ---------- + nodeType: string + plmnId: string + nbId: string + + Returns + ------- + ServedCellInfo() in case of ENB + ServedNRCell() in case of GNB + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + nodeb = self.GetNodeb(inventoryName) + if nodeb is not None: + if nodeb.HasField('enb'): + return nodeb.enb.served_cells + elif nodeb.HasField('gnb'): + return nodeb.gnb.served_nr_cells + return None + + def GetCellById(self, cell_type, cell_id): + """ + Returns cell info by cell type and id. + In RNIB SDL keys are defined based on the cell type: + ENB type CELL: + GNB type NRCELL: + + Parameters + ---------- + cell_type: string + Available cell types + - ENB + - GNB + + Returns + ------- + Cell() + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + cellstr = None + if cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.LTE_CELL): + cellstr = 'CELL' + elif cell_type == pb_cell.Cell.Type.Name(pb_cell.Cell.NR_CELL): + cellstr = 'NRCELL' + if cellstr is not None: + cell_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, cellstr + ':' + cell_id, usemsgpack=False) + if cell_string is not None: + cell = pb_cell.Cell() + cell.ParseFromString(cell_string) + return cell + return None + + def GetListNodebIds(self): + """ + Returns both enb and gnb NbIdentity list + + Returns + ------- + List: (NbIdentity) + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + nlist1 = self._get_rnib_info(Node.Type.Name(Node.ENB)) + nlist2 = self._get_rnib_info(Node.Type.Name(Node.GNB)) + + for n in nlist2: + nlist1.append(n) + return nlist1 + + def GetCell(self, inventoryName, pci): + """ + Returns cell info using pci + In RNIB SDL key is defined following way: PCI:: + + Parameters + ---------- + inventoryName: string + pci: int + + Returns + ------- + Cell() + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + cell_string: Set[bytes] = self.sdl_get(sdl_namespaces.E2_MANAGER, 'PCI:{0:s}:{1:02x}'.format(inventoryName, pci), usemsgpack=False) + if cell_string is not None: + cell = pb_cell.Cell() + cell.ParseFromString(cell_string) + return cell + return None + + def GetRanFunctionDefinition(self, inventoryName, ran_function_oid): + """ + Returns GNB ran function definition list based on the ran_function_oid + In RNIB SDL key is defined following way: RAN: + + Parameters + ---------- + inventoryName: string + ran_function_oid: int + + Returns + ------- + array of ran_function_definition matching to ran_function_oid + + Raises + ------ + SdlTypeError: If function's argument is of an inappropriate type. + NotConnected: If SDL is not connected to the backend data storage. + RejectedByBackend: If backend data storage rejects the request. + BackendError: If the backend data storage fails to process the request. + """ + nodeb = self.GetNodeb(inventoryName) + if nodeb is not None: + if nodeb.HasField('gnb') and nodeb.gnb.ran_functions is not None: + ranFDList = [] + for rf in nodeb.gnb.ran_functions: + if rf.ran_function_oid == ran_function_oid: + ranFDList.append(rf.ran_function_definition) + return ranFDList + return None def healthcheck(self): """ this needs to be understood how this is supposed to work """ - return self._rmr_loop.healthcheck() and self._sdl.healthcheck() + return self._rmr_loop.healthcheck() and self.sdl.healthcheck() + + # Convenience function for discovering config change events + + def config_check(self, timeout=0): + """ + Checks the watcher for configuration-file events. The watcher + prerequisites and event mask are documented in __init__(). + + Parameters + ---------- + timeout: int (optional) + Number of seconds to wait for a configuration-file event, default 0. + + Returns + ------- + List of Events, possibly empty + An event is a tuple with objects wd, mask, cookie and name. + For example:: + + Event(wd=1, mask=1073742080, cookie=0, name='foo') + + """ + if not self._inotify: + return [] + events = self._inotify.read(timeout=timeout) + return list(events) def stop(self): """ - cleans up and stops the xapp rmr thread (currently) - This is critical for unit testing as pytest will never return if the thread is running. + cleans up and stops the xapp rmr thread (currently). This is + critical for unit testing as pytest will never return if the + thread is running. - TODO: can we register a ctrl-c handler so this gets called on ctrl-c? Because currently two ctrl-c are needed to stop + TODO: can we register a ctrl-c handler so this gets called on + ctrl-c? Because currently two ctrl-c are needed to stop. """ + if self._appthread is not None: + self._appthread.join() + + self.xapp_shutdown() + self._rmr_loop.stop() -# Public Classes to subclass (these subclass _BaseXapp) +# Public classes that Xapp writers should instantiate or subclass +# to implement an Xapp. class RMRXapp(_BaseXapp): """ - Represents an xapp that is purely driven by rmr messages (i.e., when messages are received, the xapp does something - When run is called, the xapp framework waits for rmr messages, and calls the client provided consume callback on every one + Represents an Xapp that reacts only to RMR messages; i.e., the Xapp + only performs an action when a message is received. Clients should + invoke the run method, which has a loop that waits for RMR messages + and calls the appropriate client-registered consume callback on each. + + If environment variable CONFIG_FILE is defined, and that variable + contains a path to an existing file, this class polls a watcher + defined on that file to detect file-write events, and invokes a + configuration-change handler on each event. The handler is also + invoked at startup. If no handler function is supplied to the + constructor, this class defines a default handler that only logs a + message. + + Parameters + ---------- + default_handler: function + A function with the signature (summary, sbuf) to be called when a + message type is received for which no other handler is registered. + default_handler argument summary: dict + The RMR message summary, a dict of key-value pairs + default_handler argument sbuf: ctypes c_void_p + Pointer to an RMR message buffer. The user must call free on this when done. + config_handler: function (optional, default is documented above) + A function with the signature (json) to be called at startup and each time + a configuration-file change event is detected. The JSON object is read from + the configuration file, if the prerequisites are met. + config_handler argument json: dict + The contents of the configuration file, parsed as JSON. + rmr_port: integer (optional, default is 4562) + Initialize RMR to listen on this port + rmr_wait_for_ready: boolean (optional, default is True) + Wait for RMR to signal ready before starting the dispatch loop + use_fake_sdl: boolean (optional, default is False) + Use an in-memory store instead of the real SDL service + post_init: function (optional, default None) + Run this function after the app initializes and before the dispatch loop starts; + its signature should be post_init(self) """ - def __init__(self, default_handler, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None): + def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, + post_init=None): """ - Parameters - ---------- - default_handler: function - a function with the signature (summary, sbuf) to be called when a message of type message_type is received - summary: dict - the rmr message summary - sbuf: ctypes c_void_p - Pointer to an rmr message buffer. The user must call free on this when done. - - post_init: function (optional) - optionally runs this function after the app initializes and before the run loop - it's signature should be post_init(self) - - For the other parameters, see _BaseXapp + Also see _BaseXapp """ # init base super().__init__( @@ -278,11 +874,38 @@ class RMRXapp(_BaseXapp): # setup callbacks self._default_handler = default_handler + self._config_handler = config_handler self._dispatch = {} # used for thread control self._keep_going = True + # register a default healthcheck handler + # this default checks that rmr is working and SDL is working + # the user can override this and register their own handler + # if they wish since the "last registered callback wins". + def handle_healthcheck(self, summary, sbuf): + healthy = self.healthcheck() + payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n" + self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP) + self.rmr_free(sbuf) + + self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ) + + # define a default configuration-change handler if none was provided. + if not config_handler: + def handle_config_change(self, config): + self.logger.debug("xapp_frame: default config handler invoked") + + self._config_handler = handle_config_change + + # call the config handler at startup if prereqs were met + if self._inotify: + with open(self._config_path) as json_file: + data = json.load(json_file) + self.logger.debug("run: invoking config handler at start") + self._config_handler(self, data) + def register_callback(self, handler, message_type): """ registers this xapp to call handler(summary, buf) when an rmr message is received of type message_type @@ -290,11 +913,12 @@ class RMRXapp(_BaseXapp): Parameters ---------- handler: function - a function with the signature (summary, sbuf) to be called when a message of type message_type is received - summary: dict - the rmr message summary - sbuf: ctypes c_void_p - Pointer to an rmr message buffer. The user must call free on this when done. + a function with the signature (summary, sbuf) to be called + when a message of type message_type is received + summary: dict + the rmr message summary + sbuf: ctypes c_void_p + Pointer to an rmr message buffer. The user must call free on this when done. message:type: int the message type to look for @@ -303,48 +927,95 @@ class RMRXapp(_BaseXapp): """ self._dispatch[message_type] = handler - def run(self): + def run(self, thread=False, rmr_timeout=5, inotify_timeout=0): """ - This function should be called when the client xapp is ready to wait for their handlers to be called on received messages + This function should be called when the reactive Xapp is ready to start. + After start, the Xapp's handlers will be called on received messages. + + Parameters + ---------- + thread: bool (optional, default is False) + If False, execution is not returned and the framework loops forever. + If True, a thread is started to run the queue read/dispatch loop + and execution is returned to caller; the thread can be stopped + by calling the .stop() method. + + rmr_timeout: integer (optional, default is 5 seconds) + Length of time to wait for an RMR message to arrive. - execution is returned to caller + inotify_timeout: integer (optional, default is 0 seconds) + Length of time to wait for an inotify event to arrive. """ def loop(): while self._keep_going: - if not self._rmr_loop.rcv_queue.empty(): - (summary, sbuf) = self._rmr_loop.rcv_queue.get() + + # poll RMR + try: + (summary, sbuf) = self._rmr_loop.rcv_queue.get(block=True, timeout=rmr_timeout) # dispatch - func = self._dispatch.get(summary["message type"], None) + func = self._dispatch.get(summary[rmr.RMR_MS_MSG_TYPE], None) if not func: func = self._default_handler + self.logger.debug("run: invoking msg handler on type {}".format(summary[rmr.RMR_MS_MSG_TYPE])) func(self, summary, sbuf) - - Thread(target=loop).start() + except queue.Empty: + # the get timed out + pass + + # poll configuration file watcher + try: + events = self.config_check(timeout=inotify_timeout) + for event in events: + with open(self._config_path) as json_file: + data = json.load(json_file) + self.logger.debug("run: invoking config handler on change event {}".format(event)) + self._config_handler(self, data) + except Exception as error: + self.logger.error("run: configuration handler failed: {}".format(error)) + + if thread: + Thread(target=loop).start() + else: + loop() def stop(self): """ - stops the rmr xapp completely. + Sets the flag to end the dispatch loop. """ super().stop() - mdc_logger.debug("Stopping queue reading thread..") + self.logger.debug("Setting flag to end framework work loop.") self._keep_going = False class Xapp(_BaseXapp): """ - Represents an xapp where the client provides a generic function to call, which is mostly likely a loop-forever loop + Represents a generic Xapp where the client provides a single function + for the framework to call at startup time (instead of providing callback + functions by message type). The Xapp writer must implement and provide a + function with a loop-forever construct similar to the `run` function in + the `RMRXapp` class. That function should poll to retrieve RMR messages + and dispatch them appropriately, poll for configuration changes, etc. + + Parameters + ---------- + entrypoint: function + This function is called when the Xapp class's run method is invoked. + The function signature must be just function(self) + rmr_port: integer (optional, default is 4562) + Initialize RMR to listen on this port + rmr_wait_for_ready: boolean (optional, default is True) + Wait for RMR to signal ready before starting the dispatch loop + use_fake_sdl: boolean (optional, default is False) + Use an in-memory store instead of the real SDL service """ def __init__(self, entrypoint, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False): """ Parameters ---------- - entrypoint: function - this function is called when the xapp runs; this is the user code - it's signature should be function(self) - For the other parameters, see _BaseXapp + For the other parameters, see class _BaseXapp. """ # init base super().__init__(rmr_port=rmr_port, rmr_wait_for_ready=rmr_wait_for_ready, use_fake_sdl=use_fake_sdl) @@ -352,8 +1023,9 @@ class Xapp(_BaseXapp): def run(self): """ - This function should be called when the client xapp is ready to start their code + This function should be called when the general Xapp is ready to start. """ self._entrypoint(self) - # there is no need for stop currently here (base has, and nothing special to do here) + # there is no need for stop currently here (base has, and nothing + # special to do here)