1 # ==================================================================================
2 # Copyright (c) 2020 Nokia
3 # Copyright (c) 2020 AT&T Intellectual Property.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 # ==================================================================================
19 from contextlib import suppress
21 from ricxappframe.util.constants import Constants
22 from ricxappframe.xapp_frame import _BaseXapp, Xapp, RMRXapp
23 from ricxappframe.constants import sdl_namespaces
26 rmr_xapp_health = None
39 def default_handler(self, summary, sbuf):
41 def_pay = json.loads(summary["payload"])
44 def sixtythou_handler(self, summary, sbuf):
46 sixty_pay = json.loads(summary["payload"])
50 rmr_xapp = RMRXapp(default_handler, rmr_port=4564, use_fake_sdl=True)
51 rmr_xapp.register_callback(sixtythou_handler, 60000)
52 rmr_xapp.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
56 # create a general xapp that will demonstrate some SDL functionality and launch some requests against the rmr xapp
62 self.sdl_set("testns", "mykey", 6)
63 assert self.sdl_get("testns", "mykey") == 6
64 assert self.sdl_find_and_get("testns", "myk") == {"mykey": 6}
65 assert self.healthcheck()
67 val = json.dumps({"test send 60000": 1}).encode()
68 self.rmr_send(val, 60000)
70 val = json.dumps({"test send 60001": 2}).encode()
71 self.rmr_send(val, 60001)
73 self.sdl_delete("testns", "bogus")
76 gen_xapp = Xapp(entrypoint=entry, use_fake_sdl=True)
81 assert def_pay == {"test send 60001": 2}
82 assert sixty_pay == {"test send 60000": 1}
85 def test_rmr_healthcheck():
86 # thanos uses the rmr xapp to healthcheck the rmr xapp
92 self.rmr_send(b"", Constants.RIC_HEALTH_CHECK_REQ)
94 def default_handler(self, summary, sbuf):
97 global rmr_xapp_health
98 rmr_xapp_health = RMRXapp(default_handler, post_init=post_init, rmr_port=4666, use_fake_sdl=True)
100 def health_handler(self, summary, sbuf):
102 health_pay = summary["payload"]
105 rmr_xapp_health.register_callback(health_handler, Constants.RIC_HEALTH_CHECK_RESP)
106 rmr_xapp_health.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
110 assert health_pay == b"OK\n"
113 def test_get_list_nodeb(rnib_information):
115 rnib_xapp = _BaseXapp(rmr_port=4777, rmr_wait_for_ready=False, use_fake_sdl=True)
117 # Test there is no rnib information.
118 gnb_list = rnib_xapp.get_list_gnb_ids()
119 enb_list = rnib_xapp.get_list_enb_ids()
120 assert len(gnb_list) == 0
121 assert len(enb_list) == 0
123 # Add rnib information directly.
124 for rnib in rnib_information:
125 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "ENB", rnib, usemsgpack=False)
126 rnib_xapp.sdl.add_member(sdl_namespaces.E2_MANAGER, "GNB", rnib, usemsgpack=False)
128 gnb_list = rnib_xapp.get_list_gnb_ids()
129 assert len(gnb_list) == len(rnib_information)
131 assert gnb.SerializeToString() in rnib_information
133 enb_list = rnib_xapp.get_list_enb_ids()
134 assert len(enb_list) == len(rnib_information)
136 assert enb.SerializeToString() in rnib_information
139 def teardown_module():
141 This works like a "finally" clause; pytest calls this function automatically because of its name.
142 It is safer to put the stop calls down here, since certain failures above can lead to pytest never returning:
143 for example, if an exception is raised before stop is called in any test function above, pytest will hang forever.
145 with suppress(Exception):
147 with suppress(Exception):
149 with suppress(Exception):
150 rmr_xapp_health.stop()
151 with suppress(Exception):