import json
import time
from contextlib import suppress
-from ricxappframe.xapp_frame import Xapp, RMRXapp
+from ricxappframe.xapp_frame import Xapp, RMRXapp, RIC_HEALTH_CHECK_REQ, RIC_HEALTH_CHECK_RESP
rmr_xapp = None
+rmr_xapp_health = None
gen_xapp = None
-def test_flow():
+def test_rmr_init():
# test variables
- def_called = 0
- sixty_called = 0
+ def_pay = None
+ sixty_pay = None
# create rmr app
- global rmr_xapp
-
- def post_init(self):
- print("hey")
def default_handler(self, summary, sbuf):
- nonlocal def_called
- def_called += 1
- assert json.loads(summary["payload"]) == {"test send 60001": 1}
+ nonlocal def_pay
+ def_pay = json.loads(summary["payload"])
self.rmr_free(sbuf)
- rmr_xapp = RMRXapp(default_handler, post_init=post_init, rmr_port=4564, rmr_wait_for_ready=False, use_fake_sdl=True)
-
def sixtythou_handler(self, summary, sbuf):
- nonlocal sixty_called
- sixty_called += 1
- assert json.loads(summary["payload"]) == {"test send 60000": 1}
+ nonlocal sixty_pay
+ sixty_pay = json.loads(summary["payload"])
self.rmr_free(sbuf)
+ global rmr_xapp
+ rmr_xapp = RMRXapp(default_handler, rmr_port=4564, use_fake_sdl=True)
rmr_xapp.register_callback(sixtythou_handler, 60000)
rmr_xapp.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
time.sleep(1)
+ # create a general xapp that will demonstrate some SDL functionality and launch some requests against the rmr xapp
+
def entry(self):
+ time.sleep(1)
+
self.sdl_set("testns", "mykey", 6)
assert self.sdl_get("testns", "mykey") == 6
assert self.sdl_find_and_get("testns", "myk") == {"mykey": 6}
self.rmr_send(val, 60001)
global gen_xapp
- gen_xapp = Xapp(entrypoint=entry, rmr_wait_for_ready=False, use_fake_sdl=True)
+ gen_xapp = Xapp(entrypoint=entry, use_fake_sdl=True)
gen_xapp.run()
time.sleep(1)
- assert def_called == 1
- assert sixty_called == 1
+ assert def_pay == {"test send 60001": 2}
+ assert sixty_pay == {"test send 60000": 1}
+
+
+def test_rmr_healthcheck():
+ # thanos uses the rmr xapp to healthcheck the rmr xapp
+
+ # test variables
+ health_pay = None
+
+ def post_init(self):
+ self.rmr_send(b"", RIC_HEALTH_CHECK_REQ)
+
+ def default_handler(self, summary, sbuf):
+ pass
+
+ global rmr_xapp_health
+ rmr_xapp_health = RMRXapp(default_handler, post_init=post_init, rmr_port=4666, use_fake_sdl=True)
+
+ def health_handler(self, summary, sbuf):
+ nonlocal health_pay
+ health_pay = summary["payload"]
+ self.rmr_free(sbuf)
+
+ rmr_xapp_health.register_callback(health_handler, RIC_HEALTH_CHECK_RESP)
+ rmr_xapp_health.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
+
+ time.sleep(1)
+
+ assert health_pay == b"OK\n"
def teardown_module():
gen_xapp.stop()
with suppress(Exception):
rmr_xapp.stop()
+ with suppress(Exception):
+ rmr_xapp_health.stop()