1 # Copyright (C) 2021 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
19 from o2app import bootstrap
20 from o2common.config import config
21 from o2common.adapter.notifications import SmoNotifications
22 from o2dms.domain import commands
23 from o2ims.domain import commands as imscmd
24 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
25 from o2ims.domain.alarm_obj import AlarmEvent2SMO
27 from o2common.helper import o2logging
28 logger = o2logging.get_logger(__name__)
30 r = redis.Redis(**config.get_redis_host_and_port())
32 apibase = config.get_o2ims_api_base()
33 api_monitoring_base = config.get_o2ims_monitoring_api_base()
34 monitor_api_version = config.get_o2ims_monitoring_api_v1()
35 inventory_api_version = config.get_o2ims_inventory_api_v1()
39 logger.info("Redis pubsub starting")
41 notifications = SmoNotifications()
42 bus = bootstrap.bootstrap(notifications=notifications)
44 pubsub = r.pubsub(ignore_subscribe_messages=True)
45 pubsub.subscribe("NfDeploymentStateChanged")
46 pubsub.subscribe('OcloudChanged')
47 pubsub.subscribe('ResourceTypeChanged')
48 pubsub.subscribe('ResourcePoolChanged')
49 pubsub.subscribe('DmsChanged')
50 pubsub.subscribe('ResourceChanged')
51 pubsub.subscribe('AlarmEventChanged')
53 for m in pubsub.listen():
55 handle_changed(m, bus)
56 except Exception as ex:
57 logger.warning("{}".format(str(ex)))
61 def handle_changed(m, bus):
62 logger.info("handling %s", m)
63 channel = m['channel'].decode("UTF-8")
64 if channel == "NfDeploymentStateChanged":
66 data = json.loads(datastr)
67 logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
68 cmd = commands.HandleNfDeploymentStateChanged(
69 NfDeploymentId=data['NfDeploymentId'],
70 FromState=data['FromState'],
71 ToState=data['ToState']
74 elif channel == 'ResourceTypeChanged':
76 data = json.loads(datastr)
77 logger.info('ResourceTypeChanged with cmd:{}'.format(data))
78 ref = apibase + inventory_api_version + '/resourceTypes/' + \
80 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
81 id=data['id'], ref=ref,
82 eventtype=data['notificationEventType'],
83 updatetime=data['updatetime']),
86 elif channel == 'ResourcePoolChanged':
88 data = json.loads(datastr)
89 logger.info('ResourcePoolChanged with cmd:{}'.format(data))
90 ref = apibase + inventory_api_version + '/resourcePools/' + \
92 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
93 id=data['id'], ref=ref,
94 eventtype=data['notificationEventType'],
95 updatetime=data['updatetime']),
98 elif channel == 'DmsChanged':
100 data = json.loads(datastr)
101 logger.info('ResourceChanged with cmd:{}'.format(data))
102 ref = apibase + inventory_api_version + '/deploymentManagers/' + \
104 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
105 id=data['id'], ref=ref,
106 eventtype=data['notificationEventType'],
107 updatetime=data['updatetime']),
110 elif channel == 'ResourceChanged':
112 data = json.loads(datastr)
113 logger.info('ResourceChanged with cmd:{}'.format(data))
114 ref = apibase + inventory_api_version + '/resourcePools/' + \
115 data['resourcePoolId'] + '/resources/' + data['id']
116 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
117 id=data['id'], ref=ref,
118 eventtype=data['notificationEventType'],
119 updatetime=data['updatetime']),
122 elif channel == 'OcloudChanged':
124 data = json.loads(datastr)
125 logger.info('OcloudChanged with cmd:{}'.format(data))
126 cmd = imscmd.Register2SMO(data=RegistrationMessage(
127 id=data['id'], eventtype=data['notificationEventType'],
128 updatetime=data['updatetime']))
130 elif channel == 'AlarmEventChanged':
132 data = json.loads(datastr)
133 logger.info('AlarmEventChanged with cmd:{}'.format(data))
134 ref = api_monitoring_base + \
135 monitor_api_version + '/alarms/' + data['id']
136 cmd = imscmd.PubAlarm2SMO(data=AlarmEvent2SMO(
137 id=data['id'], ref=ref,
138 eventtype=data['notificationEventType'],
139 updatetime=data['updatetime']))
142 logger.info("unhandled:{}".format(channel))
145 if __name__ == "__main__":