db39daa9fefcc26658a674ed450f33e11bb18016
[pti/o2.git] / o2app / entrypoints / redis_eventconsumer.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import json
16
17 import redis
18 import json
19
20 from o2app import bootstrap
21 from o2common.adapter.notifications import SmoNotifications
22 from o2common.config import config
23 from o2common.helper import o2logging
24 from o2dms.domain import commands
25 from o2ims.domain import commands as imscmd
26 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
27 from o2ims.domain.alarm_obj import AlarmEvent2SMO
28
# Logger for this entrypoint, named after the module.
logger = o2logging.get_logger(__name__)

# Module-wide Redis client; connection keyword arguments come from config.
_redis_kwargs = config.get_redis_host_and_port()
r = redis.Redis(**_redis_kwargs)

# URL fragments concatenated by handle_changed() to build the "ref" of an
# inventory object (apibase + inventory_api_version) or of an alarm
# (api_monitoring_base + monitor_api_version).
apibase = config.get_o2ims_api_base()
api_monitoring_base = config.get_o2ims_monitoring_api_base()
monitor_api_version = config.get_o2ims_monitoring_api_v1()
inventory_api_version = config.get_o2ims_inventory_api_v1()
37
38
def main():
    """Subscribe to the change channels and dispatch every message received.

    Bootstraps the message bus with an ``SmoNotifications`` instance,
    subscribes the module-level Redis client to each change channel, then
    passes every message yielded by ``pubsub.listen()`` to
    ``handle_changed()``. An exception raised while handling one message is
    logged as a warning and the loop carries on with the next message.
    """
    logger.info("Redis pubsub starting")

    bus = bootstrap.bootstrap(notifications=SmoNotifications())

    # Subscribed one by one, in this order.
    channel_names = (
        "NfDeploymentStateChanged",
        'OcloudChanged',
        'ResourceTypeChanged',
        'ResourcePoolChanged',
        'DmsChanged',
        'ResourceChanged',
        'AlarmEventChanged',
        'AlarmEventPurged',
    )
    listener = r.pubsub(ignore_subscribe_messages=True)
    for channel_name in channel_names:
        listener.subscribe(channel_name)

    for message in listener.listen():
        try:
            handle_changed(message, bus)
        except Exception as err:
            # One bad message must not stop the consumer loop.
            logger.warning("{}".format(str(err)))
61
62
def _nf_deployment_state_cmd(data):
    """Build the DMS command for an NfDeployment state transition."""
    return commands.HandleNfDeploymentStateChanged(
        NfDeploymentId=data['NfDeploymentId'],
        FromState=data['FromState'],
        ToState=data['ToState']
    )


def _inventory_cmd(data, path, objtype):
    """Build the PubMessage2SMO command for an inventory object change.

    ``path`` is the object's location below the inventory API root and
    ``objtype`` is the value passed as the command's ``type``.
    """
    ref = apibase + inventory_api_version + path
    return imscmd.PubMessage2SMO(data=Message2SMO(
        id=data['id'], ref=ref,
        eventtype=data['notificationEventType'],
        updatetime=data['updatetime']),
        type=objtype)


def _resource_type_cmd(data):
    """Build the command for a ResourceTypeChanged payload."""
    return _inventory_cmd(
        data, '/resourceTypes/' + data['id'], 'ResourceType')


def _resource_pool_cmd(data):
    """Build the command for a ResourcePoolChanged payload."""
    return _inventory_cmd(
        data, '/resourcePools/' + data['id'], 'ResourcePool')


def _dms_cmd(data):
    """Build the command for a DmsChanged payload."""
    return _inventory_cmd(
        data, '/deploymentManagers/' + data['id'], 'Dms')


def _resource_cmd(data):
    """Build the command for a ResourceChanged payload.

    The resource URL is nested under the resource pool it belongs to.
    """
    path = '/resourcePools/' + data['resourcePoolId'] + \
        '/resources/' + data['id']
    return _inventory_cmd(data, path, 'Resource')


def _ocloud_cmd(data):
    """Build the Register2SMO command for an OcloudChanged payload."""
    return imscmd.Register2SMO(data=RegistrationMessage(
        id=data['id'], eventtype=data['notificationEventType'],
        updatetime=data['updatetime']))


def _alarm_changed_cmd(data):
    """Build the PubAlarm2SMO command for an AlarmEventChanged payload."""
    ref = api_monitoring_base + \
        monitor_api_version + '/alarms/' + data['id']
    return imscmd.PubAlarm2SMO(data=AlarmEvent2SMO(
        id=data['id'], ref=ref,
        eventtype=data['notificationEventType'],
        updatetime=data['updatetime']))


def _alarm_purged_cmd(data):
    """Build the PurgeAlarmEvent command for an AlarmEventPurged payload."""
    return imscmd.PurgeAlarmEvent(data=AlarmEvent2SMO(
        id=data['id'], eventtype=data['notificationEventType'],
        updatetime=data['updatetime']))


# channel name -> (label used in the log line, command builder)
_CHANNEL_HANDLERS = {
    'NfDeploymentStateChanged': (
        'HandleNfDeploymentStateChanged', _nf_deployment_state_cmd),
    'ResourceTypeChanged': ('ResourceTypeChanged', _resource_type_cmd),
    'ResourcePoolChanged': ('ResourcePoolChanged', _resource_pool_cmd),
    'DmsChanged': ('DmsChanged', _dms_cmd),
    'ResourceChanged': ('ResourceChanged', _resource_cmd),
    'OcloudChanged': ('OcloudChanged', _ocloud_cmd),
    'AlarmEventChanged': ('AlarmEventChanged', _alarm_changed_cmd),
    'AlarmEventPurged': ('AlarmEventPurged', _alarm_purged_cmd),
}


def handle_changed(m, bus):
    """Turn one pub/sub message into a command and hand it to the bus.

    The channel name selects a command builder; the message's JSON payload
    is decoded and passed to that builder, and the resulting command is
    given to ``bus.handle()``. Messages on a channel without a builder are
    only logged, and their payload is not parsed.

    :param m: message dict with a ``'channel'`` entry (bytes or str) and a
        ``'data'`` entry holding the JSON payload.
    :param bus: object whose ``handle(cmd)`` method processes the command.
    :raises ValueError: if the payload is not valid JSON.
    :raises KeyError: if the payload lacks a field the builder reads.
    """
    logger.info("handling %s", m)

    channel = m['channel']
    # Channel names arrive as bytes unless the client decodes responses;
    # accept both so a client configuration change does not break dispatch.
    if isinstance(channel, bytes):
        channel = channel.decode("UTF-8")

    handler = _CHANNEL_HANDLERS.get(channel)
    if handler is None:
        logger.info("unhandled:{}".format(channel))
        return

    label, build_command = handler
    data = json.loads(m['data'])
    logger.info('{} with cmd:{}'.format(label, data))
    bus.handle(build_command(data))
153
154
# Start the consumer loop only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()