1 # Copyright (C) 2021-2023 Wind River Systems, Inc.
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14 # pylint: disable=too-few-public-methods
17 from o2common.config import config, conf
18 from o2common.service.command.handler import SMOClient
20 from o2common.helper import o2logging
# Module-level logger, obtained from o2logging and named after this module.
21 logger = o2logging.get_logger(__name__)
# SMO O2 endpoint, fetched from config once when this module is imported;
# used below as the default endpoint argument of SmoO2Notifications.
23 SMO_O2_ENDPOINT = config.get_smo_o2endpoint()
class AbstractNotifications(abc.ABC):
    """Interface for notification senders.

    Concrete senders subclass this and implement :meth:`send`.
    """

    @abc.abstractmethod
    def send(self, url, message):
        """Deliver ``message`` to ``url``.

        Marked abstract so that a subclass which does not override it cannot
        be instantiated, rather than failing only when ``send`` is first
        called.

        Args:
            url: Destination of the notification.
            message: Payload to deliver.

        Raises:
            NotImplementedError: Always, in this base implementation
                (reachable from subclasses via ``super().send(...)``).
        """
        raise NotImplementedError
# Notifications implementation derived from AbstractNotifications.
# NOTE(review): the name suggests a no-op backend, but the body of send() is
# not visible in this excerpt -- confirm before relying on that.
32 class NoneNotifications(AbstractNotifications):
# Takes the same (url, message) parameters as AbstractNotifications.send.
36 def send(self, url, message):
# Notifications implementation that is constructed with an SMO O2 endpoint.
40 class SmoO2Notifications(AbstractNotifications):
# smoO2Endpoint defaults to the module-level SMO_O2_ENDPOINT. Python binds
# that default when the def statement runs, so rebinding the constant later
# does not change it.
41 def __init__(self, smoO2Endpoint=SMO_O2_ENDPOINT):
# Keep the endpoint on the instance.
42 self.smoO2Endpoint = smoO2Endpoint
# Takes the same (url, message) parameters as AbstractNotifications.send.
# NOTE(review): the body is not visible in this excerpt, so whether it uses
# self.smoO2Endpoint or url cannot be confirmed from here.
44 def send(self, url, message):
# Notifications implementation that posts messages through an SMOClient
# stored on self.smo_client.
# NOTE(review): several lines of this class are not visible in this excerpt
# (the header of the method that builds smo_client, the end of the first
# SMOClient(...) call, and the branch/try keywords); comments below describe
# only what is shown.
48 class SmoNotifications(AbstractNotifications):
50 logger.debug('In SmoNotifications')
# Test that SMO_AUTH_URL is neither None nor the empty string before
# building a client from the PUBSUB settings.
51 if conf.PUBSUB.SMO_AUTH_URL is not None \
52 and conf.PUBSUB.SMO_AUTH_URL != '':
53 logger.debug(f'SMO_AUTH_URL is {conf.PUBSUB.SMO_AUTH_URL}')
# Client built from the PUBSUB client id, auth URL, username and password.
# NOTE(review): the closing of this call is not visible; further arguments
# may follow -- confirm against the full file.
54 self.smo_client = SMOClient(
55 conf.PUBSUB.SMO_CLIENT_ID, conf.PUBSUB.SMO_AUTH_URL,
56 conf.PUBSUB.SMO_USERNAME, conf.PUBSUB.SMO_PASSWORD,
# Client constructed with no arguments.
# NOTE(review): presumably the fallback taken when SMO_AUTH_URL is unset --
# the branch keyword is not visible here; confirm.
59 self.smo_client = SMOClient()
# Post message to url through the client and return whatever post() returns.
61 def send(self, url, message):
63 return self.smo_client.post(url, message)
# Any Exception is caught and logged at critical level.
# NOTE(review): no re-raise or return after the log call is visible; the
# excerpt ends here, so confirm what the caller receives on failure.
64 except Exception as e:
65 logger.critical('Notify except: {}'.format(e))