Merge "Add configuration-change API"
[ric-plt/xapp-frame-py.git] / tests / test_xapps.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import json
18 import time
19 from contextlib import suppress
20 from ricxappframe.xapp_frame import Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
21
22 rmr_xapp = None
23 rmr_xapp_health = None
24 gen_xapp = None
25
26
27 def test_rmr_init():
28
29     # test variables
30     def_pay = None
31     sixty_pay = None
32
33     # create rmr app
34
35     def default_handler(self, summary, sbuf):
36         nonlocal def_pay
37         def_pay = json.loads(summary["payload"])
38         self.rmr_free(sbuf)
39
40     def sixtythou_handler(self, summary, sbuf):
41         nonlocal sixty_pay
42         sixty_pay = json.loads(summary["payload"])
43         self.rmr_free(sbuf)
44
45     global rmr_xapp
46     rmr_xapp = RMRXapp(default_handler, rmr_port=4564, use_fake_sdl=True)
47     rmr_xapp.register_callback(sixtythou_handler, 60000)
48     rmr_xapp.run(thread=True)  # in unit tests we need to thread here or else execution is not returned!
49
50     time.sleep(1)
51
52     # create a general xapp that will demonstrate some SDL functionality and launch some requests against the rmr xapp
53
54     def entry(self):
55
56         time.sleep(1)
57
58         self.sdl_set("testns", "mykey", 6)
59         assert self.sdl_get("testns", "mykey") == 6
60         assert self.sdl_find_and_get("testns", "myk") == {"mykey": 6}
61         assert self.healthcheck()
62
63         val = json.dumps({"test send 60000": 1}).encode()
64         self.rmr_send(val, 60000)
65
66         val = json.dumps({"test send 60001": 2}).encode()
67         self.rmr_send(val, 60001)
68
69         self.sdl_delete("testns", "bogus")
70
71     global gen_xapp
72     gen_xapp = Xapp(entrypoint=entry, use_fake_sdl=True)
73     gen_xapp.run()
74
75     time.sleep(1)
76
77     assert def_pay == {"test send 60001": 2}
78     assert sixty_pay == {"test send 60000": 1}
79
80
81 def test_rmr_healthcheck():
82     # thanos uses the rmr xapp to healthcheck the rmr xapp
83
84     # test variables
85     health_pay = None
86
87     def post_init(self):
88         self.rmr_send(b"", RIC_HEALTH_CHECK_REQ)
89
90     def default_handler(self, summary, sbuf):
91         pass
92
93     global rmr_xapp_health
94     rmr_xapp_health = RMRXapp(default_handler, post_init=post_init, rmr_port=4666, use_fake_sdl=True)
95
96     def health_handler(self, summary, sbuf):
97         nonlocal health_pay
98         health_pay = summary["payload"]
99         self.rmr_free(sbuf)
100
101     rmr_xapp_health.register_callback(health_handler, RIC_HEALTH_CHECK_RESP)
102     rmr_xapp_health.run(thread=True)  # in unit tests we need to thread here or else execution is not returned!
103
104     time.sleep(1)
105
106     assert health_pay == b"OK\n"
107
108
109 def teardown_module():
110     """
111     this is like a "finally"; the name of this function is pytest magic
112     safer to put down here since certain failures above can lead to pytest never returning
113     for example if an exception gets raised before stop is called in any test function above, pytest will hang forever
114     """
115     with suppress(Exception):
116         gen_xapp.stop()
117     with suppress(Exception):
118         rmr_xapp.stop()
119     with suppress(Exception):
120         rmr_xapp_health.stop()