1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
19 from contextlib import suppress
21 from ricxappframe.util.constants import Constants
22 from ricxappframe.xapp_frame import _BaseXapp, Xapp, RMRXapp
23 from ricxappframe.constants import sdl_namespaces
25 import ricxappframe.entities.rnib.nb_identity_pb2 as pb_nb
28 rmr_xapp_health = None
# Handler passed to RMRXapp as its first argument: parses the JSON text found in
# summary["payload"] and stores the result in def_pay.
# NOTE(review): the enclosing test's header and any nonlocal/global declaration for
# def_pay are not visible in this excerpt - confirm against the full file.
41 def default_handler(self, summary, sbuf):
43 def_pay = json.loads(summary["payload"])
# Handler registered for message type 60000: parses the JSON text found in
# summary["payload"] and stores the result in sixty_pay.
# NOTE(review): any nonlocal/global declaration for sixty_pay is not visible in this
# excerpt - confirm against the full file.
46 def sixtythou_handler(self, summary, sbuf):
48 sixty_pay = json.loads(summary["payload"])
# Start the RMR xapp on port 4564 with the fake SDL, attach sixtythou_handler to message
# type 60000, and run it in a background thread.
52 rmr_xapp = RMRXapp(default_handler, rmr_port=4564, use_fake_sdl=True)
53 rmr_xapp.register_callback(sixtythou_handler, 60000)
54 rmr_xapp.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
58 # create a general xapp that will demonstrate some SDL functionality and launch some requests against the rmr xapp
# NOTE(review): the statements below use self, so they presumably form the body of the
# entry function handed to Xapp further down; its def line is not visible in this excerpt.
# SDL round trip: set a key, read it back, look it up by the prefix "myk", then health check.
64 self.sdl_set("testns", "mykey", 6)
65 assert self.sdl_get("testns", "mykey") == 6
66 assert self.sdl_find_and_get("testns", "myk") == {"mykey": 6}
67 assert self.healthcheck()
# Send one message of type 60000 (has a registered callback) ...
69 val = json.dumps({"test send 60000": 1}).encode()
70 self.rmr_send(val, 60000)
# ... and one of type 60001 (no callback registered in the visible code).
72 val = json.dumps({"test send 60001": 2}).encode()
73 self.rmr_send(val, 60001)
# Delete a key that is never set anywhere in the visible code.
75 self.sdl_delete("testns", "bogus")
78 gen_xapp = Xapp(entrypoint=entry, use_fake_sdl=True)
# The 60001 message must have reached default_handler and the 60000 message sixtythou_handler.
83 assert def_pay == {"test send 60001": 2}
84 assert sixty_pay == {"test send 60000": 1}
# Health-check test: an RMRXapp on port 4666 (fake SDL) sends a health-check request and
# the payload received by its RIC_HEALTH_CHECK_RESP callback is expected to be b"OK\n".
# NOTE(review): several lines of this function (the post_init def, handler bodies, any
# wait before the final assert) are not visible in this excerpt - confirm in the full file.
87 def test_rmr_healthcheck():
88 # thanos uses the rmr xapp to healthcheck the rmr xapp
# Sends an empty-payload health-check request; presumably the body of post_init.
94 self.rmr_send(b"", Constants.RIC_HEALTH_CHECK_REQ)
96 def default_handler(self, summary, sbuf):
# Module-level name, so teardown_module can call stop() on this xapp.
99 global rmr_xapp_health
100 rmr_xapp_health = RMRXapp(default_handler, post_init=post_init, rmr_port=4666, use_fake_sdl=True)
# Captures the raw (undecoded) payload of the health-check response.
102 def health_handler(self, summary, sbuf):
104 health_pay = summary["payload"]
107 rmr_xapp_health.register_callback(health_handler, Constants.RIC_HEALTH_CHECK_RESP)
108 rmr_xapp_health.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
112 assert health_pay == b"OK\n"
# Checks get_list_gnb_ids / get_list_enb_ids on a _BaseXapp backed by the fake SDL:
# both lists start empty, and after every rnib_information entry has been added to the
# "ENB" and "GNB" sets each list has len(rnib_information) entries whose serialized form
# is one of the fixture entries.
# NOTE(review): gnb and enb are presumably bound by loop headers that are not visible in
# this excerpt - confirm against the full file.
115 def test_rnib_get_list_nodeb(rnib_information):
117 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
119 # Test there is no rnib information.
120 gnb_list = rnib_xapp.get_list_gnb_ids()
121 enb_list = rnib_xapp.get_list_enb_ids()
122 assert len(gnb_list) == 0
123 assert len(enb_list) == 0
125 # Add rnib information directly.
126 for rnib in rnib_information:
127 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", rnib, usemsgpack=False)
128 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "GNB", rnib, usemsgpack=False)
130 gnb_list = rnib_xapp.get_list_gnb_ids()
131 assert len(gnb_list) == len(rnib_information)
133 assert gnb.SerializeToString() in rnib_information
135 enb_list = rnib_xapp.get_list_enb_ids()
136 assert len(enb_list) == len(rnib_information)
138 assert enb.SerializeToString() in rnib_information
# Checks GetListNodebIds on a _BaseXapp backed by the fake SDL: the combined list is
# expected to hold 2 ids after the fixture entries are added to "GNB" and 4 after the same
# entries are also added to "ENB".
# NOTE(review): the literal counts 2 and 4 assume rnib_information yields exactly two
# entries - confirm against the fixture definition.
143 def test_rnib_get_list_all_nodeb(rnib_information):
145 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
147 # Add rnib information directly.
148 for rnib in rnib_information:
149 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "GNB", rnib, usemsgpack=False)
151 nb_list = rnib_xapp.GetListNodebIds()
152 assert len(nb_list) == 2
154 for rnib in rnib_information:
155 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", rnib, usemsgpack=False)
157 nb_list = rnib_xapp.GetListNodebIds()
158 assert len(nb_list) == 4
# Seeds the fake SDL with one NbIdentity in the "ENB" set and with every
# rnib_cellinformation entry in both the "ENBCELL1" and "ENBCELL2" sets.
# NOTE(review): no lookup or assertion is visible in this excerpt - the checking part of
# the test must be confirmed against the full file.
163 def test_rnib_get_list_cells(rnib_cellinformation):
166 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
# Build the node identity by hand and store its serialized protobuf form.
168 mynb = pb_nb.NbIdentity()
169 mynb.inventory_name = "nodeb_1234"
170 mynb.global_nb_id.plmn_id = "plmn_1234"
171 mynb.global_nb_id.nb_id = "nb_1234"
172 mynb.connection_status = 1
173 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", mynb.SerializeToString(), usemsgpack=False)
175 # Add rnib information directly.
176 for rnib in rnib_cellinformation:
177 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENBCELL1", rnib, usemsgpack=False)
178 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENBCELL2", rnib, usemsgpack=False)
# Stores two serialized node records under the keys "RAN:nodeb_1234" and "RAN:nodeb_1235"
# in the fake SDL, then calls GetNodeb for both stored names and for 'nodeb_1230', which
# is not stored in the visible code.
# NOTE(review): the assertions on each GetNodeb result are not visible in this excerpt.
182 def test_rnib_get_nodeb(rnib_helpers):
184 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
185 nb1 = rnib_helpers.createNodebInfo('nodeb_1234', 'GNB', '192.168.1.1', 8088)
186 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
# NOTE(review): nb2 is created with the name 'nodeb_1234' but stored under
# "RAN:nodeb_1235" - confirm whether the name mismatch is intentional.
187 nb2 = rnib_helpers.createNodebInfo('nodeb_1234', 'ENB', '192.168.1.2', 8088)
188 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
190 gnb = rnib_xapp.GetNodeb('nodeb_1235')
192 gnb = rnib_xapp.GetNodeb('nodeb_1234')
194 gnb = rnib_xapp.GetNodeb('nodeb_1230')
# Stores two serialized cells under the keys "PCI:c1234:08" and "PCI:c1235:0b" in the fake
# SDL, then calls GetCell for both stored (name, pci) pairs and for ('c1236', 11), which
# is not stored in the visible code.
# NOTE(review): the key suffixes look like the pci values 8 and 11 as two-digit hex -
# confirm against GetCell. The assertions on each result are not visible in this excerpt.
199 def test_rnib_get_cell(rnib_helpers):
201 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
202 c1 = rnib_helpers.createCell('c1234', 8)
203 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "PCI:c1234:08", c1.SerializeToString(), usemsgpack=False)
204 c2 = rnib_helpers.createCell('c1235', 11)
205 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "PCI:c1235:0b", c2.SerializeToString(), usemsgpack=False)
207 cell = rnib_xapp.GetCell('c1235', 11)
209 cell = rnib_xapp.GetCell('c1234', 8)
211 cell = rnib_xapp.GetCell('c1236', 11)
# Stores two serialized cells under the keys "CELL:c1234" and "CELL:c1235" in the fake SDL,
# then calls GetCellById with cell type 'LTE_CELL' for both stored ids and for 'c1236',
# which is not stored in the visible code.
# NOTE(review): the assertions on each GetCellById result are not visible in this excerpt.
216 def test_rnib_get_cell_by_id(rnib_helpers):
218 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
219 c1 = rnib_helpers.createCell('c1234', 8)
220 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "CELL:c1234", c1.SerializeToString(), usemsgpack=False)
221 c2 = rnib_helpers.createCell('c1235', 11)
222 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "CELL:c1235", c2.SerializeToString(), usemsgpack=False)
224 cell = rnib_xapp.GetCellById('LTE_CELL', 'c1235')
226 cell = rnib_xapp.GetCellById('LTE_CELL', 'c1234')
228 cell = rnib_xapp.GetCellById('LTE_CELL', 'c1236')
# Stores a GNB record under "RAN:nodeb_1234" and an ENB record under "RAN:nodeb_1235" in
# the fake SDL, then checks that GetCellList returns enb.served_cells for the ENB node and
# gnb.served_nr_cells for the GNB node.
# NOTE(review): the assertion for the 'nodeb_1230' lookup (a name not stored in the
# visible code) is not visible in this excerpt.
233 def test_rnib_get_cells(rnib_helpers):
235 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
236 nb1 = rnib_helpers.createNodebInfo('nodeb_1234', 'GNB', '192.168.1.1', 8088)
237 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
# NOTE(review): nb2 is created with the name 'nodeb_1234' but stored under
# "RAN:nodeb_1235" - confirm whether the name mismatch is intentional.
238 nb2 = rnib_helpers.createNodebInfo('nodeb_1234', 'ENB', '192.168.1.2', 8088)
239 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
241 sc = rnib_xapp.GetCellList('nodeb_1235')
242 assert sc == nb2.enb.served_cells
243 sc = rnib_xapp.GetCellList('nodeb_1234')
244 assert sc == nb1.gnb.served_nr_cells
245 sc = rnib_xapp.GetCellList('nodeb_1230')
# Stores two serialized nodes under the keys "GNB:358:nodeb_1234" and "GNB:356:nodeb_1235"
# in the fake SDL, then calls GetNodebByGlobalNbId for both stored triples and for
# ('GNB', '356', 'nodeb_1230'), which is not stored in the visible code.
# NOTE(review): the assertions on each lookup result are not visible in this excerpt.
250 def test_rnib_get_global_nodeb(rnib_helpers):
252 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
253 nb1 = rnib_helpers.createNodeb('nodeb_1234', '358', 'nb_1234')
254 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "GNB:" + '358:' + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
255 nb2 = rnib_helpers.createNodeb('nodeb_1235', '356', 'nb_1235')
256 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "GNB:" + '356:' + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
258 gnb = rnib_xapp.GetNodebByGlobalNbId('GNB', '356', 'nodeb_1235')
260 gnb = rnib_xapp.GetNodebByGlobalNbId('GNB', '358', 'nodeb_1234')
262 gnb = rnib_xapp.GetNodebByGlobalNbId('GNB', '356', 'nodeb_1230')
# Stores four GNB records (nodeb_1234 .. nodeb_1237) under "RAN:<name>" keys in the fake
# SDL, then checks that GetRanFunctionDefinition for 'nodeb_1235' and the OID ending in
# ".2.2.2" returns ['te524367153'].
# NOTE(review): the expected value comes from whatever createNodebInfo puts in the record;
# that helper is not visible here. The assertion for the second OID (ending ".2.2.5") is
# also not visible in this excerpt.
267 def test_rnib_get_ranfunction(rnib_helpers):
269 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
270 nb1 = rnib_helpers.createNodebInfo('nodeb_1234', 'GNB', '192.168.1.1', 8088)
271 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1234', nb1.SerializeToString(), usemsgpack=False)
272 nb2 = rnib_helpers.createNodebInfo('nodeb_1235', 'GNB', '192.168.1.2', 8088)
273 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1235', nb2.SerializeToString(), usemsgpack=False)
274 nb3 = rnib_helpers.createNodebInfo('nodeb_1236', 'GNB', '192.168.1.2', 8088)
275 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1236', nb3.SerializeToString(), usemsgpack=False)
276 nb4 = rnib_helpers.createNodebInfo('nodeb_1237', 'GNB', '192.168.1.2', 8088)
277 rnib_xapp.sdl.set(sdl_namespaces.E2_MANAGER, "RAN:" + 'nodeb_1237', nb4.SerializeToString(), usemsgpack=False)
279 sc = rnib_xapp.GetRanFunctionDefinition('nodeb_1235', "1.3.6.1.4.1.1.2.2.2")
280 assert sc == ['te524367153']
281 sc = rnib_xapp.GetRanFunctionDefinition('nodeb_1235', "1.3.6.1.4.1.1.2.2.5")
# Module-level cleanup: calls rmr_xapp_health.stop() and ignores any Exception it raises,
# so cleanup still proceeds when the xapp was never created or is already stopped.
# NOTE(review): the docstring quotes and the bodies of the first two suppress blocks are
# not visible in this excerpt; only the rmr_xapp_health.stop() call can be confirmed here.
286 def teardown_module():
288 this is like a "finally"; the name of this function is pytest magic
289 safer to put down here since certain failures above can lead to pytest never returning
290 for example if an exception gets raised before stop is called in any test function above, pytest will hang forever
292 with suppress(Exception):
294 with suppress(Exception):
296 with suppress(Exception):
297 rmr_xapp_health.stop()