Add subscription and notification for resource changes; fix a bug while pserver node...
[pti/o2.git] / tests / mock_smo / subscription.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 import redis
16 import json
17 import signal
18 import http.client
19 # from o2common.config import config
20
21 # from o2common.helper import o2logging
22 # logger = o2logging.get_logger(__name__)
23
24 # r = redis.Redis(**config.get_redis_host_and_port())
25 r = redis.Redis(host='127.0.0.1', port=63791)
26
27 # apibase = config.get_o2ims_api_base()
28 # url = config.get_api_url()
29 apibase = '/o2ims_infrastructureInventory/v1'
30 url = '127.0.0.1:5005'
31
32
33 class Singleton(type):
34     _instances = {}
35
36     def __call__(cls, *args, **kwargs):
37         if cls not in cls._instances:
38             cls._instances[cls] = super(
39                 Singleton, cls).__call__(*args, **kwargs)
40         return cls._instances[cls]
41
42
43 class Subscription(metaclass=Singleton):
44
45     def __init__(self, sub_id='') -> None:
46         self.url = url
47         self.subId = sub_id
48
49     def subscription_ims(self):
50         conn = http.client.HTTPConnection(self.url)
51         headers = {'Content-type': 'application/json'}
52         post_val = {
53             'callback': self.url,
54             'consumerSubscriptionId': 'mock_smo',
55             'filter': '["pserver","pserver_ram"]'
56         }
57         json_val = json.dumps(post_val)
58         conn.request('POST', apibase+'/subscriptions', json_val, headers)
59         resp = conn.getresponse()
60         data = resp.read().decode('utf-8')
61         print(resp.status, resp.reason)
62         print(data)
63         json_data = json.loads(data)
64         self.subId = json_data['subscriptionId']
65
66     def subscription_mq(self):
67         sub = r.pubsub(ignore_subscribe_messages=True)
68         sub.subscribe(self.subId)
69
70         for m in sub.listen():
71             try:
72                 # logger.info("handling %s", m)
73                 print("handling %s", m)
74                 channel = m['channel'].decode("UTF-8")
75                 if channel == self.subId:
76                     datastr = m['data']
77                     data = json.loads(datastr)
78                     # logger.info('notification: {}'.format(data))
79                     print('notification: {}'.format(data))
80                 else:
81                     # logger.info("unhandled:{}".format(channel))
82                     print("unhandled:{}".format(channel))
83             except Exception as ex:
84                 # logger.warning("{}".format(str(ex)))
85                 print("[WARNING]{}".format(str(ex)))
86                 continue
87
88     def unsubscription_ims(self):
89         conn = http.client.HTTPConnection(self.url)
90         conn.request('DELETE', apibase + '/subscriptions/' + self.subId)
91         resp = conn.getresponse()
92         print(resp.status, resp.reason)
93
94
95 def handler(signum, frame):
96     print('\nCtrl-c was pressed. Call to delete subscription')
97     sub = Subscription()
98     sub.unsubscription_ims()
99     exit()
100
101
102 def main():
103     sub = Subscription()
104     sub.subscription_ims()
105     signal.signal(signal.SIGINT, handler)
106     sub.subscription_mq()
107
108
109 if __name__ == "__main__":
110     main()