2 import ricxappframe.xapp_subscribe
3 import ricxappframe.subsclient
4 import ricxappframe.xapp_rest
7 class MockApiClientResponse:
8 def __init__(self, data, reason, status):
14 class MockApiClientLoader(ricxappframe.subsclient.ApiClient):
15 def __init__(self, config):
21 def request(self, method, url, headers, body=None):
24 '{ "SubscriptionResponse": {'
25 '"SubscriptionId": "testing",'
26 '"SubscriptionInstances": [{'
27 '"XappEventInstanceID": "16253",'
28 '"E2EventInstanceID": "1241"'
32 return MockApiClientResponse(data, 'OK', 200)
33 elif method == 'DELETE':
34 return MockApiClientResponse(None, 'OK', 204)
37 '{ "SubscriptionList": [{'
38 '"SubscriptionId": "12345",'
39 '"Meid": "gnb123456",'
40 '"ClientEndpoint": ["127.0.0.1:4056"],'
41 '"SubscriptionInstances": [{'
42 '"XappEventInstanceID": "16253",'
43 '"E2EventInstanceID": "1241"'
48 return MockApiClientResponse(data, 'OK', 200)
51 def test_subscribe(monkeypatch):
53 monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
55 subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
56 # setup the subscription
57 subEndPoint = subscriber.SubscriptionParamsClientEndpoint("localhost", 8091, 4061)
58 assert subEndPoint.to_dict() == {'host': 'localhost', 'http_port': 8091, 'rmr_port': 4061}
60 subsDirective = subscriber.SubscriptionParamsE2SubscriptionDirectives(10, 2, False)
61 assert subsDirective.to_dict() == {'e2_retry_count': 2, 'e2_timeout_timer_value': 10, 'rmr_routing_needed': False}
63 subsequentAction = subscriber.SubsequentAction("continue", "w10ms")
64 assert subsequentAction.to_dict() == {'subsequent_action_type': 'continue', 'time_to_wait': 'w10ms'}
66 actionDefinitionList = subscriber.ActionToBeSetup(1, "policy", (11, 12, 13, 14, 15), subsequentAction)
67 assert actionDefinitionList.to_dict() == {
68 'action_definition': (11, 12, 13, 14, 15),
69 'action_id': 1, 'action_type': 'policy',
70 'subsequent_action': {
71 'subsequent_action_type': 'continue',
72 'time_to_wait': 'w10ms'
76 subsDetail = subscriber.SubscriptionDetail(12110, (1, 2, 3, 4, 5), actionDefinitionList)
77 assert subsDetail.to_dict() == {
78 'action_to_be_setup_list': {
79 'action_definition': (11, 12, 13, 14, 15),
80 'action_id': 1, 'action_type': 'policy',
81 'subsequent_action': {
82 'subsequent_action_type': 'continue',
83 'time_to_wait': 'w10ms'
86 'event_triggers': (1, 2, 3, 4, 5),
87 'xapp_event_instance_id': 12110
90 # subscription data ready, make the subscription
91 subObj = subscriber.SubscriptionParams("sub10", subEndPoint, "gnb123456", 1231, subsDirective, subsDetail)
92 assert subObj.to_dict() == {
94 'host': 'localhost', 'http_port': 8091, 'rmr_port': 4061
96 'e2_subscription_directives': {
97 'e2_retry_count': 2, 'e2_timeout_timer_value': 10,
98 'rmr_routing_needed': False
100 'meid': 'gnb123456', 'ran_function_id': 1231, 'subscription_details': {
101 'action_to_be_setup_list': {
102 'action_definition': (11, 12, 13, 14, 15),
103 'action_id': 1, 'action_type': 'policy', 'subsequent_action': {
104 'subsequent_action_type': 'continue', 'time_to_wait': 'w10ms'
107 'event_triggers': (1, 2, 3, 4, 5), 'xapp_event_instance_id': 12110
109 'subscription_id': 'sub10'
112 data, resp, status = subscriber.Subscribe(subObj)
113 assert json.loads(data) == {"SubscriptionResponse": {
114 "SubscriptionId": "testing", "SubscriptionInstances": [{
115 "XappEventInstanceID": "16253", "E2EventInstanceID": "1241"
122 def test_unsubscribe(monkeypatch):
124 monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
126 subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
127 data, resp, status = subscriber.UnSubscribe('1654236')
132 def test_QuerySubscriptions(monkeypatch):
134 monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
136 subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
137 data, resp, status = subscriber.QuerySubscriptions()
138 assert json.loads(data) == {'SubscriptionList': [{'SubscriptionId': '12345', 'Meid': 'gnb123456',
139 'ClientEndpoint': ['127.0.0.1:4056'], 'SubscriptionInstances':
140 [{'XappEventInstanceID': '16253', 'E2EventInstanceID': '1241'}]}]}
145 def test_ResponseHandler(monkeypatch):
147 def subsResponseCB(name, path, data, ctype):
148 response = ricxappframe.xapp_rest.initResponse()
149 response['payload'] = ("{}")
152 monkeypatch.setattr("ricxappframe.subsclient.ApiClient", MockApiClientLoader)
154 subscriber = ricxappframe.xapp_subscribe.NewSubscriber("http://127.0.0.1:8088/ric/v1", local_port=9099)
155 ret = subscriber.ResponseHandler(subsResponseCB)