RIC-642 related changes: REST subscription, rnib enhancements, symptomdata, rest...
[ric-plt/xapp-frame-py.git] / tests / test_xapps.py
index a31bd48..20ebe86 100644 (file)
 import json
 import time
 from contextlib import suppress
-from ricxappframe.xapp_frame import Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
+
+from ricxappframe.util.constants import Constants
+from ricxappframe.xapp_frame import _BaseXapp, Xapp, RMRXapp
+from ricxappframe.constants import sdl_namespaces
+
+import ricxappframe.entities.rnib.nb_identity_pb2 as pb_nb
 
 rmr_xapp = None
 rmr_xapp_health = None
 gen_xapp = None
+rnib_xapp = None
 
 
 def test_rmr_init():
@@ -85,7 +91,7 @@ def test_rmr_healthcheck():
     health_pay = None
 
     def post_init(self):
-        self.rmr_send(b"", RIC_HEALTH_CHECK_REQ)
+        self.rmr_send(b"", Constants.RIC_HEALTH_CHECK_REQ)
 
     def default_handler(self, summary, sbuf):
         pass
@@ -98,7 +104,7 @@ def test_rmr_healthcheck():
         health_pay = summary["payload"]
         self.rmr_free(sbuf)
 
-    rmr_xapp_health.register_callback(health_handler, RIC_HEALTH_CHECK_RESP)
+    rmr_xapp_health.register_callback(health_handler, Constants.RIC_HEALTH_CHECK_RESP)
     rmr_xapp_health.run(thread=True)  # in unit tests we need to thread here or else execution is not returned!
 
     time.sleep(1)
@@ -106,6 +112,177 @@ def test_rmr_healthcheck():
     assert health_pay == b"OK\n"
 
 
