1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from o2app import bootstrap
21 from o2common.adapter.notifications import SmoNotifications
22 from o2common.config import config
23 from o2common.helper import o2logging
24 from o2dms.domain import commands
25 from o2ims.domain import commands as imscmd
26 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage, \
28 from o2ims.domain.alarm_obj import AlarmEvent2SMO
# Module-level setup shared by the code below: the logger, the Redis client,
# and the API path pieces that handle_changed() concatenates into `ref` strings.
30 logger = o2logging.get_logger(__name__)
# Redis client built from the keyword arguments returned by the config helper.
# NOTE(review): no `import redis` line is visible in this listing -- confirm
# the module is imported in the full file.
32 r = redis.Redis(**config.get_redis_host_and_port())
# apibase + inventory_api_version prefix the inventory refs;
# api_monitoring_base + monitor_api_version prefix the alarm refs.
34 apibase = config.get_o2ims_api_base()
35 api_monitoring_base = config.get_o2ims_monitoring_api_base()
36 monitor_api_version = config.get_o2ims_monitoring_api_v1()
37 inventory_api_version = config.get_o2ims_inventory_api_v1()
# Start-up sequence and listen loop: build the message bus, subscribe to the
# event channels, then pass every received message to handle_changed().
# NOTE(review): the enclosing `def` line and the `try:` that pairs with the
# `except` below are not visible in this listing -- confirm against the full
# file before editing.
41 logger.info("Redis pubsub starting")
# The bus is bootstrapped with an SmoNotifications adapter as its
# `notifications` argument.
43 notifications = SmoNotifications()
44 bus = bootstrap.bootstrap(notifications=notifications)
# Per the redis-py PubSub API, ignore_subscribe_messages=True keeps the
# subscribe/unsubscribe confirmations out of the stream returned by listen().
46 pubsub = r.pubsub(ignore_subscribe_messages=True)
# One subscription per channel; these are the same eight channel names that
# handle_changed() branches on.
47 pubsub.subscribe("NfDeploymentStateChanged")
48 pubsub.subscribe('OcloudChanged')
49 pubsub.subscribe('ResourceTypeChanged')
50 pubsub.subscribe('ResourcePoolChanged')
51 pubsub.subscribe('DmsChanged')
52 pubsub.subscribe('ResourceChanged')
53 pubsub.subscribe('AlarmEventChanged')
54 pubsub.subscribe('AlarmEventPurged')
# Dispatch each message as it arrives. Any Exception raised while handling a
# message is caught and logged at warning level as its str() only (no
# traceback is recorded).
56 for m in pubsub.listen():
58 handle_changed(m, bus)
59 except Exception as ex:
60 logger.warning("{}".format(str(ex)))
# Turn one Redis pub/sub message into a domain command, chosen by channel name.
#
# Parameters:
#   m   -- the pub/sub message; m['channel'] is read and decoded as UTF-8.
#   bus -- the message bus supplied by the caller.
#
# NOTE(review): several lines of this function are not visible in this
# listing. In particular: (1) every branch calls json.loads(datastr) but no
# assignment to `datastr` is shown (presumably it is m['data'] -- confirm);
# (2) `bus` is never used in the visible lines (presumably each branch ends by
# handing `cmd` to the bus -- confirm); (3) the closing parentheses and any
# trailing arguments of the command constructors are not shown.
64 def handle_changed(m, bus):
65 logger.info("handling %s", m)
66 channel = m['channel'].decode("UTF-8")
# NF deployment state transition: the payload must carry 'NfDeploymentId',
# 'FromState' and 'ToState'.
67 if channel == "NfDeploymentStateChanged":
69 data = json.loads(datastr)
70 logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
71 cmd = commands.HandleNfDeploymentStateChanged(
72 NfDeploymentId=data['NfDeploymentId'],
73 FromState=data['FromState'],
74 ToState=data['ToState']
# Resource type change: ref begins with
# apibase + inventory_api_version + '/resourceTypes/'; the continuation of the
# expression is not visible here (presumably data['id'] -- confirm).
77 elif channel == 'ResourceTypeChanged':
79 data = json.loads(datastr)
80 logger.info('ResourceTypeChanged with cmd:{}'.format(data))
81 ref = apibase + inventory_api_version + '/resourceTypes/' + \
83 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
84 id=data['id'], ref=ref,
85 eventtype=data['notificationEventType'],
86 updatetime=data['updatetime']),
# Resource pool change: same shape as above with a '/resourcePools/' ref prefix.
89 elif channel == 'ResourcePoolChanged':
91 data = json.loads(datastr)
92 logger.info('ResourcePoolChanged with cmd:{}'.format(data))
93 ref = apibase + inventory_api_version + '/resourcePools/' + \
95 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
96 id=data['id'], ref=ref,
97 eventtype=data['notificationEventType'],
98 updatetime=data['updatetime']),
# Deployment manager change: '/deploymentManagers/' ref prefix.
# NOTE(review): the log line in this branch says 'ResourceChanged', which is
# the text used by the ResourceChanged branch below -- looks like a copy-paste
# slip; confirm whether it should read 'DmsChanged'.
101 elif channel == 'DmsChanged':
103 data = json.loads(datastr)
104 logger.info('ResourceChanged with cmd:{}'.format(data))
105 ref = apibase + inventory_api_version + '/deploymentManagers/' + \
107 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
108 id=data['id'], ref=ref,
109 eventtype=data['notificationEventType'],
110 updatetime=data['updatetime']),
# Resource change: the ref nests the resource under its pool, so the payload
# must carry 'resourcePoolId' in addition to 'id'.
113 elif channel == 'ResourceChanged':
115 data = json.loads(datastr)
116 logger.info('ResourceChanged with cmd:{}'.format(data))
117 ref = apibase + inventory_api_version + '/resourcePools/' + \
118 data['resourcePoolId'] + '/resources/' + data['id']
119 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
120 id=data['id'], ref=ref,
121 eventtype=data['notificationEventType'],
122 updatetime=data['updatetime']),
# O-Cloud change: a CREATE event builds a Register2SMO command; a MODIFY event
# builds a PubMessage2SMO whose ref is just apibase + inventory_api_version.
# No other event type is handled in the visible lines of this branch.
# NOTE(review): NotificationEventEnum is not among the visible imports
# (the import on the continuation line is cut off) -- confirm it is imported.
125 elif channel == 'OcloudChanged':
127 data = json.loads(datastr)
128 logger.info('OcloudChanged with cmd:{}'.format(data))
129 if data['notificationEventType'] == NotificationEventEnum.CREATE:
130 cmd = imscmd.Register2SMO(data=RegistrationMessage(
131 id=data['id'], eventtype=data['notificationEventType'],
132 updatetime=data['updatetime']))
133 elif data['notificationEventType'] == NotificationEventEnum.MODIFY:
134 ref = apibase + inventory_api_version
135 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
136 id=data['id'], ref=ref,
137 eventtype=data['notificationEventType'],
138 updatetime=data['updatetime']),
# Alarm change: the ref is built from the monitoring base and version (not the
# inventory ones) plus '/alarms/' and the alarm id.
141 elif channel == 'AlarmEventChanged':
143 data = json.loads(datastr)
144 logger.info('AlarmEventChanged with cmd:{}'.format(data))
145 ref = api_monitoring_base + \
146 monitor_api_version + '/alarms/' + data['id']
147 cmd = imscmd.PubAlarm2SMO(data=AlarmEvent2SMO(
148 id=data['id'], ref=ref,
149 eventtype=data['notificationEventType'],
150 updatetime=data['updatetime']))
# Alarm purge: reuses AlarmEvent2SMO as the payload, with an empty ref.
152 elif channel == 'AlarmEventPurged':
154 data = json.loads(datastr)
155 logger.info('AlarmEventPurged with cmd:{}'.format(data))
156 cmd = imscmd.PurgeAlarmEvent(data=AlarmEvent2SMO(
157 id=data['id'], ref="", eventtype=data['notificationEventType'],
158 updatetime=data['updatetime']))
# Logs the channel name at info level; presumably the fall-through for
# channels no branch above matched (the `else:` line is not visible -- confirm).
161 logger.info("unhandled:{}".format(channel))
# Script entry guard: the guarded code runs only when this file is executed
# directly. NOTE(review): the guarded statement is not visible in this
# listing -- confirm what it calls.
164 if __name__ == "__main__":