1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
20 from o2app import bootstrap
21 from o2common.adapter.notifications import SmoNotifications
22 from o2common.config import config
23 from o2common.helper import o2logging
24 from o2dms.domain import commands
25 from o2ims.domain import commands as imscmd
26 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
27 from o2ims.domain.alarm_obj import AlarmEvent2SMO
# Module-level wiring shared by the code below: logger, Redis client and the
# URL fragments that are concatenated into notification references.
29 logger = o2logging.get_logger(__name__)
# Redis client; its keyword arguments are whatever the o2common config
# returns from get_redis_host_and_port().
# NOTE(review): no `import redis` is visible in this text - confirm it exists.
31 r = redis.Redis(**config.get_redis_host_and_port())
# apibase + inventory_api_version prefix the inventory references built in
# handle_changed (resource types, pools, deployment managers, resources).
33 apibase = config.get_o2ims_api_base()
# api_monitoring_base + monitor_api_version prefix the '/alarms/' reference
# built in handle_changed.
34 api_monitoring_base = config.get_o2ims_monitoring_api_base()
35 monitor_api_version = config.get_o2ims_monitoring_api_v1()
36 inventory_api_version = config.get_o2ims_inventory_api_v1()
# Consumer start-up and listen loop: bootstraps the message bus, subscribes
# to the event channels, then passes every received message to
# handle_changed.
# NOTE(review): no enclosing `def` line is visible before this statement -
# presumably this is the body of the entry point run by the `__main__` guard
# at the end of the file; confirm.
40 logger.info("Redis pubsub starting")
# The bus is bootstrapped with an SmoNotifications adapter.
42 notifications = SmoNotifications()
43 bus = bootstrap.bootstrap(notifications=notifications)
# ignore_subscribe_messages=True: subscribe/unsubscribe confirmations are not
# delivered to the loop below, only published messages are.
45 pubsub = r.pubsub(ignore_subscribe_messages=True)
# Every channel subscribed here has a matching branch in handle_changed.
46 pubsub.subscribe("NfDeploymentStateChanged")
47 pubsub.subscribe('OcloudChanged')
48 pubsub.subscribe('ResourceTypeChanged')
49 pubsub.subscribe('ResourcePoolChanged')
50 pubsub.subscribe('DmsChanged')
51 pubsub.subscribe('ResourceChanged')
52 pubsub.subscribe('AlarmEventChanged')
53 pubsub.subscribe('AlarmEventPurged')
# Iterates over the messages produced by pubsub.listen().
55 for m in pubsub.listen():
# NOTE(review): the `except` below requires an opening `try:` that is not
# visible in this text - presumably it wraps only this call, so that a
# failure is handled per message; confirm.
57 handle_changed(m, bus)
# Any Exception raised while handling a message is reported at WARNING level
# using only str(ex); no traceback is included in the log record.
58 except Exception as ex:
59 logger.warning("{}".format(str(ex)))
# Dispatch one pub/sub message: decode its channel name, parse the JSON
# payload and build the command object that corresponds to that channel.
#
#   m   -- message mapping; m['channel'] is read and decoded as UTF-8, so it
#          must be a bytes value.
#   bus -- object passed in by the listen loop.
#          NOTE(review): no use of `bus` is visible in this text - presumably
#          each `cmd` built below is handed to it; confirm.
#
# A payload key that is missing raises KeyError, and malformed JSON raises
# from json.loads; neither is caught inside this function.
63 def handle_changed(m, bus):
64 logger.info("handling %s", m)
65 channel = m['channel'].decode("UTF-8")
# --- NF deployment state transition -----------------------------------------
# Payload keys read: 'NfDeploymentId', 'FromState', 'ToState'.
66 if channel == "NfDeploymentStateChanged":
# NOTE(review): `datastr` is read in every branch but no assignment to it is
# visible in this text - presumably it is the message payload; confirm.
68 data = json.loads(datastr)
69 logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
70 cmd = commands.HandleNfDeploymentStateChanged(
71 NfDeploymentId=data['NfDeploymentId'],
72 FromState=data['FromState'],
73 ToState=data['ToState']
# --- Inventory change notifications -----------------------------------------
# The next four branches share one shape: build a `ref` URL path under
# apibase + inventory_api_version, then wrap id / ref / notificationEventType
# / updatetime in a Message2SMO carried by imscmd.PubMessage2SMO.
# NOTE(review): in each of them the `),` after `updatetime=` closes only
# Message2SMO; the remainder of the PubMessage2SMO call is not visible here -
# confirm whether a further argument follows.
76 elif channel == 'ResourceTypeChanged':
78 data = json.loads(datastr)
79 logger.info('ResourceTypeChanged with cmd:{}'.format(data))
# NOTE(review): the expression below ends in a line continuation whose next
# line is not visible - presumably data['id'], as in the '/alarms/' branch;
# confirm. The same applies to the resource-pool and deployment-manager refs.
80 ref = apibase + inventory_api_version + '/resourceTypes/' + \
82 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
83 id=data['id'], ref=ref,
84 eventtype=data['notificationEventType'],
85 updatetime=data['updatetime']),
88 elif channel == 'ResourcePoolChanged':
90 data = json.loads(datastr)
91 logger.info('ResourcePoolChanged with cmd:{}'.format(data))
92 ref = apibase + inventory_api_version + '/resourcePools/' + \
94 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
95 id=data['id'], ref=ref,
96 eventtype=data['notificationEventType'],
97 updatetime=data['updatetime']),
100 elif channel == 'DmsChanged':
102 data = json.loads(datastr)
# NOTE(review): this is the 'DmsChanged' branch but the log line below is
# labelled 'ResourceChanged', identical to the label used by the real
# 'ResourceChanged' branch - looks like a copy-paste slip that makes the two
# events indistinguishable in the log; confirm and relabel.
103 logger.info('ResourceChanged with cmd:{}'.format(data))
104 ref = apibase + inventory_api_version + '/deploymentManagers/' + \
106 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
107 id=data['id'], ref=ref,
108 eventtype=data['notificationEventType'],
109 updatetime=data['updatetime']),
112 elif channel == 'ResourceChanged':
114 data = json.loads(datastr)
115 logger.info('ResourceChanged with cmd:{}'.format(data))
# A resource's ref is nested under its pool, so this branch additionally
# reads data['resourcePoolId'].
116 ref = apibase + inventory_api_version + '/resourcePools/' + \
117 data['resourcePoolId'] + '/resources/' + data['id']
118 cmd = imscmd.PubMessage2SMO(data=Message2SMO(
119 id=data['id'], ref=ref,
120 eventtype=data['notificationEventType'],
121 updatetime=data['updatetime']),
# --- O-Cloud change -----------------------------------------------------------
# Builds imscmd.Register2SMO around a RegistrationMessage; no `ref` is built
# for this event.
124 elif channel == 'OcloudChanged':
126 data = json.loads(datastr)
127 logger.info('OcloudChanged with cmd:{}'.format(data))
128 cmd = imscmd.Register2SMO(data=RegistrationMessage(
129 id=data['id'], eventtype=data['notificationEventType'],
130 updatetime=data['updatetime']))
# --- Alarm events -------------------------------------------------------------
# Alarm refs use the monitoring base/version rather than the inventory ones.
132 elif channel == 'AlarmEventChanged':
134 data = json.loads(datastr)
135 logger.info('AlarmEventChanged with cmd:{}'.format(data))
136 ref = api_monitoring_base + \
137 monitor_api_version + '/alarms/' + data['id']
138 cmd = imscmd.PubAlarm2SMO(data=AlarmEvent2SMO(
139 id=data['id'], ref=ref,
140 eventtype=data['notificationEventType'],
141 updatetime=data['updatetime']))
# Purge builds imscmd.PurgeAlarmEvent around an AlarmEvent2SMO without a `ref`.
143 elif channel == 'AlarmEventPurged':
145 data = json.loads(datastr)
146 logger.info('AlarmEventPurged with cmd:{}'.format(data))
147 cmd = imscmd.PurgeAlarmEvent(data=AlarmEvent2SMO(
148 id=data['id'], eventtype=data['notificationEventType'],
149 updatetime=data['updatetime']))
# NOTE(review): presumably the fallback for a channel name that matched none
# of the branches above (the introducing `else:` is not visible here);
# confirm. The channel is only logged, nothing is built for it.
152 logger.info("unhandled:{}".format(channel))
# Script entry guard: the guarded statement runs only when this module is
# executed directly, not when it is imported.
# NOTE(review): the guarded statement itself is not visible in this text -
# presumably a call into the start-up/listen code above; confirm.
155 if __name__ == "__main__":