From 250c3960b727b9549581b50b88849218ef951609 Mon Sep 17 00:00:00 2001 From: "Zhang Rong(Jon)" Date: Mon, 22 May 2023 22:30:09 +0800 Subject: [PATCH] Add oAuth2 for subscription and registration with SMO Issue-ID: INF-412 Change-Id: I3190b40b919d3f968f7a57b4b833c1f783d2a93c Signed-off-by: Zhang Rong(Jon) --- o2app/bootstrap.py | 4 +- o2app/entrypoints/redis_eventconsumer.py | 6 +- o2common/adapter/notifications.py | 52 ++++++++++- o2common/service/command/handler.py | 125 +++++++++++++++++++++++--- o2ims/service/command/notify_alarm_handler.py | 48 ++-------- o2ims/service/command/notify_handler.py | 104 +++++++++++---------- o2ims/service/command/registration_handler.py | 43 ++------- requirements.txt | 1 + tests/integration/test_common.py | 91 +++++++++++++++++++ 9 files changed, 326 insertions(+), 148 deletions(-) create mode 100644 tests/integration/test_common.py diff --git a/o2app/bootstrap.py b/o2app/bootstrap.py index e025a90..7679ef3 100644 --- a/o2app/bootstrap.py +++ b/o2app/bootstrap.py @@ -17,7 +17,7 @@ import inspect from typing import Callable from o2common.adapter.notifications import AbstractNotifications,\ - SmoO2Notifications + NoneNotifications from o2common.adapter import redis_eventpublisher from o2common.service import unit_of_work from o2common.service import messagebus @@ -49,7 +49,7 @@ def bootstrap( ) -> messagebus.MessageBus: if notifications is None: - notifications = SmoO2Notifications() + notifications = NoneNotifications() if start_orm: with uow: diff --git a/o2app/entrypoints/redis_eventconsumer.py b/o2app/entrypoints/redis_eventconsumer.py index 458514f..c95133a 100644 --- a/o2app/entrypoints/redis_eventconsumer.py +++ b/o2app/entrypoints/redis_eventconsumer.py @@ -18,6 +18,7 @@ import redis import json from o2app import bootstrap from o2common.config import config +from o2common.adapter.notifications import SmoNotifications from o2dms.domain import commands from o2ims.domain import commands as imscmd from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage @@ -36,7 +37,10 @@ inventory_api_version = config.get_o2ims_inventory_api_v1() def main(): logger.info("Redis pubsub starting") - bus = bootstrap.bootstrap() + + notifications = SmoNotifications() + bus = bootstrap.bootstrap(notifications=notifications) + pubsub = r.pubsub(ignore_subscribe_messages=True) pubsub.subscribe("NfDeploymentStateChanged") pubsub.subscribe('OcloudChanged') diff --git a/o2common/adapter/notifications.py b/o2common/adapter/notifications.py index e6ba8b2..d8eace2 100644 --- a/o2common/adapter/notifications.py +++ b/o2common/adapter/notifications.py @@ -1,20 +1,66 @@ +# Copyright (C) 2021-2023 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. # pylint: disable=too-few-public-methods import abc -from o2common.config import config +from o2common.config import config, conf +from o2common.service.command.handler import SMOClient + +from o2common.helper import o2logging +logger = o2logging.get_logger(__name__) SMO_O2_ENDPOINT = config.get_smo_o2endpoint() class AbstractNotifications(abc.ABC): @abc.abstractmethod - def send(self, message): + def send(self, url, message): raise NotImplementedError +class NoneNotifications(AbstractNotifications): + def __init__(self): + pass + + def send(self, url, message): + pass + + class SmoO2Notifications(AbstractNotifications): def __init__(self, smoO2Endpoint=SMO_O2_ENDPOINT): self.smoO2Endpoint = smoO2Endpoint - def send(self, message): + def send(self, url, message): pass + + +class SmoNotifications(AbstractNotifications): + def __init__(self): + logger.debug('In SmoNotifications') + if conf.PUBSUB.SMO_AUTH_URL is not None \ + and conf.PUBSUB.SMO_AUTH_URL != '': + logger.debug(f'SMO_AUTH_URL is {conf.PUBSUB.SMO_AUTH_URL}') + self.smo_client = SMOClient( + conf.PUBSUB.SMO_CLIENT_ID, conf.PUBSUB.SMO_AUTH_URL, + conf.PUBSUB.SMO_USERNAME, conf.PUBSUB.SMO_PASSWORD, + use_oauth=True) + else: + self.smo_client = SMOClient() + + def send(self, url, message): + try: + return self.smo_client.post(url, message) + except Exception as e: + logger.critical('Notify except: {}'.format(e)) + return False diff --git a/o2common/service/command/handler.py b/o2common/service/command/handler.py index 83eafef..624781a 100644 --- a/o2common/service/command/handler.py +++ b/o2common/service/command/handler.py @@ -12,28 +12,23 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os +import requests +import json import http.client import ssl +from requests_oauthlib import OAuth2Session +from oauthlib.oauth2 import LegacyApplicationClient +from requests.packages.urllib3.util.retry import Retry +from requests.adapters import HTTPAdapter +from requests.exceptions import RequestException, SSLError + from o2common.helper import o2logging from o2common.config import config logger = o2logging.get_logger(__name__) -def post_data(conn, path, data): - headers = {'Content-type': 'application/json'} - conn.request('POST', path, data, headers) - resp = conn.getresponse() - data = resp.read().decode('utf-8') - # json_data = json.loads(data) - if resp.status >= 200 and resp.status <= 299: - logger.info('Post data to SMO successed, response code {} {}, data {}'. - format(resp.status, resp.reason, data)) - return True, resp.status - logger.error('Response code is: {}'.format(resp.status)) - return False, resp.status - - def get_http_conn(callbackurl): conn = http.client.HTTPConnection(callbackurl) return conn @@ -59,3 +54,105 @@ def get_https_conn_selfsigned(callbackurl): sslctx.verify_mode = ssl.CERT_REQUIRED conn = http.client.HTTPSConnection(callbackurl, context=sslctx) return conn + + +class SMOClient: + def __init__(self, client_id=None, token_url=None, username=None, + password=None, scope=None, retries=3, use_oauth=False): + self.client_id = client_id + self.token_url = token_url + self.username = username + self.password = password + self.scope = scope if scope else [] + self.use_oauth = use_oauth + self.retries = retries + + if self.use_oauth: + if not all([self.client_id, self.token_url, self.username, + self.password]): + raise ValueError( + 'client_id, token_url, username, and password ' + + 'must be provided when use_oauth is True.') + + # Set OAUTHLIB_INSECURE_TRANSPORT environment variable + # if token_url uses http + if 'http://' in self.token_url: + os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1' + + # Create a LegacyApplicationClient for handling password flow + client = LegacyApplicationClient(client_id=self.client_id) + self.session = OAuth2Session(client=client) + + # Check if token_url uses https and set SSL verification + if 'https://' in self.token_url: + ca_path = config.get_smo_ca_config_path() + if os.path.exists(ca_path): + self.session.verify = ca_path + else: + self.session.verify = True + + # Fetch the access token + self.fetch_token(self.session.verify) + else: + self.session = requests.Session() + + # Create a Retry object for handling retries + retry_strategy = Retry( + total=retries, + backoff_factor=1, + status_forcelist=[429, 500, 502, 503, 504], + allowed_methods=["HEAD", "GET", "OPTIONS", "POST"] + ) + adapter = HTTPAdapter(max_retries=retry_strategy) + self.session.mount("https://", adapter) + self.session.mount("http://", adapter) + + def fetch_token(self, verify): + try: + self.session.fetch_token( + token_url=self.token_url, + username=self.username, + password=self.password, + client_id=self.client_id, + verify=verify + ) + except SSLError: + # If SSLError is raised, try again with verify=False + logger.warning('The SSLError occurred') + if verify is not False: + self.fetch_token(verify=False) + + def handle_post_data(self, resp): + if resp.status_code >= 200 and resp.status_code < 300: + return True + logger.error('Response code is: {}'.format(resp.status_code)) + # TODO: write the status to extension db table. + return False + + def post(self, url, data, retries=1): + if not all([url, data]): + raise ValueError( + 'url, data must be provided when call the post.') + + # Check if token_url uses https and set SSL verification + if 'https://' in url: + ca_path = config.get_smo_ca_config_path() + if os.path.exists(ca_path): + self.session.verify = ca_path + else: + self.session.verify = True + + if retries is None: + retries = self.retries + + for _ in range(retries): + try: + response = self.session.post( + url, data=json.dumps(data)) + response.raise_for_status() + return self.handle_post_data(response) + except (SSLError, RequestException) as e: + logger.warning(f'Error occurred: {e}') + pass + raise Exception( + f"POST request to {url} failed after {retries} retries.") diff --git a/o2ims/service/command/notify_alarm_handler.py b/o2ims/service/command/notify_alarm_handler.py index 3d44e64..81cbeaa 100644 --- a/o2ims/service/command/notify_alarm_handler.py +++ b/o2ims/service/command/notify_alarm_handler.py @@ -14,17 +14,12 @@ # import redis # import requests -import ssl import json -from urllib.parse import urlparse from o2common.config import conf from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2common.service.command.handler import get_https_conn_default -from o2common.service.command.handler import get_http_conn -from o2common.service.command.handler import get_https_conn_selfsigned -from o2common.service.command.handler import post_data +from o2common.adapter.notifications import AbstractNotifications from o2ims.domain import commands from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO, \ @@ -37,6 +32,7 @@ logger = o2logging.get_logger(__name__) def notify_alarm_to_smo( cmd: commands.PubAlarm2SMO, uow: AbstractUnitOfWork, + notifications: AbstractNotifications, ): logger.debug('In notify_alarm_to_smo') data = cmd.data @@ -53,7 +49,7 @@ def notify_alarm_to_smo( sub_data['alarmSubscriptionId'])) if not sub_data.get('filter', None): - callback_smo(sub, data, alarm) + callback_smo(notifications, sub, data, alarm) continue try: args = gen_orm_filter(AlarmEventRecord, sub_data['filter']) @@ -63,7 +59,7 @@ def notify_alarm_to_smo( 'name or value. Ignore the filter'.format( sub_data['alarmSubscriptionId'], sub_data['filter'])) - callback_smo(sub, data, alarm) + callback_smo(notifications, sub, data, alarm) continue args.append(AlarmEventRecord.alarmEventRecordId == data.id) ret = uow.alarm_event_records.list_with_count(*args) @@ -73,11 +69,11 @@ def notify_alarm_to_smo( 'the filter.' .format(data.id, sub_data['alarmSubscriptionId'])) continue - callback_smo(sub, data, alarm) + callback_smo(notifications, sub, data, alarm) -def callback_smo(sub: AlarmSubscription, msg: AlarmEvent2SMO, - alarm: AlarmEventRecord): +def callback_smo(notifications: AbstractNotifications, sub: AlarmSubscription, + msg: AlarmEvent2SMO, alarm: AlarmEventRecord): sub_data = sub.serialize() alarm_data = alarm.serialize() callback = { @@ -102,32 +98,4 @@ def callback_smo(sub: AlarmSubscription, msg: AlarmEvent2SMO, logger.info('callback URL: {}'.format(sub_data['callback'])) logger.debug('callback data: {}'.format(callback_data)) - o = urlparse(sub_data['callback']) - if o.scheme == 'https': - conn = get_https_conn_default(o.netloc) - else: - conn = get_http_conn(o.netloc) - try: - rst, status = post_data(conn, o.path, callback_data) - if rst is True: - logger.info( - 'Notify alarm to SMO successed with status: {}'.format(status)) - return - logger.error('Notify alarm Response code is: {}'.format(status)) - except ssl.SSLCertVerificationError as e: - logger.debug( - 'Notify alarm try to post data with trusted ca \ - failed: {}'.format(e)) - if 'self signed' in str(e): - conn = get_https_conn_selfsigned(o.netloc) - try: - return post_data(conn, o.path, callback_data) - except Exception as e: - logger.info( - 'Notify alarm with self-signed ca failed: {}'.format(e)) - # TODO: write the status to extension db table. - return False - return False - except Exception as e: - logger.critical('Notify alarm except: {}'.format(e)) - return False + return notifications.send(sub_data['callback'], callback_data) diff --git a/o2ims/service/command/notify_handler.py b/o2ims/service/command/notify_handler.py index 34470f3..19363f9 100644 --- a/o2ims/service/command/notify_handler.py +++ b/o2ims/service/command/notify_handler.py @@ -15,16 +15,11 @@ # import redis # import requests import json -import ssl -from urllib.parse import urlparse -# from o2common.config import config +# from o2common.config import conf from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2common.service.command.handler import get_https_conn_default -from o2common.service.command.handler import get_http_conn -from o2common.service.command.handler import get_https_conn_selfsigned -from o2common.service.command.handler import post_data +from o2common.adapter.notifications import AbstractNotifications from o2ims.domain import commands, ocloud from o2ims.domain.subscription_obj import Subscription, Message2SMO, \ @@ -41,20 +36,21 @@ logger = o2logging.get_logger(__name__) def notify_change_to_smo( cmd: commands.PubMessage2SMO, uow: AbstractUnitOfWork, + notifications: AbstractNotifications, ): logger.debug('In notify_change_to_smo') msg_type = cmd.type if msg_type == 'ResourceType': - _notify_resourcetype(uow, cmd.data) + _notify_resourcetype(uow, notifications, cmd.data) elif msg_type == 'ResourcePool': - _notify_resourcepool(uow, cmd.data) + _notify_resourcepool(uow, notifications, cmd.data) elif msg_type == 'Dms': - _notify_dms(uow, cmd.data) + _notify_dms(uow, notifications, cmd.data) elif msg_type == 'Resource': - _notify_resource(uow, cmd.data) + _notify_resource(uow, notifications, cmd.data) -def _notify_resourcetype(uow, data): +def _notify_resourcetype(uow, notifications, data): with uow: resource_type = uow.resource_types.get(data.id) if resource_type is None: @@ -76,7 +72,7 @@ def _notify_resourcetype(uow, data): logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo') if not filters: - callback_smo(sub, data, resource_type_dict) + callback_smo(notifications, sub, data, resource_type_dict) continue filter_hit = False for filter in filters: @@ -101,10 +97,10 @@ def _notify_resourcetype(uow, data): logger.info('Subscription {} filter hit, skip ResourceType {}.' .format(sub_data['subscriptionId'], data.id)) else: - callback_smo(sub, data, resource_type_dict) + callback_smo(notifications, sub, data, resource_type_dict) -def _notify_resourcepool(uow, data): +def _notify_resourcepool(uow, notifications, data): with uow: resource_pool = uow.resource_pools.get(data.id) if resource_pool is None: @@ -124,7 +120,7 @@ def _notify_resourcepool(uow, data): logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo') if not filters: - callback_smo(sub, data, resource_pool_dict) + callback_smo(notifications, sub, data, resource_pool_dict) continue filter_hit = False for filter in filters: @@ -149,10 +145,10 @@ def _notify_resourcepool(uow, data): logger.info('Subscription {} filter hit, skip ResourcePool {}.' .format(sub_data['subscriptionId'], data.id)) else: - callback_smo(sub, data, resource_pool_dict) + callback_smo(notifications, sub, data, resource_pool_dict) -def _notify_dms(uow, data): +def _notify_dms(uow, notifications, data): with uow: dms = uow.deployment_managers.get(data.id) if dms is None: @@ -174,7 +170,7 @@ def _notify_dms(uow, data): filters = handle_filter( sub_data['filter'], 'DeploymentManagerInfo') if not filters: - callback_smo(sub, data, dms_dict) + callback_smo(notifications, sub, data, dms_dict) continue filter_hit = False for filter in filters: @@ -201,10 +197,10 @@ def _notify_dms(uow, data): 'DeploymentManager {}.' .format(sub_data['subscriptionId'], data.id)) else: - callback_smo(sub, data, dms_dict) + callback_smo(notifications, sub, data, dms_dict) -def _notify_resource(uow, data): +def _notify_resource(uow, notifications, data): with uow: resource = uow.resources.get(data.id) if resource is None: @@ -226,7 +222,7 @@ def _notify_resource(uow, data): logger.debug('Subscription: {}'.format(sub_data['subscriptionId'])) filters = handle_filter(sub_data['filter'], 'ResourceInfo') if not filters: - callback_smo(sub, data, res_dict) + callback_smo(notifications, sub, data, res_dict) continue filter_hit = False for filter in filters: @@ -252,7 +248,7 @@ def _notify_resource(uow, data): logger.info('Subscription {} filter hit, skip Resource {}.' .format(sub_data['subscriptionId'], data.id)) else: - callback_smo(sub, data, res_dict) + callback_smo(notifications, sub, data, res_dict) def handle_filter(filter: str, f_type: str): @@ -282,7 +278,8 @@ def handle_filter(filter: str, f_type: str): return filters -def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): +def callback_smo(notifications: AbstractNotifications, sub: Subscription, + msg: Message2SMO, obj_dict: dict = None): sub_data = sub.serialize() callback = { 'consumerSubscriptionId': sub_data['consumerSubscriptionId'], @@ -302,33 +299,34 @@ def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None): logger.info('callback URL: {}'.format(sub_data['callback'])) logger.debug('callback data: {}'.format(callback_data)) + return notifications.send(sub_data['callback'], callback_data) + # Call SMO through the SMO callback url - o = urlparse(sub_data['callback']) - if o.scheme == 'https': - conn = get_https_conn_default(o.netloc) - else: - conn = get_http_conn(o.netloc) - try: - rst, status = post_data(conn, o.path, callback_data) - if rst is True: - logger.info( - 'Notify to SMO successed with status: {}'.format(status)) - return - logger.error('Notify Response code is: {}'.format(status)) - except ssl.SSLCertVerificationError as e: - logger.debug( - 'Notify try to post data with trusted ca failed: {}'.format(e)) - if 'self signed' in str(e): - conn = get_https_conn_selfsigned(o.netloc) - try: - return post_data(conn, o.path, callback_data) - except Exception as e: - logger.info( - 'Notify post data with self-signed ca \ - failed: {}'.format(e)) - # TODO: write the status to extension db table. - return False - return False - except Exception as e: - logger.critical('Notify except: {}'.format(e)) - return False + # o = urlparse(sub_data['callback']) + # if o.scheme == 'https': + # conn = get_https_conn_default(o.netloc) + # else: + # conn = get_http_conn(o.netloc) + # try: + # rst, status = post_data(conn, o.path, callback_data) + # if rst is True: + # logger.info( + # 'Notify to SMO successed with status: {}'.format(status)) + # return + # logger.error('Notify Response code is: {}'.format(status)) + # except ssl.SSLCertVerificationError as e: + # logger.debug( + # 'Notify try to post data with trusted ca failed: {}'.format(e)) + # if 'self signed' in str(e): + # conn = get_https_conn_selfsigned(o.netloc) + # try: + # return post_data(conn, o.path, callback_data) + # except Exception as e: + # logger.info( + # 'Notify post data with self-signed ca \ + # failed: {}'.format(e)) + # return False + # return False + # except Exception as e: + # logger.critical('Notify except: {}'.format(e)) + # return False diff --git a/o2ims/service/command/registration_handler.py b/o2ims/service/command/registration_handler.py index 21cbbf8..8531a2d 100644 --- a/o2ims/service/command/registration_handler.py +++ b/o2ims/service/command/registration_handler.py @@ -12,16 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import ssl import json -from retry import retry -from urllib.parse import urlparse from o2common.config import config, conf from o2common.domain.filter import gen_orm_filter from o2common.service.unit_of_work import AbstractUnitOfWork -from o2common.service.command.handler import get_https_conn_default, \ - get_http_conn, get_https_conn_selfsigned, post_data +from o2common.adapter.notifications import AbstractNotifications from o2ims.domain import commands, ocloud as cloud from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum @@ -39,6 +35,7 @@ inventory_api_version = config.get_o2ims_inventory_api_v1() def registry_to_smo( cmd: commands.Register2SMO, uow: AbstractUnitOfWork, + notifications: AbstractNotifications, ): logger.debug('In registry_to_smo') data = cmd.data @@ -60,7 +57,7 @@ def registry_to_smo( 'serviceUri': ocloud.serviceUri } if data.notificationEventType == NotificationEventEnum.CREATE: - register_smo(uow, ocloud_dict) + register_smo(notifications, ocloud_dict) elif data.notificationEventType in [NotificationEventEnum.MODIFY, NotificationEventEnum.DELETE]: _notify_ocloud(uow, data, ocloud_dict) @@ -71,11 +68,11 @@ class RegIMSToSMOExp(Exception): self.value = value -def register_smo(uow, ocloud_data): - call_res, status = call_smo(ocloud_data) +def register_smo(notifications, ocloud_data): + call_res = call_smo(notifications, ocloud_data) logger.debug('Call SMO response is {}'.format(call_res)) if call_res is True: - logger.info('Register to smo success response is {}'.format(status)) + logger.info('Register to smo success response') else: raise RegIMSToSMOExp('Register o2ims to SMO failed') # TODO: record the result for the smo register @@ -121,8 +118,7 @@ def _notify_ocloud(uow, data, ocloud_dict): callback_smo(sub, msg, ocloud_dict) -@retry((ConnectionRefusedError), tries=2, delay=2) -def call_smo(reg_data: dict): +def call_smo(notifications: AbstractNotifications, reg_data: dict): smo_token = conf.DEFAULT.smo_token_data smo_token_info = { 'iss': 'o2ims', @@ -141,27 +137,4 @@ def call_smo(reg_data: dict): }) logger.info('callback URL: {}'.format(conf.DEFAULT.smo_register_url)) logger.debug('callback data: {}'.format(callback_data)) - o = urlparse(conf.DEFAULT.smo_register_url) - if o.scheme == 'https': - conn = get_https_conn_default(o.netloc) - else: - conn = get_http_conn(o.netloc) - - try: - return post_data(conn, o.path, callback_data) - except ssl.SSLCertVerificationError as e: - logger.debug('Try to register to smo with \ - trusted ca failed: {}'.format(e)) - if 'self signed' in str(e): - conn = get_https_conn_selfsigned(o.netloc) - try: - return post_data(conn, o.path, callback_data) - except Exception as e: - logger.info( - 'Register to smo with self-signed ca failed: {}'.format(e)) - # TODO: write the status to extension db table. - return False, None - return False, None - except Exception as e: - logger.critical('Register to smo except: {}'.format(e)) - return False, None + return notifications.send(conf.DEFAULT.smo_register_url, callback_data) diff --git a/requirements.txt b/requirements.txt index 60705b9..f68a4f3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -12,6 +12,7 @@ httplib2 babel PrettyTable<0.8,>=0.7.2 +requests-oauthlib retry ruamel.yaml==0.17.17 diff --git a/tests/integration/test_common.py b/tests/integration/test_common.py new file mode 100644 index 0000000..210b2d1 --- /dev/null +++ b/tests/integration/test_common.py @@ -0,0 +1,91 @@ +# Copyright (C) 2021-2023 Wind River Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +from urllib.parse import urlparse, urlunparse + +from o2common.service.command.handler import SMOClient + + +def test_smo_with_oauth2(): + # Replace these with actual values + client_id = 'client_id' + token_url = 'http://128.224.115.32:1080/mock_smo/v1/auth/token' + username = 'admin' + password = 'admin' + url = 'http://128.224.115.32:1080/mock_smo/v1/ocloud_observer' + data = {"key": "value"} + + client = SMOClient(client_id=client_id, token_url=token_url, + username=username, password=password, + use_oauth=True) + + # Fetch the token + client.fetch_token(client.session.verify) + + # Make a POST request + response = client.post(url=url, data=json.dumps(data)) + + # Check the status code + assert response is True + + # Check the response data if you expect any + # response_data = json.loads(response.text) + # assert response_data == expected_data + + # --------------- HTTPS ---------------- # + parsed_token_url = urlparse(token_url) + parsed_token_url = parsed_token_url._replace(scheme='https') + token_url1 = urlunparse(parsed_token_url) + + parsed_url = urlparse(url) + parsed_url = parsed_url._replace(scheme='https') + url1 = urlunparse(parsed_url) + + client = SMOClient(client_id=client_id, token_url=token_url1, + username=username, password=password, + use_oauth=True) + + # Fetch the token + client.fetch_token(client.session.verify) + + # Make a POST request + response = client.post(url=url1, data=json.dumps(data)) + + # Check the status code + assert response is True + + +def test_smo_client(): + url = 'http://128.224.115.32:1080/mock_smo/v1/o2ims_inventory_observer' + data = {"key": "value"} + + client = SMOClient() + + # Make a POST request + response = client.post(url=url, data=json.dumps(data)) + # Check the status code + assert response is True + + # Check the response data if you expect any + # response_data = json.loads(response.text) + # assert response_data == expected_data + + parsed_url = urlparse(url) + parsed_url = parsed_url._replace(scheme='https') + url1 = urlunparse(parsed_url) + + # Make a POST request + response = client.post(url=url1, data=json.dumps(data)) + # Check the status code + assert response is True -- 2.16.6