import json
import os
import queue
+import time
from threading import Thread
import inotify_simple
from mdclogpy import Logger
from ricxappframe import xapp_rmr
from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
-
+import requests
# message-type constants
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101
# environment variable with path to configuration file
CONFIG_FILE_ENV = "CONFIG_FILE"
+CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
class _BaseXapp:
self._mrc = self._rmr_loop.mrc # for convenience
# SDL
- self._sdl = SDLWrapper(use_fake_sdl)
+ self.sdl = SDLWrapper(use_fake_sdl)
# Config
# The environment variable specifies the path to the Xapp config file
self._inotify = None
self.logger.warning("__init__: NOT watching any config file")
+ # used for thread control of Registration of Xapp
+ self._keep_registration = True
+
+ # configuration data for xapp registration and deregistration
+ self._config_data = None
+ self._configfile_path = os.environ.get(CONFIG_FILE_PATH, None)
+ if self._configfile_path and os.path.isfile(self._configfile_path):
+ with open(self._configfile_path) as json_file:
+ self._config_data = json.load(json_file)
+ else:
+ self._keep_registration = False
+ self.logger.warning("__init__: Cannot Read config file for xapp Registration")
+
+ Thread(target=self.registerXapp).start()
+
# run the optionally provided user post init
if post_init:
post_init(self)
+ def get_service(self, host, service):
+ """
+ To find the url for connecting to the service
+
+ Parameters
+ ----------
+ host: string
+ defines the hostname in the url
+ service: string
+ defines the servicename in the url
+
+ Returns
+ -------
+ string
+ url for the service
+ """
+ app_namespace = self._config_data.get("APP_NAMESPACE")
+ if app_namespace == "":
+ app_namespace = self._config_data.get("DEFAULT_XAPP_NS")
+ svc = service.format(app_namespace.upper(), host.upper())
+ url = svc.replace("-", "_").split("//")
+
+ if len(url) > 1:
+ return url[1]
+ return ""
+
+ def do_post(self, plt_namespace, url, msg):
+ """
+ registration of the xapp using the url and json msg
+
+ Parameters
+ ----------
+ plt_namespace: string
+ platform namespace where the xapp is running
+ url: string
+ url for xapp registration
+ msg: string
+ json msg containing the xapp details
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ try:
+ request_url = url.format(plt_namespace, plt_namespace)
+ resp = requests.post(request_url, json=msg)
+ self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
+ return resp.status_code == 200 or resp.status_code == 201
+ except requests.exceptions.RequestException as err:
+ self.logger.error("Error : {}".format(err))
+ return format(err)
+ except requests.exceptions.HTTPError as errh:
+ self.logger.error("Http Error: {}".format(errh))
+ return errh
+ except requests.exceptions.ConnectionError as errc:
+ self.logger.error("Error Connecting: {}".format(errc))
+ return errc
+ except requests.exceptions.Timeout as errt:
+ self.logger.error("Timeout Error: {}".format(errt))
+ return errt
+
+ def register(self):
+ """
+ function to registers the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ hostname = self._config_data.get("hostname")
+ xappname = self._config_data.get("name")
+ xappversion = self._config_data.get("version")
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ if pltnamespace == "":
+ pltnamespace = self._config_data.get("DEFAULT_PLT_NS")
+ http_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_HTTP"))
+ rmr_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_RMR"))
+ if http_endpoint == "" or rmr_endpoint == "":
+ self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
+ return None
+ try:
+ request_string = {
+ "appName": hostname,
+ "httpEndpoint": http_endpoint,
+ "rmrEndpoint": rmr_endpoint,
+ "appInstanceName": xappname,
+ "appVersion": xappversion,
+ "configPath": self._config_data.get("CONFIG_PATH")
+ }
+ request_body = json.dumps(request_string)
+ except TypeError:
+ self.logger.error("Unable to serialize the object")
+ return "Error searializing the object"
+
+ return self.do_post(pltnamespace, self._config_data.get("REGISTER_PATH"), request_body)
+
+ def registerXapp(self):
+ """
+ registers the xapp
+ """
+ while self._keep_registration:
+ time.sleep(5)
+ # checking for rmr/sdl/xapp health
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
+ continue
+
+ self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(self._config_data.get("name")))
+ self.register()
+ self.logger.debug("Registration done, proceeding with startup ...")
+ break
+
+ def deregister(self):
+ """
+ Deregisters the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.error("RMR or SDL or xapp == Not Healthy")
+ return None
+ if self._config_data is None:
+ return None
+ name = self._config_data.get("hostname")
+ xappname = self._config_data.get("name")
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ if pltnamespace == "":
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ try:
+ request_string = {
+ "appName": name,
+ "appInstanceName": xappname,
+ }
+ request_body = json.dumps(request_string)
+ except TypeError:
+ self.logger.error("Error Serializing the object")
+ return "Error serializing the object"
+
+ return self.do_post(pltnamespace, self._config_data.get("DEREGISTER_PATH"), request_body)
+
+ def xapp_shutdown(self):
+ """
+ Deregisters the xapp while shutting down
+ """
+ self.deregister()
+ self.logger.debug("Wait for xapp to get unregistered")
+ time.sleep(10)
+
# Public rmr methods
def rmr_get_messages(self):
def sdl_set(self, namespace, key, value, usemsgpack=True):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Stores a key-value pair to SDL, optionally serializing the value
to bytes using msgpack.
that is serializable by msgpack.
If usemsgpack is False, the value must be bytes.
"""
- self._sdl.set(namespace, key, value, usemsgpack)
+ self.sdl.set(namespace, key, value, usemsgpack)
def sdl_get(self, namespace, key, usemsgpack=True):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Gets the value for the specified namespace and key from SDL,
optionally deserializing stored bytes using msgpack.
See the usemsgpack parameter for an explanation of the returned value type.
Answers None if the key is not found.
"""
- return self._sdl.get(namespace, key, usemsgpack)
+ return self.sdl.get(namespace, key, usemsgpack)
def sdl_find_and_get(self, namespace, prefix, usemsgpack=True):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Gets all key-value pairs in the specified namespace
with keys that start with the specified prefix,
optionally deserializing stored bytes using msgpack.
but is either a Python object or raw bytes as discussed above.
Answers an empty dictionary if no keys matched the prefix.
"""
- return self._sdl.find_and_get(namespace, prefix, usemsgpack)
+ return self.sdl.find_and_get(namespace, prefix, usemsgpack)
def sdl_delete(self, namespace, key):
"""
+ ** Deprecate Warning **
+ ** Will be removed in a future function **
+
Deletes the key-value pair with the specified key in the specified namespace.
Parameters
key: string
SDL key
"""
- self._sdl.delete(namespace, key)
+ self.sdl.delete(namespace, key)
# Health
"""
this needs to be understood how this is supposed to work
"""
- return self._rmr_loop.healthcheck() and self._sdl.healthcheck()
+ return self._rmr_loop.healthcheck() and self.sdl.healthcheck()
# Convenience function for discovering config change events
TODO: can we register a ctrl-c handler so this gets called on
ctrl-c? Because currently two ctrl-c are needed to stop.
"""
+
+ self.xapp_shutdown()
+
self._rmr_loop.stop()