c95133a50579b40dc43a61b5f2892ef79a1a8546
[pti/o2.git] / o2app / entrypoints / redis_eventconsumer.py
1 # Copyright (C) 2021 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import json
16
17 import redis
18 import json
19 from o2app import bootstrap
20 from o2common.config import config
21 from o2common.adapter.notifications import SmoNotifications
22 from o2dms.domain import commands
23 from o2ims.domain import commands as imscmd
24 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
25 from o2ims.domain.alarm_obj import AlarmEvent2SMO
26
27 from o2common.helper import o2logging
28 logger = o2logging.get_logger(__name__)
29
30 r = redis.Redis(**config.get_redis_host_and_port())
31
32 apibase = config.get_o2ims_api_base()
33 api_monitoring_base = config.get_o2ims_monitoring_api_base()
34 monitor_api_version = config.get_o2ims_monitoring_api_v1()
35 inventory_api_version = config.get_o2ims_inventory_api_v1()
36
37
38 def main():
39     logger.info("Redis pubsub starting")
40
41     notifications = SmoNotifications()
42     bus = bootstrap.bootstrap(notifications=notifications)
43
44     pubsub = r.pubsub(ignore_subscribe_messages=True)
45     pubsub.subscribe("NfDeploymentStateChanged")
46     pubsub.subscribe('OcloudChanged')
47     pubsub.subscribe('ResourceTypeChanged')
48     pubsub.subscribe('ResourcePoolChanged')
49     pubsub.subscribe('DmsChanged')
50     pubsub.subscribe('ResourceChanged')
51     pubsub.subscribe('AlarmEventChanged')
52
53     for m in pubsub.listen():
54         try:
55             handle_changed(m, bus)
56         except Exception as ex:
57             logger.warning("{}".format(str(ex)))
58             continue
59
60
61 def handle_changed(m, bus):
62     logger.info("handling %s", m)
63     channel = m['channel'].decode("UTF-8")
64     if channel == "NfDeploymentStateChanged":
65         datastr = m['data']
66         data = json.loads(datastr)
67         logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
68         cmd = commands.HandleNfDeploymentStateChanged(
69             NfDeploymentId=data['NfDeploymentId'],
70             FromState=data['FromState'],
71             ToState=data['ToState']
72         )
73         bus.handle(cmd)
74     elif channel == 'ResourceTypeChanged':
75         datastr = m['data']
76         data = json.loads(datastr)
77         logger.info('ResourceTypeChanged with cmd:{}'.format(data))
78         ref = apibase + inventory_api_version + '/resourceTypes/' + \
79             data['id']
80         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
81             id=data['id'], ref=ref,
82             eventtype=data['notificationEventType'],
83             updatetime=data['updatetime']),
84             type='ResourceType')
85         bus.handle(cmd)
86     elif channel == 'ResourcePoolChanged':
87         datastr = m['data']
88         data = json.loads(datastr)
89         logger.info('ResourcePoolChanged with cmd:{}'.format(data))
90         ref = apibase + inventory_api_version + '/resourcePools/' + \
91             data['id']
92         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
93             id=data['id'], ref=ref,
94             eventtype=data['notificationEventType'],
95             updatetime=data['updatetime']),
96             type='ResourcePool')
97         bus.handle(cmd)
98     elif channel == 'DmsChanged':
99         datastr = m['data']
100         data = json.loads(datastr)
101         logger.info('ResourceChanged with cmd:{}'.format(data))
102         ref = apibase + inventory_api_version + '/deploymentManagers/' + \
103             data['id']
104         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
105             id=data['id'], ref=ref,
106             eventtype=data['notificationEventType'],
107             updatetime=data['updatetime']),
108             type='Dms')
109         bus.handle(cmd)
110     elif channel == 'ResourceChanged':
111         datastr = m['data']
112         data = json.loads(datastr)
113         logger.info('ResourceChanged with cmd:{}'.format(data))
114         ref = apibase + inventory_api_version + '/resourcePools/' + \
115             data['resourcePoolId'] + '/resources/' + data['id']
116         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
117             id=data['id'], ref=ref,
118             eventtype=data['notificationEventType'],
119             updatetime=data['updatetime']),
120             type='Resource')
121         bus.handle(cmd)
122     elif channel == 'OcloudChanged':
123         datastr = m['data']
124         data = json.loads(datastr)
125         logger.info('OcloudChanged with cmd:{}'.format(data))
126         cmd = imscmd.Register2SMO(data=RegistrationMessage(
127             id=data['id'], eventtype=data['notificationEventType'],
128             updatetime=data['updatetime']))
129         bus.handle(cmd)
130     elif channel == 'AlarmEventChanged':
131         datastr = m['data']
132         data = json.loads(datastr)
133         logger.info('AlarmEventChanged with cmd:{}'.format(data))
134         ref = api_monitoring_base + \
135             monitor_api_version + '/alarms/' + data['id']
136         cmd = imscmd.PubAlarm2SMO(data=AlarmEvent2SMO(
137             id=data['id'], ref=ref,
138             eventtype=data['notificationEventType'],
139             updatetime=data['updatetime']))
140         bus.handle(cmd)
141     else:
142         logger.info("unhandled:{}".format(channel))
143
144
145 if __name__ == "__main__":
146     main()