import json
import os
import queue
+import time
from threading import Thread
import inotify_simple
from mdclogpy import Logger
from ricxappframe import xapp_rmr
from ricxappframe.rmr import rmr
from ricxappframe.xapp_sdl import SDLWrapper
-
+import requests
# message-type constants
RIC_HEALTH_CHECK_REQ = 100
RIC_HEALTH_CHECK_RESP = 101
# environment variable with path to configuration file
CONFIG_FILE_ENV = "CONFIG_FILE"
+CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
class _BaseXapp:
self._inotify = None
self.logger.warning("__init__: NOT watching any config file")
+ # used for thread control of Registration of Xapp
+ self._keep_registration = True
+
+ # configuration data for xapp registration and deregistration
+ self._config_data = None
+ self._configfile_path = os.environ.get(CONFIG_FILE_PATH, None)
+ if self._configfile_path and os.path.isfile(self._configfile_path):
+ with open(self._configfile_path) as json_file:
+ self._config_data = json.load(json_file)
+ else:
+ self._keep_registration = False
+ self.logger.warning("__init__: Cannot Read config file for xapp Registration")
+
+ Thread(target=self.registerXapp).start()
+
# run the optionally provided user post init
if post_init:
post_init(self)
+ def get_service(self, host, service):
+ """
+ To find the url for connecting to the service
+
+ Parameters
+ ----------
+ host: string
+ defines the hostname in the url
+ service: string
+ defines the servicename in the url
+
+ Returns
+ -------
+ string
+ url for the service
+ """
+ app_namespace = self._config_data.get("APP_NAMESPACE")
+ if app_namespace == "":
+ app_namespace = self._config_data.get("DEFAULT_XAPP_NS")
+ svc = service.format(app_namespace.upper(), host.upper())
+ url = svc.replace("-", "_").split("//")
+
+ if len(url) > 1:
+ return url[1]
+ return ""
+
+ def do_post(self, plt_namespace, url, msg):
+ """
+ registration of the xapp using the url and json msg
+
+ Parameters
+ ----------
+ plt_namespace: string
+ platform namespace where the xapp is running
+ url: string
+ url for xapp registration
+ msg: string
+ json msg containing the xapp details
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ try:
+ request_url = url.format(plt_namespace, plt_namespace)
+ resp = requests.post(request_url, json=msg)
+ self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
+ return resp.status_code == 200 or resp.status_code == 201
+ except requests.exceptions.RequestException as err:
+ self.logger.error("Error : {}".format(err))
+ return format(err)
+ except requests.exceptions.HTTPError as errh:
+ self.logger.error("Http Error: {}".format(errh))
+ return errh
+ except requests.exceptions.ConnectionError as errc:
+ self.logger.error("Error Connecting: {}".format(errc))
+ return errc
+ except requests.exceptions.Timeout as errt:
+ self.logger.error("Timeout Error: {}".format(errt))
+ return errt
+
+ def register(self):
+ """
+ function to registers the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ hostname = self._config_data.get("hostname")
+ xappname = self._config_data.get("name")
+ xappversion = self._config_data.get("version")
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ if pltnamespace == "":
+ pltnamespace = self._config_data.get("DEFAULT_PLT_NS")
+ http_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_HTTP"))
+ rmr_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_RMR"))
+ if http_endpoint == "" or rmr_endpoint == "":
+ self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
+ return None
+ try:
+ request_string = {
+ "appName": hostname,
+ "httpEndpoint": http_endpoint,
+ "rmrEndpoint": rmr_endpoint,
+ "appInstanceName": xappname,
+ "appVersion": xappversion,
+ "configPath": self._config_data.get("CONFIG_PATH")
+ }
+ request_body = json.dumps(request_string)
+ except TypeError:
+ self.logger.error("Unable to serialize the object")
+ return "Error searializing the object"
+
+ return self.do_post(pltnamespace, self._config_data.get("REGISTER_PATH"), request_body)
+
+ def registerXapp(self):
+ """
+ registers the xapp
+ """
+ while self._keep_registration:
+ time.sleep(5)
+ # checking for rmr/sdl/xapp health
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
+ continue
+
+ self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(self._config_data.get("name")))
+ self.register()
+ self.logger.debug("Registration done, proceeding with startup ...")
+ break
+
+ def deregister(self):
+ """
+ Deregisters the xapp
+
+ Returns
+ -------
+ bool
+ whether or not the xapp is registered
+ """
+ healthy = self.healthcheck()
+ if not healthy:
+ self.logger.error("RMR or SDL or xapp == Not Healthy")
+ return None
+ if self._config_data is None:
+ return None
+ name = self._config_data.get("hostname")
+ xappname = self._config_data.get("name")
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ if pltnamespace == "":
+ pltnamespace = self._config_data.get("PLT_NAMESPACE")
+ try:
+ request_string = {
+ "appName": name,
+ "appInstanceName": xappname,
+ }
+ request_body = json.dumps(request_string)
+ except TypeError:
+ self.logger.error("Error Serializing the object")
+ return "Error serializing the object"
+
+ return self.do_post(pltnamespace, self._config_data.get("DEREGISTER_PATH"), request_body)
+
+ def xapp_shutdown(self):
+ """
+ Deregisters the xapp while shutting down
+ """
+ self.deregister()
+ self.logger.debug("Wait for xapp to get unregistered")
+ time.sleep(10)
+
# Public rmr methods
def rmr_get_messages(self):
TODO: can we register a ctrl-c handler so this gets called on
ctrl-c? Because currently two ctrl-c are needed to stop.
"""
+
+ self.xapp_shutdown()
+
self._rmr_loop.stop()