from ricxappframe.entities.rnib.nb_identity_pb2 import NbIdentity
from ricxappframe.entities.rnib.nodeb_info_pb2 import Node
from ricxappframe.rmr import rmr
+from ricxappframe.util.constants import Constants
from ricxappframe.xapp_sdl import SDLWrapper
import requests
-# message-type constants
-RIC_HEALTH_CHECK_REQ = 100
-RIC_HEALTH_CHECK_RESP = 101
-
-# environment variable with path to configuration file
-CONFIG_FILE_ENV = "CONFIG_FILE"
-CONFIG_FILE_PATH = "CONFIG_FILE_PATH"
class _BaseXapp:
# Config
# The environment variable specifies the path to the Xapp config file
- self._config_path = os.environ.get(CONFIG_FILE_ENV, None)
+ self._config_path = os.environ.get(Constants.CONFIG_FILE_ENV, None)
if self._config_path and os.path.isfile(self._config_path):
self._inotify = inotify_simple.INotify()
self._inotify.add_watch(self._config_path, inotify_simple.flags.MODIFY)
# configuration data for xapp registration and deregistration
self._config_data = None
- self._configfile_path = os.environ.get(CONFIG_FILE_PATH, None)
- if self._configfile_path and os.path.isfile(self._configfile_path):
- with open(self._configfile_path) as json_file:
+ if self._config_path and os.path.isfile(self._config_path):
+ with open(self._config_path) as json_file:
self._config_data = json.load(json_file)
else:
self._keep_registration = False
- self.logger.warning("__init__: Cannot Read config file for xapp Registration")
+ self.logger.error("__init__: Cannot Read config file for xapp Registration")
+ self._config_data = {}
Thread(target=self.registerXapp).start()
url for the service
"""
app_namespace = self._config_data.get("APP_NAMESPACE")
- if app_namespace == "":
- app_namespace = self._config_data.get("DEFAULT_XAPP_NS")
- svc = service.format(app_namespace.upper(), host.upper())
- url = svc.replace("-", "_").split("//")
-
- if len(url) > 1:
- return url[1]
+ if app_namespace is None:
+ app_namespace = Constants.DEFAULT_XAPP_NS
+ self.logger.debug("service : {} host : {},appnamespace : {}".format(service, host, app_namespace))
+ if app_namespace is not None and host is not None:
+ svc = service.format(app_namespace.upper(), host.upper())
+ urlkey = svc.replace("-", "_")
+ url = os.environ.get(urlkey).split("//")
+ self.logger.debug("Service urlkey : {} and url: {}".format(urlkey, url))
+ if len(url) > 1:
+ return url[1]
return ""
def do_post(self, plt_namespace, url, msg):
bool
whether or not the xapp is registered
"""
+ if url is None:
+ self.logger.error("url is empty ")
+ return False
+ if plt_namespace is None:
+ self.logger.error("plt_namespace is empty")
+ return False
try:
request_url = url.format(plt_namespace, plt_namespace)
resp = requests.post(request_url, json=msg)
self.logger.debug("Post to '{}' done, status : {}".format(request_url, resp.status_code))
+ self.logger.debug("Response Text : {}".format(resp.text))
return resp.status_code == 200 or resp.status_code == 201
except requests.exceptions.RequestException as err:
self.logger.error("Error : {}".format(err))
bool
whether or not the xapp is registered
"""
- hostname = self._config_data.get("hostname")
+ hostname = os.environ.get("HOSTNAME")
xappname = self._config_data.get("name")
xappversion = self._config_data.get("version")
- pltnamespace = self._config_data.get("PLT_NAMESPACE")
- if pltnamespace == "":
- pltnamespace = self._config_data.get("DEFAULT_PLT_NS")
- http_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_HTTP"))
- rmr_endpoint = self.get_service(hostname, self._config_data.get("SERVICE_RMR"))
+ pltnamespace = os.environ.get("PLT_NAMESPACE")
+ if pltnamespace is None:
+ pltnamespace = Constants.DEFAULT_PLT_NS
+ self.logger.debug("config details hostname : {} xappname: {} xappversion : {} pltnamespace : {}".format(
+ hostname, xappname, xappversion, pltnamespace))
+
+ http_endpoint = self.get_service(hostname, Constants.SERVICE_HTTP)
+ rmr_endpoint = self.get_service(hostname, Constants.SERVICE_RMR)
if http_endpoint == "" or rmr_endpoint == "":
- self.logger.warning("Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint, rmr_endpoint))
- return None
- try:
- request_string = {
- "appName": hostname,
- "httpEndpoint": http_endpoint,
- "rmrEndpoint": rmr_endpoint,
- "appInstanceName": xappname,
- "appVersion": xappversion,
- "configPath": self._config_data.get("CONFIG_PATH")
- }
- request_body = json.dumps(request_string)
- except TypeError:
- self.logger.error("Unable to serialize the object")
- return "Error searializing the object"
-
- return self.do_post(pltnamespace, self._config_data.get("REGISTER_PATH"), request_body)
+ self.logger.error(
+ "Couldn't resolve service endpoints: http_endpoint={} rmr_endpoint={}".format(http_endpoint,
+ rmr_endpoint))
+ return False
+ self.logger.debug(
+ "config details hostname : {} xappname: {} xappversion : {} pltnamespace : {} http_endpoint : {} rmr_endpoint "
+ ": {} configpath : {}".format(hostname, xappname, xappversion, pltnamespace, http_endpoint, rmr_endpoint,
+ self._config_data.get("CONFIG_PATH")))
+ request_string = {
+ "appName": hostname,
+ "appVersion": xappversion,
+ "configPath": "",
+ "appInstanceName": xappname,
+ "httpEndpoint": http_endpoint,
+ "rmrEndpoint": rmr_endpoint,
+ "config": json.dumps(self._config_data)
+ }
+ self.logger.info("REQUEST STRING :{}".format(request_string))
+ return self.do_post(pltnamespace, Constants.REGISTER_PATH, request_string)
def registerXapp(self):
"""
registers the xapp
"""
- while self._keep_registration:
- time.sleep(5)
+ retries = 5
+ while self._keep_registration and retries > 0:
+ time.sleep(2)
+ retries = retries-1
# checking for rmr/sdl/xapp health
healthy = self.healthcheck()
if not healthy:
- self.logger.warning("Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
+ self.logger.warning(
+ "Application='{}' is not ready yet, waiting ...".format(self._config_data.get("name")))
continue
- self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(self._config_data.get("name")))
- self.register()
- self.logger.debug("Registration done, proceeding with startup ...")
- break
+ self.logger.debug("Application='{}' is now up and ready, continue with registration ...".format(
+ self._config_data.get("name")))
+ if self.register():
+ self.logger.debug("Registration done, proceeding with startup ...")
+ break
def deregister(self):
"""
return None
if self._config_data is None:
return None
- name = self._config_data.get("hostname")
+ name = os.environ.get("HOSTNAME")
xappname = self._config_data.get("name")
- pltnamespace = self._config_data.get("PLT_NAMESPACE")
- if pltnamespace == "":
- pltnamespace = self._config_data.get("PLT_NAMESPACE")
- try:
- request_string = {
+ pltnamespace = os.environ.get("PLT_NAMESPACE")
+ if pltnamespace is None:
+ pltnamespace = Constants.DEFAULT_PLT_NS
+ request_string = {
"appName": name,
"appInstanceName": xappname,
- }
- request_body = json.dumps(request_string)
- except TypeError:
- self.logger.error("Error Serializing the object")
- return "Error serializing the object"
+ }
- return self.do_post(pltnamespace, self._config_data.get("DEREGISTER_PATH"), request_body)
+ return self.do_post(pltnamespace, Constants.DEREGISTER_PATH, request_string)
def xapp_shutdown(self):
"""
bool
whether or not the send worked after retries attempts
"""
- sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True, mtype=mtype)
+ sbuf = rmr.rmr_alloc_msg(vctx=self._mrc, size=len(payload), payload=payload, gen_transaction_id=True,
+ mtype=mtype)
for _ in range(retries):
sbuf = rmr.rmr_send_msg(self._mrc, sbuf)
its signature should be post_init(self)
"""
- def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False, post_init=None):
+ def __init__(self, default_handler, config_handler=None, rmr_port=4562, rmr_wait_for_ready=True, use_fake_sdl=False,
+ post_init=None):
"""
Also see _BaseXapp
"""
def handle_healthcheck(self, summary, sbuf):
healthy = self.healthcheck()
payload = b"OK\n" if healthy else b"ERROR [RMR or SDL is unhealthy]\n"
- self.rmr_rts(sbuf, new_payload=payload, new_mtype=RIC_HEALTH_CHECK_RESP)
+ self.rmr_rts(sbuf, new_payload=payload, new_mtype=Constants.RIC_HEALTH_CHECK_RESP)
self.rmr_free(sbuf)
- self.register_callback(handle_healthcheck, RIC_HEALTH_CHECK_REQ)
+ self.register_callback(handle_healthcheck, Constants.RIC_HEALTH_CHECK_REQ)
# define a default configuration-change handler if none was provided.
if not config_handler:
def handle_config_change(self, config):
self.logger.debug("xapp_frame: default config handler invoked")
+
self._config_handler = handle_config_change
# call the config handler at startup if prereqs were met