import json
import time
from contextlib import suppress
-from ricxappframe.xapp_frame import _BaseXapp, Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
+
+from ricxappframe.util.constants import Constants
+from ricxappframe.xapp_frame import _BaseXapp, Xapp, RMRXapp
from ricxappframe.constants import sdl_namespaces
rmr_xapp = None
health_pay = None
def post_init(self):
- self.rmr_send(b"", RIC_HEALTH_CHECK_REQ)
+ self.rmr_send(b"", Constants.RIC_HEALTH_CHECK_REQ)
def default_handler(self, summary, sbuf):
pass
health_pay = summary["payload"]
self.rmr_free(sbuf)
- rmr_xapp_health.register_callback(health_handler, RIC_HEALTH_CHECK_RESP)
+ rmr_xapp_health.register_callback(health_handler, Constants.RIC_HEALTH_CHECK_RESP)
rmr_xapp_health.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
time.sleep(1)