Merge "Update DeploymentManagerInfo attributes"
[pti/o2.git] / o2app / entrypoints / redis_eventconsumer.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import json
16
17 import redis
18 import json
19
20 from o2app import bootstrap
21 from o2common.adapter.notifications import SmoNotifications
22 from o2common.config import config
23 from o2common.helper import o2logging
24 from o2dms.domain import commands
25 from o2ims.domain import commands as imscmd
26 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage, \
27     NotificationEventEnum
28 from o2ims.domain.alarm_obj import AlarmEvent2SMO
29
30 logger = o2logging.get_logger(__name__)
31
32 r = redis.Redis(**config.get_redis_host_and_port())
33
34 apibase = config.get_o2ims_api_base()
35 api_monitoring_base = config.get_o2ims_monitoring_api_base()
36 monitor_api_version = config.get_o2ims_monitoring_api_v1()
37 inventory_api_version = config.get_o2ims_inventory_api_v1()
38
39
40 def main():
41     logger.info("Redis pubsub starting")
42
43     notifications = SmoNotifications()
44     bus = bootstrap.bootstrap(notifications=notifications)
45
46     pubsub = r.pubsub(ignore_subscribe_messages=True)
47     pubsub.subscribe("NfDeploymentStateChanged")
48     pubsub.subscribe('OcloudChanged')
49     pubsub.subscribe('ResourceTypeChanged')
50     pubsub.subscribe('ResourcePoolChanged')
51     pubsub.subscribe('DmsChanged')
52     pubsub.subscribe('ResourceChanged')
53     pubsub.subscribe('AlarmEventChanged')
54     pubsub.subscribe('AlarmEventPurged')
55
56     for m in pubsub.listen():
57         try:
58             handle_changed(m, bus)
59         except Exception as ex:
60             logger.warning("{}".format(str(ex)))
61             continue
62
63
64 def handle_changed(m, bus):
65     logger.info("handling %s", m)
66     channel = m['channel'].decode("UTF-8")
67     if channel == "NfDeploymentStateChanged":
68         datastr = m['data']
69         data = json.loads(datastr)
70         logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
71         cmd = commands.HandleNfDeploymentStateChanged(
72             NfDeploymentId=data['NfDeploymentId'],
73             FromState=data['FromState'],
74             ToState=data['ToState']
75         )
76         bus.handle(cmd)
77     elif channel == 'ResourceTypeChanged':
78         datastr = m['data']
79         data = json.loads(datastr)
80         logger.info('ResourceTypeChanged with cmd:{}'.format(data))
81         ref = apibase + inventory_api_version + '/resourceTypes/' + \
82             data['id']
83         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
84             id=data['id'], ref=ref,
85             eventtype=data['notificationEventType'],
86             updatetime=data['updatetime']),
87             type='ResourceType')
88         bus.handle(cmd)
89     elif channel == 'ResourcePoolChanged':
90         datastr = m['data']
91         data = json.loads(datastr)
92         logger.info('ResourcePoolChanged with cmd:{}'.format(data))
93         ref = apibase + inventory_api_version + '/resourcePools/' + \
94             data['id']
95         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
96             id=data['id'], ref=ref,
97             eventtype=data['notificationEventType'],
98             updatetime=data['updatetime']),
99             type='ResourcePool')
100         bus.handle(cmd)
101     elif channel == 'DmsChanged':
102         datastr = m['data']
103         data = json.loads(datastr)
104         logger.info('ResourceChanged with cmd:{}'.format(data))
105         ref = apibase + inventory_api_version + '/deploymentManagers/' + \
106             data['id']
107         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
108             id=data['id'], ref=ref,
109             eventtype=data['notificationEventType'],
110             updatetime=data['updatetime']),
111             type='Dms')
112         bus.handle(cmd)
113     elif channel == 'ResourceChanged':
114         datastr = m['data']
115         data = json.loads(datastr)
116         logger.info('ResourceChanged with cmd:{}'.format(data))
117         ref = apibase + inventory_api_version + '/resourcePools/' + \
118             data['resourcePoolId'] + '/resources/' + data['id']
119         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
120             id=data['id'], ref=ref,
121             eventtype=data['notificationEventType'],
122             updatetime=data['updatetime']),
123             type='Resource')
124         bus.handle(cmd)
125     elif channel == 'OcloudChanged':
126         datastr = m['data']
127         data = json.loads(datastr)
128         logger.info('OcloudChanged with cmd:{}'.format(data))
129         if data['notificationEventType'] == NotificationEventEnum.CREATE:
130             cmd = imscmd.Register2SMO(data=RegistrationMessage(
131                 id=data['id'], eventtype=data['notificationEventType'],
132                 updatetime=data['updatetime']))
133         elif data['notificationEventType'] == NotificationEventEnum.MODIFY:
134             ref = apibase + inventory_api_version
135             cmd = imscmd.PubMessage2SMO(data=Message2SMO(
136                 id=data['id'], ref=ref,
137                 eventtype=data['notificationEventType'],
138                 updatetime=data['updatetime']),
139                 type='OCloud')
140         bus.handle(cmd)
141     elif channel == 'AlarmEventChanged':
142         datastr = m['data']
143         data = json.loads(datastr)
144         logger.info('AlarmEventChanged with cmd:{}'.format(data))
145         ref = api_monitoring_base + \
146             monitor_api_version + '/alarms/' + data['id']
147         cmd = imscmd.PubAlarm2SMO(data=AlarmEvent2SMO(
148             id=data['id'], ref=ref,
149             eventtype=data['notificationEventType'],
150             updatetime=data['updatetime']))
151         bus.handle(cmd)
152     elif channel == 'AlarmEventPurged':
153         datastr = m['data']
154         data = json.loads(datastr)
155         logger.info('AlarmEventPurged with cmd:{}'.format(data))
156         cmd = imscmd.PurgeAlarmEvent(data=AlarmEvent2SMO(
157             id=data['id'], ref="", eventtype=data['notificationEventType'],
158             updatetime=data['updatetime']))
159         bus.handle(cmd)
160     else:
161         logger.info("unhandled:{}".format(channel))
162
163
164 if __name__ == "__main__":
165     main()