import json
import time
from contextlib import suppress
-from ricxappframe.xapp_frame import Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
+from ricxappframe.xapp_frame import _BaseXapp, Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
+from ricxappframe.constants import sdl_namespaces
rmr_xapp = None
rmr_xapp_health = None
gen_xapp = None
+rnib_xapp = None
def test_rmr_init():
val = json.dumps({"test send 60001": 2}).encode()
self.rmr_send(val, 60001)
+ self.sdl_delete("testns", "bogus")
+
global gen_xapp
gen_xapp = Xapp(entrypoint=entry, use_fake_sdl=True)
gen_xapp.run()
assert health_pay == b"OK\n"
+def test_get_list_nodeb(rnib_information):
+ global rnib_xapp
+ rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+
+ # Test there is no rnib information.
+ gnb_list = rnib_xapp.get_list_gnb_ids()
+ enb_list = rnib_xapp.get_list_enb_ids()
+ assert len(gnb_list) == 0
+ assert len(enb_list) == 0
+
+ # Add rnib information directly.
+ for rnib in rnib_information:
+ rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", rnib, usemsgpack=False)
+ rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "GNB", rnib, usemsgpack=False)
+
+ gnb_list = rnib_xapp.get_list_gnb_ids()
+ assert len(gnb_list) == len(rnib_information)
+ for gnb in gnb_list:
+ assert gnb.SerializeToString() in rnib_information
+
+ enb_list = rnib_xapp.get_list_enb_ids()
+ assert len(enb_list) == len(rnib_information)
+ for enb in enb_list:
+ assert enb.SerializeToString() in rnib_information
+
+
def teardown_module():
"""
this is like a "finally"; the name of this function is pytest magic
rmr_xapp.stop()
with suppress(Exception):
rmr_xapp_health.stop()
+ with suppress(Exception):
+ rnib_xapp.stop()