1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 # from o2common.config import config
21 # from o2common.helper import o2logging
22 # logger = o2logging.get_logger(__name__)
24 # r = redis.Redis(**config.get_redis_host_and_port())
25 r = redis.Redis(host='127.0.0.1', port=63791)
27 # apibase = config.get_o2ims_api_base()
28 # url = config.get_api_url()
29 apibase = '/o2ims_infrastructureInventory/v1'
30 url = '127.0.0.1:5005'
33 class Singleton(type):
36 def __call__(cls, *args, **kwargs):
37 if cls not in cls._instances:
38 cls._instances[cls] = super(
39 Singleton, cls).__call__(*args, **kwargs)
40 return cls._instances[cls]
43 class Subscription(metaclass=Singleton):
45 def __init__(self, sub_id='') -> None:
49 def subscription_ims(self):
50 conn = http.client.HTTPConnection(self.url)
51 headers = {'Content-type': 'application/json'}
54 'consumerSubscriptionId': 'mock_smo',
55 'filter': '["pserver","pserver_ram"]'
57 json_val = json.dumps(post_val)
58 conn.request('POST', apibase+'/subscriptions', json_val, headers)
59 resp = conn.getresponse()
60 data = resp.read().decode('utf-8')
61 print(resp.status, resp.reason)
63 json_data = json.loads(data)
64 self.subId = json_data['subscriptionId']
66 def subscription_mq(self):
67 sub = r.pubsub(ignore_subscribe_messages=True)
68 sub.subscribe(self.subId)
70 for m in sub.listen():
72 # logger.info("handling %s", m)
73 print("handling %s", m)
74 channel = m['channel'].decode("UTF-8")
75 if channel == self.subId:
77 data = json.loads(datastr)
78 # logger.info('notification: {}'.format(data))
79 print('notification: {}'.format(data))
81 # logger.info("unhandled:{}".format(channel))
82 print("unhandled:{}".format(channel))
83 except Exception as ex:
84 # logger.warning("{}".format(str(ex)))
85 print("[WARNING]{}".format(str(ex)))
88 def unsubscription_ims(self):
89 conn = http.client.HTTPConnection(self.url)
90 conn.request('DELETE', apibase + '/subscriptions/' + self.subId)
91 resp = conn.getresponse()
92 print(resp.status, resp.reason)
95 def handler(signum, frame):
96 print('\nCtrl-c was pressed. Call to delete subscription')
98 sub.unsubscription_ims()
104 sub.subscription_ims()
105 signal.signal(signal.SIGINT, handler)
106 sub.subscription_mq()
109 if __name__ == "__main__":