Implement healthcheck handler
[ric-plt/xapp-frame-py.git] / tests / test_xapps.py
1 # ==================================================================================
2 #       Copyright (c) 2020 Nokia
3 #       Copyright (c) 2020 AT&T Intellectual Property.
4 #
5 #   Licensed under the Apache License, Version 2.0 (the "License");
6 #   you may not use this file except in compliance with the License.
7 #   You may obtain a copy of the License at
8 #
9 #          http://www.apache.org/licenses/LICENSE-2.0
10 #
11 #   Unless required by applicable law or agreed to in writing, software
12 #   distributed under the License is distributed on an "AS IS" BASIS,
13 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 #   See the License for the specific language governing permissions and
15 #   limitations under the License.
16 # ==================================================================================
17 import json
18 import time
19 from contextlib import suppress
20 from ricxappframe.xapp_frame import Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
21
22 rmr_xapp = None
23 rmr_xapp_health = None
24 gen_xapp = None
25
26
27 def test_rmr_init():
28
29     # test variables
30     def_pay = None
31     sixty_pay = None
32
33     # create rmr app
34
35     def default_handler(self, summary, sbuf):
36         nonlocal def_pay
37         def_pay = json.loads(summary["payload"])
38         self.rmr_free(sbuf)
39
40     def sixtythou_handler(self, summary, sbuf):
41         nonlocal sixty_pay
42         sixty_pay = json.loads(summary["payload"])
43         self.rmr_free(sbuf)
44
45     global rmr_xapp
46     rmr_xapp = RMRXapp(default_handler, rmr_port=4564, use_fake_sdl=True)
47     rmr_xapp.register_callback(sixtythou_handler, 60000)
48     rmr_xapp.run(thread=True)  # in unit tests we need to thread here or else execution is not returned!
49
50     time.sleep(1)
51
52     # create a general xapp that will demonstrate some SDL functionality and launch some requests against the rmr xapp
53
54     def entry(self):
55
56         time.sleep(1)
57
58         self.sdl_set("testns", "mykey", 6)
59         assert self.sdl_get("testns", "mykey") == 6
60         assert self.sdl_find_and_get("testns", "myk") == {"mykey": 6}
61         assert self.healthcheck()
62
63         val = json.dumps({"test send 60000": 1}).encode()
64         self.rmr_send(val, 60000)
65
66         val = json.dumps({"test send 60001": 2}).encode()
67         self.rmr_send(val, 60001)
68
69     global gen_xapp
70     gen_xapp = Xapp(entrypoint=entry, use_fake_sdl=True)
71     gen_xapp.run()
72
73     time.sleep(1)
74
75     assert def_pay == {"test send 60001": 2}
76     assert sixty_pay == {"test send 60000": 1}
77
78
79 def test_rmr_healthcheck():
80     # thanos uses the rmr xapp to healthcheck the rmr xapp
81
82     # test variables
83     health_pay = None
84
85     def post_init(self):
86         self.rmr_send(b"", RIC_HEALTH_CHECK_REQ)
87
88     def default_handler(self, summary, sbuf):
89         pass
90
91     global rmr_xapp_health
92     rmr_xapp_health = RMRXapp(default_handler, post_init=post_init, rmr_port=4666, use_fake_sdl=True)
93
94     def health_handler(self, summary, sbuf):
95         nonlocal health_pay
96         health_pay = summary["payload"]
97         self.rmr_free(sbuf)
98
99     rmr_xapp_health.register_callback(health_handler, RIC_HEALTH_CHECK_RESP)
100     rmr_xapp_health.run(thread=True)  # in unit tests we need to thread here or else execution is not returned!
101
102     time.sleep(1)
103
104     assert health_pay == b"OK\n"
105
106
107 def teardown_module():
108     """
109     this is like a "finally"; the name of this function is pytest magic
110     safer to put down here since certain failures above can lead to pytest never returning
111     for example if an exception gets raised before stop is called in any test function above, pytest will hang forever
112     """
113     with suppress(Exception):
114         gen_xapp.stop()
115     with suppress(Exception):
116         rmr_xapp.stop()
117     with suppress(Exception):
118         rmr_xapp_health.stop()