+def test_rnib_get_list_nodeb(rnib_information):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+
+    # Test there is no rnib information.
+    gnb_list = rnib_xapp.get_list_gnb_ids()
+    enb_list = rnib_xapp.get_list_enb_ids()
+    assert len(gnb_list) == 0
+    assert len(enb_list) == 0
+
+    # Add rnib information directly.
+    for rnib in rnib_information:
+        rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", rnib, usemsgpack=False)
+        rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "GNB", rnib, usemsgpack=False)
+
+    gnb_list = rnib_xapp.get_list_gnb_ids()
+    assert len(gnb_list) == len(rnib_information)
+    for gnb in gnb_list:
+        assert gnb.SerializeToString() in rnib_information
+
+    enb_list = rnib_xapp.get_list_enb_ids()
+    assert len(enb_list) == len(rnib_information)
+    for enb in enb_list:
+        assert enb.SerializeToString() in rnib_information
+
+    rnib_xapp.stop()
+
+
+def test_rnib_get_list_all_nodeb(rnib_information):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+
+    # Add rnib information directly.
+    for rnib in rnib_information:
+        rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "GNB", rnib, usemsgpack=False)
+
+    nb_list = rnib_xapp.GetListNodebIds()
+    assert len(nb_list) == 2
+
+    for rnib in rnib_information:
+        rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", rnib, usemsgpack=False)
+
+    nb_list = rnib_xapp.GetListNodebIds()
+    assert len(nb_list) == 4
+
+    rnib_xapp.stop()
+
+
+def test_rnib_get_list_cells(rnib_cellinformation):
+    global rnib_xapp
+
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+
+    mynb = pb_nb.NbIdentity()
+    mynb.inventory_name = "nodeb_1234"
+    mynb.global_nb_id.plmn_id = "plmn_1234"
+    mynb.global_nb_id.nb_id = "nb_1234"
+    mynb.connection_status = 1
+    rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", mynb.SerializeToString(), usemsgpack=False)
+
+    # Add rnib information directly.
+    for rnib in rnib_cellinformation:
+        rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENBCELL1", rnib, usemsgpack=False)
+        rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENBCELL2", rnib, usemsgpack=False)
+    rnib_xapp.stop()
+
+
+def test_rnib_get_nodeb(rnib_helpers):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+    nb1 = rnib_helpers.createNodebInfo('nodeb_1234', 'GNB', '192.168.1.1', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
+    nb2 = rnib_helpers.createNodebInfo('nodeb_1234', 'ENB', '192.168.1.2', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
+
+    gnb = rnib_xapp.GetNodeb('nodeb_1235')
+    assert gnb == nb2
+    gnb = rnib_xapp.GetNodeb('nodeb_1234')
+    assert gnb == nb1
+    gnb = rnib_xapp.GetNodeb('nodeb_1230')
+    assert gnb is None
+    rnib_xapp.stop()
+
+
+def test_rnib_get_cell(rnib_helpers):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+    c1 = rnib_helpers.createCell('c1234', 8)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "PCI:c1234:08", c1.SerializeToString(), usemsgpack=False)
+    c2 = rnib_helpers.createCell('c1235', 11)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "PCI:c1235:0b", c2.SerializeToString(), usemsgpack=False)
+
+    cell = rnib_xapp.GetCell('c1235', 11)
+    assert cell == c2
+    cell = rnib_xapp.GetCell('c1234', 8)
+    assert cell == c1
+    cell = rnib_xapp.GetCell('c1236', 11)
+    assert cell is None
+    rnib_xapp.stop()
+
+
+def test_rnib_get_cell_by_id(rnib_helpers):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+    c1 = rnib_helpers.createCell('c1234', 8)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "CELL:c1234", c1.SerializeToString(), usemsgpack=False)
+    c2 = rnib_helpers.createCell('c1235', 11)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "CELL:c1235", c2.SerializeToString(), usemsgpack=False)
+
+    cell = rnib_xapp.GetCellById('LTE_CELL', 'c1235')
+    assert cell == c2
+    cell = rnib_xapp.GetCellById('LTE_CELL', 'c1234')
+    assert cell == c1
+    cell = rnib_xapp.GetCellById('LTE_CELL', 'c1236')
+    assert cell is None
+    rnib_xapp.stop()
+
+
+def test_rnib_get_cells(rnib_helpers):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+    nb1 = rnib_helpers.createNodebInfo('nodeb_1234', 'GNB', '192.168.1.1', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
+    nb2 = rnib_helpers.createNodebInfo('nodeb_1234', 'ENB', '192.168.1.2', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
+
+    sc = rnib_xapp.GetCellList('nodeb_1235')
+    assert sc == nb2.enb.served_cells
+    sc = rnib_xapp.GetCellList('nodeb_1234')
+    assert sc == nb1.gnb.served_nr_cells
+    sc = rnib_xapp.GetCellList('nodeb_1230')
+    assert sc is None
+    rnib_xapp.stop()
+
+
+def test_rnib_get_global_nodeb(rnib_helpers):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+    nb1 = rnib_helpers.createNodeb('nodeb_1234', '358', 'nb_1234')
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "GNB:" + '358:' + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
+    nb2 = rnib_helpers.createNodeb('nodeb_1235', '356', 'nb_1235')
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "GNB:" + '356:' + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
+
+    gnb = rnib_xapp.GetNodebByGlobalNbId('GNB', '356', 'nodeb_1235')
+    assert gnb == nb2
+    gnb = rnib_xapp.GetNodebByGlobalNbId('GNB', '358', 'nodeb_1234')
+    assert gnb == nb1
+    gnb = rnib_xapp.GetNodebByGlobalNbId('GNB', '356', 'nodeb_1230')
+    assert gnb is None
+    rnib_xapp.stop()
+
+
+def test_rnib_get_ranfunction(rnib_helpers):
+    global rnib_xapp
+    rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
+    nb1 = rnib_helpers.createNodebInfo('nodeb_1234', 'GNB', '192.168.1.1', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
+    nb2 = rnib_helpers.createNodebInfo('nodeb_1235', 'GNB', '192.168.1.2', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
+    nb3 = rnib_helpers.createNodebInfo('nodeb_1236', 'GNB', '192.168.1.2', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1236', nb3.SerializeToString(), usemsgpack=False)
+    nb4 = rnib_helpers.createNodebInfo('nodeb_1237', 'GNB', '192.168.1.2', 8088)
+    rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1237', nb4.SerializeToString(), usemsgpack=False)
+
+    sc = rnib_xapp.GetRanFunctionDefinition('nodeb_1235', "1.3.6.1.4.1.1.2.2.2")
+    assert sc == ['te524367153']
+    sc = rnib_xapp.GetRanFunctionDefinition('nodeb_1235', "1.3.6.1.4.1.1.2.2.5")
+    assert sc == []
+    rnib_xapp.stop()
+
+
 def teardown_module():
     """
     this is like a "finally"; the name of this function is pytest magic