+ assert def_pay == {"test send 60001": 2}
+ assert sixty_pay == {"test send 60000": 1}
+
+
+def test_rmr_healthcheck():
+ # thanos uses the rmr xapp to healthcheck the rmr xapp
+
+ # test variables
+ health_pay = None
+
+ def post_init(self):
+ self.rmr_send(b"", RIC_HEALTH_CHECK_REQ)
+
+ def default_handler(self, summary, sbuf):
+ pass
+
+ global rmr_xapp_health
+ rmr_xapp_health = RMRXapp(default_handler, post_init=post_init, rmr_port=4666, use_fake_sdl=True)
+
+ def health_handler(self, summary, sbuf):
+ nonlocal health_pay
+ health_pay = summary["payload"]
+ self.rmr_free(sbuf)
+
+ rmr_xapp_health.register_callback(health_handler, RIC_HEALTH_CHECK_RESP)
+ rmr_xapp_health.run(thread=True) # in unit tests we need to thread here or else execution is not returned!
+
+ time.sleep(1)
+
+ assert health_pay == b"OK\n"