from typing import Callable
from o2common.adapter.notifications import AbstractNotifications,\
- SmoO2Notifications
+ NoneNotifications
from o2common.adapter import redis_eventpublisher
from o2common.service import unit_of_work
from o2common.service import messagebus
) -> messagebus.MessageBus:
if notifications is None:
- notifications = SmoO2Notifications()
+ notifications = NoneNotifications()
if start_orm:
with uow:
import json
from o2app import bootstrap
from o2common.config import config
+from o2common.adapter.notifications import SmoNotifications
from o2dms.domain import commands
from o2ims.domain import commands as imscmd
from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
def main():
logger.info("Redis pubsub starting")
- bus = bootstrap.bootstrap()
+
+ notifications = SmoNotifications()
+ bus = bootstrap.bootstrap(notifications=notifications)
+
pubsub = r.pubsub(ignore_subscribe_messages=True)
pubsub.subscribe("NfDeploymentStateChanged")
pubsub.subscribe('OcloudChanged')
+# Copyright (C) 2021-2023 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
# pylint: disable=too-few-public-methods
import abc
-from o2common.config import config
+from o2common.config import config, conf
+from o2common.service.command.handler import SMOClient
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
SMO_O2_ENDPOINT = config.get_smo_o2endpoint()
class AbstractNotifications(abc.ABC):
@abc.abstractmethod
- def send(self, message):
+ def send(self, url, message):
raise NotImplementedError
+class NoneNotifications(AbstractNotifications):
+ def __init__(self):
+ pass
+
+ def send(self, url, message):
+ pass
+
+
class SmoO2Notifications(AbstractNotifications):
def __init__(self, smoO2Endpoint=SMO_O2_ENDPOINT):
self.smoO2Endpoint = smoO2Endpoint
- def send(self, message):
+ def send(self, url, message):
pass
+
+
+class SmoNotifications(AbstractNotifications):
+ def __init__(self):
+ logger.debug('In SmoNotifications')
+ if conf.PUBSUB.SMO_AUTH_URL is not None \
+ and conf.PUBSUB.SMO_AUTH_URL != '':
+ logger.debug(f'SMO_AUTH_URL is {conf.PUBSUB.SMO_AUTH_URL}')
+ self.smo_client = SMOClient(
+ conf.PUBSUB.SMO_CLIENT_ID, conf.PUBSUB.SMO_AUTH_URL,
+ conf.PUBSUB.SMO_USERNAME, conf.PUBSUB.SMO_PASSWORD,
+ use_oauth=True)
+ else:
+ self.smo_client = SMOClient()
+
+ def send(self, url, message):
+ try:
+ return self.smo_client.post(url, message)
+ except Exception as e:
+ logger.critical('Notify except: {}'.format(e))
+ return False
# See the License for the specific language governing permissions and
# limitations under the License.
+import os
+import requests
+import json
import http.client
import ssl
+from requests_oauthlib import OAuth2Session
+from oauthlib.oauth2 import LegacyApplicationClient
+from requests.packages.urllib3.util.retry import Retry
+from requests.adapters import HTTPAdapter
+from requests.exceptions import RequestException, SSLError
+
from o2common.helper import o2logging
from o2common.config import config
logger = o2logging.get_logger(__name__)
-def post_data(conn, path, data):
- headers = {'Content-type': 'application/json'}
- conn.request('POST', path, data, headers)
- resp = conn.getresponse()
- data = resp.read().decode('utf-8')
- # json_data = json.loads(data)
- if resp.status >= 200 and resp.status <= 299:
- logger.info('Post data to SMO successed, response code {} {}, data {}'.
- format(resp.status, resp.reason, data))
- return True, resp.status
- logger.error('Response code is: {}'.format(resp.status))
- return False, resp.status
-
-
def get_http_conn(callbackurl):
conn = http.client.HTTPConnection(callbackurl)
return conn
sslctx.verify_mode = ssl.CERT_REQUIRED
conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
return conn
+
+
+class SMOClient:
+ def __init__(self, client_id=None, token_url=None, username=None,
+ password=None, scope=None, retries=3, use_oauth=False):
+ self.client_id = client_id
+ self.token_url = token_url
+ self.username = username
+ self.password = password
+ self.scope = scope if scope else []
+ self.use_oauth = use_oauth
+ self.retries = retries
+
+ if self.use_oauth:
+ if not all([self.client_id, self.token_url, self.username,
+ self.password]):
+ raise ValueError(
+ 'client_id, token_url, username, and password ' +
+ 'must be provided when use_oauth is True.')
+
+ # Set OAUTHLIB_INSECURE_TRANSPORT environment variable
+ # if token_url uses http
+ if 'http://' in self.token_url:
+ os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
+
+ # Create a LegacyApplicationClient for handling password flow
+ client = LegacyApplicationClient(client_id=self.client_id)
+ self.session = OAuth2Session(client=client)
+
+ # Check if token_url uses https and set SSL verification
+ if 'https://' in self.token_url:
+ ca_path = config.get_smo_ca_config_path()
+ if os.path.exists(ca_path):
+ self.session.verify = ca_path
+ else:
+ self.session.verify = True
+
+ # Fetch the access token
+ self.fetch_token(self.session.verify)
+ else:
+ self.session = requests.Session()
+
+ # Create a Retry object for handling retries
+ retry_strategy = Retry(
+ total=retries,
+ backoff_factor=1,
+ status_forcelist=[429, 500, 502, 503, 504],
+ allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
+ )
+ adapter = HTTPAdapter(max_retries=retry_strategy)
+ self.session.mount("https://", adapter)
+ self.session.mount("http://", adapter)
+
+ def fetch_token(self, verify):
+ try:
+ self.session.fetch_token(
+ token_url=self.token_url,
+ username=self.username,
+ password=self.password,
+ client_id=self.client_id,
+ verify=verify
+ )
+ except SSLError:
+ # If SSLError is raised, try again with verify=False
+ logger.warning('The SSLError occurred')
+ if verify is not False:
+ self.fetch_token(verify=False)
+
+ def handle_post_data(self, resp):
+ if resp.status_code >= 200 and resp.status_code < 300:
+ return True
+ logger.error('Response code is: {}'.format(resp.status_code))
+ # TODO: write the status to extension db table.
+ return False
+
+ def post(self, url, data, retries=1):
+ if not all([url, data]):
+ raise ValueError(
+ 'url, data must be provided when call the post.')
+
+ # Check if token_url uses https and set SSL verification
+ if 'https://' in url:
+ ca_path = config.get_smo_ca_config_path()
+ if os.path.exists(ca_path):
+ self.session.verify = ca_path
+ else:
+ self.session.verify = True
+
+ if retries is None:
+ retries = self.retries
+
+ for _ in range(retries):
+ try:
+ response = self.session.post(
+ url, data=json.dumps(data))
+ response.raise_for_status()
+ return self.handle_post_data(response)
+ except (SSLError, RequestException) as e:
+ logger.warning(f'Error occurred: {e}')
+ pass
+ raise Exception(
+ f"POST request to {url} failed after {retries} retries.")
# import redis
# import requests
-import ssl
import json
-from urllib.parse import urlparse
from o2common.config import conf
from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default
-from o2common.service.command.handler import get_http_conn
-from o2common.service.command.handler import get_https_conn_selfsigned
-from o2common.service.command.handler import post_data
+from o2common.adapter.notifications import AbstractNotifications
from o2ims.domain import commands
from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO, \
def notify_alarm_to_smo(
cmd: commands.PubAlarm2SMO,
uow: AbstractUnitOfWork,
+ notifications: AbstractNotifications,
):
logger.debug('In notify_alarm_to_smo')
data = cmd.data
sub_data['alarmSubscriptionId']))
if not sub_data.get('filter', None):
- callback_smo(sub, data, alarm)
+ callback_smo(notifications, sub, data, alarm)
continue
try:
args = gen_orm_filter(AlarmEventRecord, sub_data['filter'])
'name or value. Ignore the filter'.format(
sub_data['alarmSubscriptionId'],
sub_data['filter']))
- callback_smo(sub, data, alarm)
+ callback_smo(notifications, sub, data, alarm)
continue
args.append(AlarmEventRecord.alarmEventRecordId == data.id)
ret = uow.alarm_event_records.list_with_count(*args)
'the filter.'
.format(data.id, sub_data['alarmSubscriptionId']))
continue
- callback_smo(sub, data, alarm)
+ callback_smo(notifications, sub, data, alarm)
-def callback_smo(sub: AlarmSubscription, msg: AlarmEvent2SMO,
- alarm: AlarmEventRecord):
+def callback_smo(notifications: AbstractNotifications, sub: AlarmSubscription,
+ msg: AlarmEvent2SMO, alarm: AlarmEventRecord):
sub_data = sub.serialize()
alarm_data = alarm.serialize()
callback = {
logger.info('callback URL: {}'.format(sub_data['callback']))
logger.debug('callback data: {}'.format(callback_data))
- o = urlparse(sub_data['callback'])
- if o.scheme == 'https':
- conn = get_https_conn_default(o.netloc)
- else:
- conn = get_http_conn(o.netloc)
- try:
- rst, status = post_data(conn, o.path, callback_data)
- if rst is True:
- logger.info(
- 'Notify alarm to SMO successed with status: {}'.format(status))
- return
- logger.error('Notify alarm Response code is: {}'.format(status))
- except ssl.SSLCertVerificationError as e:
- logger.debug(
- 'Notify alarm try to post data with trusted ca \
- failed: {}'.format(e))
- if 'self signed' in str(e):
- conn = get_https_conn_selfsigned(o.netloc)
- try:
- return post_data(conn, o.path, callback_data)
- except Exception as e:
- logger.info(
- 'Notify alarm with self-signed ca failed: {}'.format(e))
- # TODO: write the status to extension db table.
- return False
- return False
- except Exception as e:
- logger.critical('Notify alarm except: {}'.format(e))
- return False
+ return notifications.send(sub_data['callback'], callback_data)
# import redis
# import requests
import json
-import ssl
-from urllib.parse import urlparse
-# from o2common.config import config
+# from o2common.config import conf
from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default
-from o2common.service.command.handler import get_http_conn
-from o2common.service.command.handler import get_https_conn_selfsigned
-from o2common.service.command.handler import post_data
+from o2common.adapter.notifications import AbstractNotifications
from o2ims.domain import commands, ocloud
from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
def notify_change_to_smo(
cmd: commands.PubMessage2SMO,
uow: AbstractUnitOfWork,
+ notifications: AbstractNotifications,
):
logger.debug('In notify_change_to_smo')
msg_type = cmd.type
if msg_type == 'ResourceType':
- _notify_resourcetype(uow, cmd.data)
+ _notify_resourcetype(uow, notifications, cmd.data)
elif msg_type == 'ResourcePool':
- _notify_resourcepool(uow, cmd.data)
+ _notify_resourcepool(uow, notifications, cmd.data)
elif msg_type == 'Dms':
- _notify_dms(uow, cmd.data)
+ _notify_dms(uow, notifications, cmd.data)
elif msg_type == 'Resource':
- _notify_resource(uow, cmd.data)
+ _notify_resource(uow, notifications, cmd.data)
-def _notify_resourcetype(uow, data):
+def _notify_resourcetype(uow, notifications, data):
with uow:
resource_type = uow.resource_types.get(data.id)
if resource_type is None:
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
if not filters:
- callback_smo(sub, data, resource_type_dict)
+ callback_smo(notifications, sub, data, resource_type_dict)
continue
filter_hit = False
for filter in filters:
logger.info('Subscription {} filter hit, skip ResourceType {}.'
.format(sub_data['subscriptionId'], data.id))
else:
- callback_smo(sub, data, resource_type_dict)
+ callback_smo(notifications, sub, data, resource_type_dict)
-def _notify_resourcepool(uow, data):
+def _notify_resourcepool(uow, notifications, data):
with uow:
resource_pool = uow.resource_pools.get(data.id)
if resource_pool is None:
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
if not filters:
- callback_smo(sub, data, resource_pool_dict)
+ callback_smo(notifications, sub, data, resource_pool_dict)
continue
filter_hit = False
for filter in filters:
logger.info('Subscription {} filter hit, skip ResourcePool {}.'
.format(sub_data['subscriptionId'], data.id))
else:
- callback_smo(sub, data, resource_pool_dict)
+ callback_smo(notifications, sub, data, resource_pool_dict)
-def _notify_dms(uow, data):
+def _notify_dms(uow, notifications, data):
with uow:
dms = uow.deployment_managers.get(data.id)
if dms is None:
filters = handle_filter(
sub_data['filter'], 'DeploymentManagerInfo')
if not filters:
- callback_smo(sub, data, dms_dict)
+ callback_smo(notifications, sub, data, dms_dict)
continue
filter_hit = False
for filter in filters:
'DeploymentManager {}.'
.format(sub_data['subscriptionId'], data.id))
else:
- callback_smo(sub, data, dms_dict)
+ callback_smo(notifications, sub, data, dms_dict)
-def _notify_resource(uow, data):
+def _notify_resource(uow, notifications, data):
with uow:
resource = uow.resources.get(data.id)
if resource is None:
logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
filters = handle_filter(sub_data['filter'], 'ResourceInfo')
if not filters:
- callback_smo(sub, data, res_dict)
+ callback_smo(notifications, sub, data, res_dict)
continue
filter_hit = False
for filter in filters:
logger.info('Subscription {} filter hit, skip Resource {}.'
.format(sub_data['subscriptionId'], data.id))
else:
- callback_smo(sub, data, res_dict)
+ callback_smo(notifications, sub, data, res_dict)
def handle_filter(filter: str, f_type: str):
return filters
-def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
+def callback_smo(notifications: AbstractNotifications, sub: Subscription,
+ msg: Message2SMO, obj_dict: dict = None):
sub_data = sub.serialize()
callback = {
'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
logger.info('callback URL: {}'.format(sub_data['callback']))
logger.debug('callback data: {}'.format(callback_data))
+ return notifications.send(sub_data['callback'], callback_data)
+
# Call SMO through the SMO callback url
- o = urlparse(sub_data['callback'])
- if o.scheme == 'https':
- conn = get_https_conn_default(o.netloc)
- else:
- conn = get_http_conn(o.netloc)
- try:
- rst, status = post_data(conn, o.path, callback_data)
- if rst is True:
- logger.info(
- 'Notify to SMO successed with status: {}'.format(status))
- return
- logger.error('Notify Response code is: {}'.format(status))
- except ssl.SSLCertVerificationError as e:
- logger.debug(
- 'Notify try to post data with trusted ca failed: {}'.format(e))
- if 'self signed' in str(e):
- conn = get_https_conn_selfsigned(o.netloc)
- try:
- return post_data(conn, o.path, callback_data)
- except Exception as e:
- logger.info(
- 'Notify post data with self-signed ca \
- failed: {}'.format(e))
- # TODO: write the status to extension db table.
- return False
- return False
- except Exception as e:
- logger.critical('Notify except: {}'.format(e))
- return False
+ # o = urlparse(sub_data['callback'])
+ # if o.scheme == 'https':
+ # conn = get_https_conn_default(o.netloc)
+ # else:
+ # conn = get_http_conn(o.netloc)
+ # try:
+ # rst, status = post_data(conn, o.path, callback_data)
+ # if rst is True:
+ # logger.info(
+ # 'Notify to SMO successed with status: {}'.format(status))
+ # return
+ # logger.error('Notify Response code is: {}'.format(status))
+ # except ssl.SSLCertVerificationError as e:
+ # logger.debug(
+ # 'Notify try to post data with trusted ca failed: {}'.format(e))
+ # if 'self signed' in str(e):
+ # conn = get_https_conn_selfsigned(o.netloc)
+ # try:
+ # return post_data(conn, o.path, callback_data)
+ # except Exception as e:
+ # logger.info(
+ # 'Notify post data with self-signed ca \
+ # failed: {}'.format(e))
+ # return False
+ # return False
+ # except Exception as e:
+ # logger.critical('Notify except: {}'.format(e))
+ # return False
# See the License for the specific language governing permissions and
# limitations under the License.
-import ssl
import json
-from retry import retry
-from urllib.parse import urlparse
from o2common.config import config, conf
from o2common.domain.filter import gen_orm_filter
from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default, \
- get_http_conn, get_https_conn_selfsigned, post_data
+from o2common.adapter.notifications import AbstractNotifications
from o2ims.domain import commands, ocloud as cloud
from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum
def registry_to_smo(
cmd: commands.Register2SMO,
uow: AbstractUnitOfWork,
+ notifications: AbstractNotifications,
):
logger.debug('In registry_to_smo')
data = cmd.data
'serviceUri': ocloud.serviceUri
}
if data.notificationEventType == NotificationEventEnum.CREATE:
- register_smo(uow, ocloud_dict)
+ register_smo(notifications, ocloud_dict)
elif data.notificationEventType in [NotificationEventEnum.MODIFY,
NotificationEventEnum.DELETE]:
_notify_ocloud(uow, data, ocloud_dict)
self.value = value
-def register_smo(uow, ocloud_data):
- call_res, status = call_smo(ocloud_data)
+def register_smo(notifications, ocloud_data):
+ call_res = call_smo(notifications, ocloud_data)
logger.debug('Call SMO response is {}'.format(call_res))
if call_res is True:
- logger.info('Register to smo success response is {}'.format(status))
+ logger.info('Register to smo success response')
else:
raise RegIMSToSMOExp('Register o2ims to SMO failed')
# TODO: record the result for the smo register
callback_smo(sub, msg, ocloud_dict)
-@retry((ConnectionRefusedError), tries=2, delay=2)
-def call_smo(reg_data: dict):
+def call_smo(notifications: AbstractNotifications, reg_data: dict):
smo_token = conf.DEFAULT.smo_token_data
smo_token_info = {
'iss': 'o2ims',
})
logger.info('callback URL: {}'.format(conf.DEFAULT.smo_register_url))
logger.debug('callback data: {}'.format(callback_data))
- o = urlparse(conf.DEFAULT.smo_register_url)
- if o.scheme == 'https':
- conn = get_https_conn_default(o.netloc)
- else:
- conn = get_http_conn(o.netloc)
-
- try:
- return post_data(conn, o.path, callback_data)
- except ssl.SSLCertVerificationError as e:
- logger.debug('Try to register to smo with \
- trusted ca failed: {}'.format(e))
- if 'self signed' in str(e):
- conn = get_https_conn_selfsigned(o.netloc)
- try:
- return post_data(conn, o.path, callback_data)
- except Exception as e:
- logger.info(
- 'Register to smo with self-signed ca failed: {}'.format(e))
- # TODO: write the status to extension db table.
- return False, None
- return False, None
- except Exception as e:
- logger.critical('Register to smo except: {}'.format(e))
- return False, None
+ return notifications.send(conf.DEFAULT.smo_register_url, callback_data)
babel
PrettyTable<0.8,>=0.7.2
+requests-oauthlib
retry
ruamel.yaml==0.17.17
--- /dev/null
+# Copyright (C) 2021-2023 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import json
+from urllib.parse import urlparse, urlunparse
+
+from o2common.service.command.handler import SMOClient
+
+
+def test_smo_with_oauth2():
+ # Replace these with actual values
+ client_id = 'client_id'
+ token_url = 'http://128.224.115.32:1080/mock_smo/v1/auth/token'
+ username = 'admin'
+ password = 'admin'
+ url = 'http://128.224.115.32:1080/mock_smo/v1/ocloud_observer'
+ data = {"key": "value"}
+
+ client = SMOClient(client_id=client_id, token_url=token_url,
+ username=username, password=password,
+ use_oauth=True)
+
+ # Fetch the token
+ client.fetch_token(client.session.verify)
+
+ # Make a POST request
+ response = client.post(url=url, data=json.dumps(data))
+
+ # Check the status code
+ assert response is True
+
+ # Check the response data if you expect any
+ # response_data = json.loads(response.text)
+ # assert response_data == expected_data
+
+ # --------------- HTTPS ---------------- #
+ parsed_token_url = urlparse(token_url)
+ parsed_token_url = parsed_token_url._replace(scheme='https')
+ token_url1 = urlunparse(parsed_token_url)
+
+ parsed_url = urlparse(url)
+ parsed_url = parsed_url._replace(scheme='https')
+ url1 = urlunparse(parsed_url)
+
+ client = SMOClient(client_id=client_id, token_url=token_url1,
+ username=username, password=password,
+ use_oauth=True)
+
+ # Fetch the token
+ client.fetch_token(client.session.verify)
+
+ # Make a POST request
+ response = client.post(url=url1, data=json.dumps(data))
+
+ # Check the status code
+ assert response is True
+
+
+def test_smo_client():
+ url = 'http://128.224.115.32:1080/mock_smo/v1/o2ims_inventory_observer'
+ data = {"key": "value"}
+
+ client = SMOClient()
+
+ # Make a POST request
+ response = client.post(url=url, data=json.dumps(data))
+ # Check the status code
+ assert response is True
+
+ # Check the response data if you expect any
+ # response_data = json.loads(response.text)
+ # assert response_data == expected_data
+
+ parsed_url = urlparse(url)
+ parsed_url = parsed_url._replace(scheme='https')
+ url1 = urlunparse(parsed_url)
+
+ # Make a POST request
+ response = client.post(url=url1, data=json.dumps(data))
+ # Check the status code
+ assert response is True