Add the PATCH method for the monitoring API
[pti/o2.git] / o2app / entrypoints / redis_eventconsumer.py
1 # Copyright (C) 2021-2024 Wind River Systems, Inc.
2 #
3 #  Licensed under the Apache License, Version 2.0 (the "License");
4 #  you may not use this file except in compliance with the License.
5 #  You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 #  Unless required by applicable law or agreed to in writing, software
10 #  distributed under the License is distributed on an "AS IS" BASIS,
11 #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 #  See the License for the specific language governing permissions and
13 #  limitations under the License.
14
15 # import json
16
17 import redis
18 import json
19 from o2app import bootstrap
20 from o2common.config import config
21 from o2common.adapter.notifications import SmoNotifications
22 from o2dms.domain import commands
23 from o2ims.domain import commands as imscmd
24 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
25 from o2ims.domain.alarm_obj import AlarmEvent2SMO
26
27 from o2common.helper import o2logging
28 logger = o2logging.get_logger(__name__)
29
30 r = redis.Redis(**config.get_redis_host_and_port())
31
32 apibase = config.get_o2ims_api_base()
33 api_monitoring_base = config.get_o2ims_monitoring_api_base()
34 monitor_api_version = config.get_o2ims_monitoring_api_v1()
35 inventory_api_version = config.get_o2ims_inventory_api_v1()
36
37
38 def main():
39     logger.info("Redis pubsub starting")
40
41     notifications = SmoNotifications()
42     bus = bootstrap.bootstrap(notifications=notifications)
43
44     pubsub = r.pubsub(ignore_subscribe_messages=True)
45     pubsub.subscribe("NfDeploymentStateChanged")
46     pubsub.subscribe('OcloudChanged')
47     pubsub.subscribe('ResourceTypeChanged')
48     pubsub.subscribe('ResourcePoolChanged')
49     pubsub.subscribe('DmsChanged')
50     pubsub.subscribe('ResourceChanged')
51     pubsub.subscribe('AlarmEventChanged')
52     pubsub.subscribe('AlarmEventPurged')
53
54     for m in pubsub.listen():
55         try:
56             handle_changed(m, bus)
57         except Exception as ex:
58             logger.warning("{}".format(str(ex)))
59             continue
60
61
62 def handle_changed(m, bus):
63     logger.info("handling %s", m)
64     channel = m['channel'].decode("UTF-8")
65     if channel == "NfDeploymentStateChanged":
66         datastr = m['data']
67         data = json.loads(datastr)
68         logger.info('HandleNfDeploymentStateChanged with cmd:{}'.format(data))
69         cmd = commands.HandleNfDeploymentStateChanged(
70             NfDeploymentId=data['NfDeploymentId'],
71             FromState=data['FromState'],
72             ToState=data['ToState']
73         )
74         bus.handle(cmd)
75     elif channel == 'ResourceTypeChanged':
76         datastr = m['data']
77         data = json.loads(datastr)
78         logger.info('ResourceTypeChanged with cmd:{}'.format(data))
79         ref = apibase + inventory_api_version + '/resourceTypes/' + \
80             data['id']
81         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
82             id=data['id'], ref=ref,
83             eventtype=data['notificationEventType'],
84             updatetime=data['updatetime']),
85             type='ResourceType')
86         bus.handle(cmd)
87     elif channel == 'ResourcePoolChanged':
88         datastr = m['data']
89         data = json.loads(datastr)
90         logger.info('ResourcePoolChanged with cmd:{}'.format(data))
91         ref = apibase + inventory_api_version + '/resourcePools/' + \
92             data['id']
93         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
94             id=data['id'], ref=ref,
95             eventtype=data['notificationEventType'],
96             updatetime=data['updatetime']),
97             type='ResourcePool')
98         bus.handle(cmd)
99     elif channel == 'DmsChanged':
100         datastr = m['data']
101         data = json.loads(datastr)
102         logger.info('ResourceChanged with cmd:{}'.format(data))
103         ref = apibase + inventory_api_version + '/deploymentManagers/' + \
104             data['id']
105         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
106             id=data['id'], ref=ref,
107             eventtype=data['notificationEventType'],
108             updatetime=data['updatetime']),
109             type='Dms')
110         bus.handle(cmd)
111     elif channel == 'ResourceChanged':
112         datastr = m['data']
113         data = json.loads(datastr)
114         logger.info('ResourceChanged with cmd:{}'.format(data))
115         ref = apibase + inventory_api_version + '/resourcePools/' + \
116             data['resourcePoolId'] + '/resources/' + data['id']
117         cmd = imscmd.PubMessage2SMO(data=Message2SMO(
118             id=data['id'], ref=ref,
119             eventtype=data['notificationEventType'],
120             updatetime=data['updatetime']),
121             type='Resource')
122         bus.handle(cmd)
123     elif channel == 'OcloudChanged':
124         datastr = m['data']
125         data = json.loads(datastr)
126         logger.info('OcloudChanged with cmd:{}'.format(data))
127         cmd = imscmd.Register2SMO(data=RegistrationMessage(
128             id=data['id'], eventtype=data['notificationEventType'],
129             updatetime=data['updatetime']))
130         bus.handle(cmd)
131     elif channel == 'AlarmEventChanged':
132         datastr = m['data']
133         data = json.loads(datastr)
134         logger.info('AlarmEventChanged with cmd:{}'.format(data))
135         ref = api_monitoring_base + \
136             monitor_api_version + '/alarms/' + data['id']
137         cmd = imscmd.PubAlarm2SMO(data=AlarmEvent2SMO(
138             id=data['id'], ref=ref,
139             eventtype=data['notificationEventType'],
140             updatetime=data['updatetime']))
141         bus.handle(cmd)
142     elif channel == 'AlarmEventPurged':
143         datastr = m['data']
144         data = json.loads(datastr)
145         logger.info('AlarmEventPurged with cmd:{}'.format(data))
146         ref = api_monitoring_base + \
147             monitor_api_version + '/alarms/' + data['id']
148         cmd = imscmd.PurgeAlarmEvent(data=AlarmEvent2SMO(
149             id=data['id'], ref=ref,
150             eventtype=data['notificationEventType'],
151             updatetime=data['updatetime']))
152         bus.handle(cmd)
153     else:
154         logger.info("unhandled:{}".format(channel))
155
156
157 if __name__ == "__main__":
158     main()