--- /dev/null
+import json
+import ricxappframe.xapp_subscribe
+import ricxappframe.subsclient
+import ricxappframe.xapp_rest
+
+
+class MockApiClientResponse:
+ def __init__(self, data, reason, status):
+ self.data = data
+ self.status = status
+ self.reason = reason
+
+
+class MockApiClientLoader(ricxappframe.subsclient.ApiClient):
+ def __init__(self, config):
+ return
+
+ def __del__(self):
+ return
+
+ def request(self, method, url, headers, body=None):
+ if method == 'POST':
+ data = (
+ '{ "SubscriptionResponse": {'
+ '"SubscriptionId": "testing",'
+ '"SubscriptionInstances": [{'
+ '"XappEventInstanceID": "16253",'
+ '"E2EventInstanceID": "1241"'
+ '}]'
+ '}}'
+ )
+ return MockApiClientResponse(data, 'OK', 200)
+ elif method == 'DELETE':
+ return MockApiClientResponse(None, 'OK', 204)
+ elif method == 'GET':
+ data = (
+ '{ "SubscriptionList": [{'
+ '"SubscriptionId": "12345",'
+ '"Meid": "gnb123456",'
+ '"ClientEndpoint": ["127.0.0.1:4056"],'
+ '"SubscriptionInstances": [{'
+ '"XappEventInstanceID": "16253",'
+ '"E2EventInstanceID": "1241"'
+ '}]'
+ '}]'
+ '}'
+ )
+ return MockApiClientResponse(data, 'OK', 200)
+
+
+def test_subscribe(monkeypatch):
+
+ monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
+
+ subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
+ # setup the subscription
+ subEndPoint = subscriber.SubscriptionParamsClientEndpoint("localhost", 8091, 4061)
+ assert subEndPoint.to_dict() == {'host': 'localhost', 'http_port': 8091, 'rmr_port': 4061}
+
+ subsDirective = subscriber.SubscriptionParamsE2SubscriptionDirectives(10, 2, False)
+ assert subsDirective.to_dict() == {'e2_retry_count': 2, 'e2_timeout_timer_value': 10, 'rmr_routing_needed': False}
+
+ subsequentAction = subscriber.SubsequentAction("continue", "w10ms")
+ assert subsequentAction.to_dict() == {'subsequent_action_type': 'continue', 'time_to_wait': 'w10ms'}
+
+ actionDefinitionList = subscriber.ActionToBeSetup(1, "policy", (11, 12, 13, 14, 15), subsequentAction)
+ assert actionDefinitionList.to_dict() == {
+ 'action_definition': (11, 12, 13, 14, 15),
+ 'action_id': 1, 'action_type': 'policy',
+ 'subsequent_action': {
+ 'subsequent_action_type': 'continue',
+ 'time_to_wait': 'w10ms'
+ }
+ }
+
+ subsDetail = subscriber.SubscriptionDetail(12110, (1, 2, 3, 4, 5), actionDefinitionList)
+ assert subsDetail.to_dict() == {
+ 'action_to_be_setup_list': {
+ 'action_definition': (11, 12, 13, 14, 15),
+ 'action_id': 1, 'action_type': 'policy',
+ 'subsequent_action': {
+ 'subsequent_action_type': 'continue',
+ 'time_to_wait': 'w10ms'
+ }
+ },
+ 'event_triggers': (1, 2, 3, 4, 5),
+ 'xapp_event_instance_id': 12110
+ }
+
+ # subscription data ready, make the subscription
+ subObj = subscriber.SubscriptionParams("sub10", subEndPoint, "gnb123456", 1231, subsDirective, subsDetail)
+ assert subObj.to_dict() == {
+ 'client_endpoint': {
+ 'host': 'localhost', 'http_port': 8091, 'rmr_port': 4061
+ },
+ 'e2_subscription_directives': {
+ 'e2_retry_count': 2, 'e2_timeout_timer_value': 10,
+ 'rmr_routing_needed': False
+ },
+ 'meid': 'gnb123456', 'ran_function_id': 1231, 'subscription_details': {
+ 'action_to_be_setup_list': {
+ 'action_definition': (11, 12, 13, 14, 15),
+ 'action_id': 1, 'action_type': 'policy', 'subsequent_action': {
+ 'subsequent_action_type': 'continue', 'time_to_wait': 'w10ms'
+ }
+ },
+ 'event_triggers': (1, 2, 3, 4, 5), 'xapp_event_instance_id': 12110
+ },
+ 'subscription_id': 'sub10'
+ }
+
+ data, resp, status = subscriber.Subscribe(subObj)
+ assert json.loads(data) == {"SubscriptionResponse": {
+ "SubscriptionId": "testing", "SubscriptionInstances": [{
+ "XappEventInstanceID": "16253", "E2EventInstanceID": "1241"
+ }]
+ }}
+ assert resp == 'OK'
+ assert status == 200
+
+
+def test_unsubscribe(monkeypatch):
+
+ monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
+
+ subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
+ data, resp, status = subscriber.UnSubscribe('1654236')
+ assert resp == 'OK'
+ assert status == 204
+
+
+def test_QuerySubscriptions(monkeypatch):
+
+ monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
+
+ subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
+ data, resp, status = subscriber.QuerySubscriptions()
+ assert json.loads(data) == {'SubscriptionList': [{'SubscriptionId': '12345', 'Meid': 'gnb123456',
+ 'ClientEndpoint': ['127.0.0.1:4056'], 'SubscriptionInstances':
+ [{'XappEventInstanceID': '16253', 'E2EventInstanceID': '1241'}]}]}
+ assert resp == 'OK'
+ assert status == 200
+
+
+def test_ResponseHandler(monkeypatch):
+
+ def subsResponseCB(name, path, data, ctype):
+ response = ricxappframe.xapp_rest.initResponse()
+ response['payload'] = ("{}")
+ return response
+
+ monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
+
+ subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
+ ret = subscriber.ResponseHandler(subsResponseCB)
+
+ assert ret is True