Add oAuth2 for subscription and registration with SMO 87/11187/1
authorZhang Rong(Jon) <rong.zhang@windriver.com>
Mon, 22 May 2023 14:30:09 +0000 (22:30 +0800)
committerZhang Rong(Jon) <rong.zhang@windriver.com>
Mon, 22 May 2023 14:32:49 +0000 (22:32 +0800)
Issue-ID: INF-412
Change-Id: I3190b40b919d3f968f7a57b4b833c1f783d2a93c
Signed-off-by: Zhang Rong(Jon) <rong.zhang@windriver.com>
o2app/bootstrap.py
o2app/entrypoints/redis_eventconsumer.py
o2common/adapter/notifications.py
o2common/service/command/handler.py
o2ims/service/command/notify_alarm_handler.py
o2ims/service/command/notify_handler.py
o2ims/service/command/registration_handler.py
requirements.txt
tests/integration/test_common.py [new file with mode: 0644]

index e025a90..7679ef3 100644 (file)
@@ -17,7 +17,7 @@ import inspect
 from typing import Callable
 
 from o2common.adapter.notifications import AbstractNotifications,\
-    SmoO2Notifications
+    NoneNotifications
 from o2common.adapter import redis_eventpublisher
 from o2common.service import unit_of_work
 from o2common.service import messagebus
@@ -49,7 +49,7 @@ def bootstrap(
 ) -> messagebus.MessageBus:
 
     if notifications is None:
-        notifications = SmoO2Notifications()
+        notifications = NoneNotifications()
 
     if start_orm:
         with uow:
index 458514f..c95133a 100644 (file)
@@ -18,6 +18,7 @@ import redis
 import json
 from o2app import bootstrap
 from o2common.config import config
+from o2common.adapter.notifications import SmoNotifications
 from o2dms.domain import commands
 from o2ims.domain import commands as imscmd
 from o2ims.domain.subscription_obj import Message2SMO, RegistrationMessage
@@ -36,7 +37,10 @@ inventory_api_version = config.get_o2ims_inventory_api_v1()
 
 def main():
     logger.info("Redis pubsub starting")
-    bus = bootstrap.bootstrap()
+
+    notifications = SmoNotifications()
+    bus = bootstrap.bootstrap(notifications=notifications)
+
     pubsub = r.pubsub(ignore_subscribe_messages=True)
     pubsub.subscribe("NfDeploymentStateChanged")
     pubsub.subscribe('OcloudChanged')
index e6ba8b2..d8eace2 100644 (file)
@@ -1,20 +1,66 @@
+# Copyright (C) 2021-2023 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
 # pylint: disable=too-few-public-methods
 import abc
-from o2common.config import config
 
+from o2common.config import config, conf
+from o2common.service.command.handler import SMOClient
+
+from o2common.helper import o2logging
+logger = o2logging.get_logger(__name__)
 
 SMO_O2_ENDPOINT = config.get_smo_o2endpoint()
 
 
 class AbstractNotifications(abc.ABC):
     @abc.abstractmethod
-    def send(self, message):
+    def send(self, url, message):
         raise NotImplementedError
 
 
+class NoneNotifications(AbstractNotifications):
+    def __init__(self):
+        pass
+
+    def send(self, url, message):
+        pass
+
+
 class SmoO2Notifications(AbstractNotifications):
     def __init__(self, smoO2Endpoint=SMO_O2_ENDPOINT):
         self.smoO2Endpoint = smoO2Endpoint
 
-    def send(self, message):
+    def send(self, url, message):
         pass
+
+
+class SmoNotifications(AbstractNotifications):
+    def __init__(self):
+        logger.debug('In SmoNotifications')
+        if conf.PUBSUB.SMO_AUTH_URL is not None \
+                and conf.PUBSUB.SMO_AUTH_URL != '':
+            logger.debug(f'SMO_AUTH_URL is {conf.PUBSUB.SMO_AUTH_URL}')
+            self.smo_client = SMOClient(
+                conf.PUBSUB.SMO_CLIENT_ID, conf.PUBSUB.SMO_AUTH_URL,
+                conf.PUBSUB.SMO_USERNAME, conf.PUBSUB.SMO_PASSWORD,
+                use_oauth=True)
+        else:
+            self.smo_client = SMOClient()
+
+    def send(self, url, message):
+        try:
+            return self.smo_client.post(url, message)
+        except Exception as e:
+            logger.critical('Notify except: {}'.format(e))
+            return False
index 83eafef..624781a 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
+import os
+import requests
+import json
 import http.client
 import ssl
+from requests_oauthlib import OAuth2Session
+from oauthlib.oauth2 import LegacyApplicationClient
+from requests.packages.urllib3.util.retry import Retry
+from requests.adapters import HTTPAdapter
+from requests.exceptions import RequestException, SSLError
+
 from o2common.helper import o2logging
 from o2common.config import config
 
 logger = o2logging.get_logger(__name__)
 
 
-def post_data(conn, path, data):
-    headers = {'Content-type': 'application/json'}
-    conn.request('POST', path, data, headers)
-    resp = conn.getresponse()
-    data = resp.read().decode('utf-8')
-    # json_data = json.loads(data)
-    if resp.status >= 200 and resp.status <= 299:
-        logger.info('Post data to SMO successed, response code {} {}, data {}'.
-                    format(resp.status, resp.reason, data))
-        return True, resp.status
-    logger.error('Response code is: {}'.format(resp.status))
-    return False, resp.status
-
-
 def get_http_conn(callbackurl):
     conn = http.client.HTTPConnection(callbackurl)
     return conn
@@ -59,3 +54,105 @@ def get_https_conn_selfsigned(callbackurl):
     sslctx.verify_mode = ssl.CERT_REQUIRED
     conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
     return conn
+
+
+class SMOClient:
+    def __init__(self, client_id=None, token_url=None, username=None,
+                 password=None, scope=None, retries=3, use_oauth=False):
+        self.client_id = client_id
+        self.token_url = token_url
+        self.username = username
+        self.password = password
+        self.scope = scope if scope else []
+        self.use_oauth = use_oauth
+        self.retries = retries
+
+        if self.use_oauth:
+            if not all([self.client_id, self.token_url, self.username,
+                        self.password]):
+                raise ValueError(
+                    'client_id, token_url, username, and password ' +
+                    'must be provided when use_oauth is True.')
+
+            # Set OAUTHLIB_INSECURE_TRANSPORT environment variable
+            # if token_url uses http
+            if 'http://' in self.token_url:
+                os.environ['OAUTHLIB_INSECURE_TRANSPORT'] = '1'
+
+            # Create a LegacyApplicationClient for handling password flow
+            client = LegacyApplicationClient(client_id=self.client_id)
+            self.session = OAuth2Session(client=client)
+
+            # Check if token_url uses https and set SSL verification
+            if 'https://' in self.token_url:
+                ca_path = config.get_smo_ca_config_path()
+                if os.path.exists(ca_path):
+                    self.session.verify = ca_path
+                else:
+                    self.session.verify = True
+
+            # Fetch the access token
+            self.fetch_token(self.session.verify)
+        else:
+            self.session = requests.Session()
+
+        # Create a Retry object for handling retries
+        retry_strategy = Retry(
+            total=retries,
+            backoff_factor=1,
+            status_forcelist=[429, 500, 502, 503, 504],
+            allowed_methods=["HEAD", "GET", "OPTIONS", "POST"]
+        )
+        adapter = HTTPAdapter(max_retries=retry_strategy)
+        self.session.mount("https://", adapter)
+        self.session.mount("http://", adapter)
+
+    def fetch_token(self, verify):
+        try:
+            self.session.fetch_token(
+                token_url=self.token_url,
+                username=self.username,
+                password=self.password,
+                client_id=self.client_id,
+                verify=verify
+            )
+        except SSLError:
+            # If SSLError is raised, try again with verify=False
+            logger.warning('The SSLError occurred')
+            if verify is not False:
+                self.fetch_token(verify=False)
+
+    def handle_post_data(self, resp):
+        if resp.status_code >= 200 and resp.status_code < 300:
+            return True
+        logger.error('Response code is: {}'.format(resp.status_code))
+        # TODO: write the status to extension db table.
+        return False
+
+    def post(self, url, data, retries=1):
+        if not all([url, data]):
+            raise ValueError(
+                'url, data must be provided when call the post.')
+
+        # Check if token_url uses https and set SSL verification
+        if 'https://' in url:
+            ca_path = config.get_smo_ca_config_path()
+            if os.path.exists(ca_path):
+                self.session.verify = ca_path
+            else:
+                self.session.verify = True
+
+        if retries is None:
+            retries = self.retries
+
+        for _ in range(retries):
+            try:
+                response = self.session.post(
+                    url, data=json.dumps(data))
+                response.raise_for_status()
+                return self.handle_post_data(response)
+            except (SSLError, RequestException) as e:
+                logger.warning(f'Error occurred: {e}')
+                pass
+        raise Exception(
+            f"POST request to {url} failed after {retries} retries.")
index 3d44e64..81cbeaa 100644 (file)
 
 # import redis
 # import requests
-import ssl
 import json
-from urllib.parse import urlparse
 
 from o2common.config import conf
 from o2common.domain.filter import gen_orm_filter
 from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default
-from o2common.service.command.handler import get_http_conn
-from o2common.service.command.handler import get_https_conn_selfsigned
-from o2common.service.command.handler import post_data
+from o2common.adapter.notifications import AbstractNotifications
 
 from o2ims.domain import commands
 from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO, \
@@ -37,6 +32,7 @@ logger = o2logging.get_logger(__name__)
 def notify_alarm_to_smo(
     cmd: commands.PubAlarm2SMO,
     uow: AbstractUnitOfWork,
+    notifications: AbstractNotifications,
 ):
     logger.debug('In notify_alarm_to_smo')
     data = cmd.data
@@ -53,7 +49,7 @@ def notify_alarm_to_smo(
                 sub_data['alarmSubscriptionId']))
 
             if not sub_data.get('filter', None):
-                callback_smo(sub, data, alarm)
+                callback_smo(notifications, sub, data, alarm)
                 continue
             try:
                 args = gen_orm_filter(AlarmEventRecord, sub_data['filter'])
@@ -63,7 +59,7 @@ def notify_alarm_to_smo(
                     'name or value. Ignore the filter'.format(
                         sub_data['alarmSubscriptionId'],
                         sub_data['filter']))
-                callback_smo(sub, data, alarm)
+                callback_smo(notifications, sub, data, alarm)
                 continue
             args.append(AlarmEventRecord.alarmEventRecordId == data.id)
             ret = uow.alarm_event_records.list_with_count(*args)
@@ -73,11 +69,11 @@ def notify_alarm_to_smo(
                     'the filter.'
                     .format(data.id, sub_data['alarmSubscriptionId']))
                 continue
-            callback_smo(sub, data, alarm)
+            callback_smo(notifications, sub, data, alarm)
 
 
-def callback_smo(sub: AlarmSubscription, msg: AlarmEvent2SMO,
-                 alarm: AlarmEventRecord):
+def callback_smo(notifications: AbstractNotifications, sub: AlarmSubscription,
+                 msg: AlarmEvent2SMO, alarm: AlarmEventRecord):
     sub_data = sub.serialize()
     alarm_data = alarm.serialize()
     callback = {
@@ -102,32 +98,4 @@ def callback_smo(sub: AlarmSubscription, msg: AlarmEvent2SMO,
     logger.info('callback URL: {}'.format(sub_data['callback']))
     logger.debug('callback data: {}'.format(callback_data))
 
-    o = urlparse(sub_data['callback'])
-    if o.scheme == 'https':
-        conn = get_https_conn_default(o.netloc)
-    else:
-        conn = get_http_conn(o.netloc)
-    try:
-        rst, status = post_data(conn, o.path, callback_data)
-        if rst is True:
-            logger.info(
-                'Notify alarm to SMO successed with status: {}'.format(status))
-            return
-        logger.error('Notify alarm Response code is: {}'.format(status))
-    except ssl.SSLCertVerificationError as e:
-        logger.debug(
-            'Notify alarm try to post data with trusted ca \
-                failed: {}'.format(e))
-        if 'self signed' in str(e):
-            conn = get_https_conn_selfsigned(o.netloc)
-            try:
-                return post_data(conn, o.path, callback_data)
-            except Exception as e:
-                logger.info(
-                    'Notify alarm with self-signed ca failed: {}'.format(e))
-                # TODO: write the status to extension db table.
-                return False
-        return False
-    except Exception as e:
-        logger.critical('Notify alarm except: {}'.format(e))
-        return False
+    return notifications.send(sub_data['callback'], callback_data)
index 34470f3..19363f9 100644 (file)
 # import redis
 # import requests
 import json
-import ssl
-from urllib.parse import urlparse
 
-# from o2common.config import config
+# from o2common.config import conf
 from o2common.domain.filter import gen_orm_filter
 from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default
-from o2common.service.command.handler import get_http_conn
-from o2common.service.command.handler import get_https_conn_selfsigned
-from o2common.service.command.handler import post_data
+from o2common.adapter.notifications import AbstractNotifications
 
 from o2ims.domain import commands, ocloud
 from o2ims.domain.subscription_obj import Subscription, Message2SMO, \
@@ -41,20 +36,21 @@ logger = o2logging.get_logger(__name__)
 def notify_change_to_smo(
     cmd: commands.PubMessage2SMO,
     uow: AbstractUnitOfWork,
+    notifications: AbstractNotifications,
 ):
     logger.debug('In notify_change_to_smo')
     msg_type = cmd.type
     if msg_type == 'ResourceType':
-        _notify_resourcetype(uow, cmd.data)
+        _notify_resourcetype(uow, notifications, cmd.data)
     elif msg_type == 'ResourcePool':
-        _notify_resourcepool(uow, cmd.data)
+        _notify_resourcepool(uow, notifications, cmd.data)
     elif msg_type == 'Dms':
-        _notify_dms(uow, cmd.data)
+        _notify_dms(uow, notifications, cmd.data)
     elif msg_type == 'Resource':
-        _notify_resource(uow, cmd.data)
+        _notify_resource(uow, notifications, cmd.data)
 
 
-def _notify_resourcetype(uow, data):
+def _notify_resourcetype(uow, notifications, data):
     with uow:
         resource_type = uow.resource_types.get(data.id)
         if resource_type is None:
@@ -76,7 +72,7 @@ def _notify_resourcetype(uow, data):
             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
             filters = handle_filter(sub_data['filter'], 'ResourceTypeInfo')
             if not filters:
-                callback_smo(sub, data, resource_type_dict)
+                callback_smo(notifications, sub, data, resource_type_dict)
                 continue
             filter_hit = False
             for filter in filters:
@@ -101,10 +97,10 @@ def _notify_resourcetype(uow, data):
                 logger.info('Subscription {} filter hit, skip ResourceType {}.'
                             .format(sub_data['subscriptionId'], data.id))
             else:
-                callback_smo(sub, data, resource_type_dict)
+                callback_smo(notifications, sub, data, resource_type_dict)
 
 
-def _notify_resourcepool(uow, data):
+def _notify_resourcepool(uow, notifications, data):
     with uow:
         resource_pool = uow.resource_pools.get(data.id)
         if resource_pool is None:
@@ -124,7 +120,7 @@ def _notify_resourcepool(uow, data):
             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
             filters = handle_filter(sub_data['filter'], 'ResourcePoolInfo')
             if not filters:
-                callback_smo(sub, data, resource_pool_dict)
+                callback_smo(notifications, sub, data, resource_pool_dict)
                 continue
             filter_hit = False
             for filter in filters:
@@ -149,10 +145,10 @@ def _notify_resourcepool(uow, data):
                 logger.info('Subscription {} filter hit, skip ResourcePool {}.'
                             .format(sub_data['subscriptionId'], data.id))
             else:
-                callback_smo(sub, data, resource_pool_dict)
+                callback_smo(notifications, sub, data, resource_pool_dict)
 
 
-def _notify_dms(uow, data):
+def _notify_dms(uow, notifications, data):
     with uow:
         dms = uow.deployment_managers.get(data.id)
         if dms is None:
@@ -174,7 +170,7 @@ def _notify_dms(uow, data):
             filters = handle_filter(
                 sub_data['filter'], 'DeploymentManagerInfo')
             if not filters:
-                callback_smo(sub, data, dms_dict)
+                callback_smo(notifications, sub, data, dms_dict)
                 continue
             filter_hit = False
             for filter in filters:
@@ -201,10 +197,10 @@ def _notify_dms(uow, data):
                             'DeploymentManager {}.'
                             .format(sub_data['subscriptionId'], data.id))
             else:
-                callback_smo(sub, data, dms_dict)
+                callback_smo(notifications, sub, data, dms_dict)
 
 
-def _notify_resource(uow, data):
+def _notify_resource(uow, notifications, data):
     with uow:
         resource = uow.resources.get(data.id)
         if resource is None:
@@ -226,7 +222,7 @@ def _notify_resource(uow, data):
             logger.debug('Subscription: {}'.format(sub_data['subscriptionId']))
             filters = handle_filter(sub_data['filter'], 'ResourceInfo')
             if not filters:
-                callback_smo(sub, data, res_dict)
+                callback_smo(notifications, sub, data, res_dict)
                 continue
             filter_hit = False
             for filter in filters:
@@ -252,7 +248,7 @@ def _notify_resource(uow, data):
                 logger.info('Subscription {} filter hit, skip Resource {}.'
                             .format(sub_data['subscriptionId'], data.id))
             else:
-                callback_smo(sub, data, res_dict)
+                callback_smo(notifications, sub, data, res_dict)
 
 
 def handle_filter(filter: str, f_type: str):
@@ -282,7 +278,8 @@ def handle_filter(filter: str, f_type: str):
     return filters
 
 
-def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
+def callback_smo(notifications: AbstractNotifications, sub: Subscription,
+                 msg: Message2SMO, obj_dict: dict = None):
     sub_data = sub.serialize()
     callback = {
         'consumerSubscriptionId': sub_data['consumerSubscriptionId'],
@@ -302,33 +299,34 @@ def callback_smo(sub: Subscription, msg: Message2SMO, obj_dict: dict = None):
     logger.info('callback URL: {}'.format(sub_data['callback']))
     logger.debug('callback data: {}'.format(callback_data))
 
+    return notifications.send(sub_data['callback'], callback_data)
+
     # Call SMO through the SMO callback url
-    o = urlparse(sub_data['callback'])
-    if o.scheme == 'https':
-        conn = get_https_conn_default(o.netloc)
-    else:
-        conn = get_http_conn(o.netloc)
-    try:
-        rst, status = post_data(conn, o.path, callback_data)
-        if rst is True:
-            logger.info(
-                'Notify to SMO successed with status: {}'.format(status))
-            return
-        logger.error('Notify Response code is: {}'.format(status))
-    except ssl.SSLCertVerificationError as e:
-        logger.debug(
-            'Notify try to post data with trusted ca failed: {}'.format(e))
-        if 'self signed' in str(e):
-            conn = get_https_conn_selfsigned(o.netloc)
-            try:
-                return post_data(conn, o.path, callback_data)
-            except Exception as e:
-                logger.info(
-                    'Notify post data with self-signed ca \
-                    failed: {}'.format(e))
-                # TODO: write the status to extension db table.
-                return False
-        return False
-    except Exception as e:
-        logger.critical('Notify except: {}'.format(e))
-        return False
+    # o = urlparse(sub_data['callback'])
+    # if o.scheme == 'https':
+    #     conn = get_https_conn_default(o.netloc)
+    # else:
+    #     conn = get_http_conn(o.netloc)
+    # try:
+    #     rst, status = post_data(conn, o.path, callback_data)
+    #     if rst is True:
+    #         logger.info(
+    #             'Notify to SMO successed with status: {}'.format(status))
+    #         return
+    #     logger.error('Notify Response code is: {}'.format(status))
+    # except ssl.SSLCertVerificationError as e:
+    #     logger.debug(
+    #         'Notify try to post data with trusted ca failed: {}'.format(e))
+    #     if 'self signed' in str(e):
+    #         conn = get_https_conn_selfsigned(o.netloc)
+    #         try:
+    #             return post_data(conn, o.path, callback_data)
+    #         except Exception as e:
+    #             logger.info(
+    #                 'Notify post data with self-signed ca \
+    #                 failed: {}'.format(e))
+    #             return False
+    #     return False
+    # except Exception as e:
+    #     logger.critical('Notify except: {}'.format(e))
+    #     return False
index 21cbbf8..8531a2d 100644 (file)
 #  See the License for the specific language governing permissions and
 #  limitations under the License.
 
-import ssl
 import json
-from retry import retry
-from urllib.parse import urlparse
 
 from o2common.config import config, conf
 from o2common.domain.filter import gen_orm_filter
 from o2common.service.unit_of_work import AbstractUnitOfWork
-from o2common.service.command.handler import get_https_conn_default, \
-    get_http_conn, get_https_conn_selfsigned, post_data
+from o2common.adapter.notifications import AbstractNotifications
 
 from o2ims.domain import commands, ocloud as cloud
 from o2ims.domain.subscription_obj import Message2SMO, NotificationEventEnum
@@ -39,6 +35,7 @@ inventory_api_version = config.get_o2ims_inventory_api_v1()
 def registry_to_smo(
     cmd: commands.Register2SMO,
     uow: AbstractUnitOfWork,
+    notifications: AbstractNotifications,
 ):
     logger.debug('In registry_to_smo')
     data = cmd.data
@@ -60,7 +57,7 @@ def registry_to_smo(
             'serviceUri': ocloud.serviceUri
         }
         if data.notificationEventType == NotificationEventEnum.CREATE:
-            register_smo(uow, ocloud_dict)
+            register_smo(notifications, ocloud_dict)
         elif data.notificationEventType in [NotificationEventEnum.MODIFY,
                                             NotificationEventEnum.DELETE]:
             _notify_ocloud(uow, data, ocloud_dict)
@@ -71,11 +68,11 @@ class RegIMSToSMOExp(Exception):
         self.value = value
 
 
-def register_smo(uow, ocloud_data):
-    call_res, status = call_smo(ocloud_data)
+def register_smo(notifications, ocloud_data):
+    call_res = call_smo(notifications, ocloud_data)
     logger.debug('Call SMO response is {}'.format(call_res))
     if call_res is True:
-        logger.info('Register to smo success response is {}'.format(status))
+        logger.info('Register to smo success response')
     else:
         raise RegIMSToSMOExp('Register o2ims to SMO failed')
     # TODO: record the result for the smo register
@@ -121,8 +118,7 @@ def _notify_ocloud(uow, data, ocloud_dict):
             callback_smo(sub, msg, ocloud_dict)
 
 
-@retry((ConnectionRefusedError), tries=2, delay=2)
-def call_smo(reg_data: dict):
+def call_smo(notifications: AbstractNotifications, reg_data: dict):
     smo_token = conf.DEFAULT.smo_token_data
     smo_token_info = {
         'iss': 'o2ims',
@@ -141,27 +137,4 @@ def call_smo(reg_data: dict):
     })
     logger.info('callback URL: {}'.format(conf.DEFAULT.smo_register_url))
     logger.debug('callback data: {}'.format(callback_data))
-    o = urlparse(conf.DEFAULT.smo_register_url)
-    if o.scheme == 'https':
-        conn = get_https_conn_default(o.netloc)
-    else:
-        conn = get_http_conn(o.netloc)
-
-    try:
-        return post_data(conn, o.path, callback_data)
-    except ssl.SSLCertVerificationError as e:
-        logger.debug('Try to register to smo with \
-        trusted ca failed: {}'.format(e))
-        if 'self signed' in str(e):
-            conn = get_https_conn_selfsigned(o.netloc)
-            try:
-                return post_data(conn, o.path, callback_data)
-            except Exception as e:
-                logger.info(
-                    'Register to smo with self-signed ca failed: {}'.format(e))
-                # TODO: write the status to extension db table.
-                return False, None
-        return False, None
-    except Exception as e:
-        logger.critical('Register to smo except: {}'.format(e))
-        return False, None
+    return notifications.send(conf.DEFAULT.smo_register_url, callback_data)
index 60705b9..f68a4f3 100644 (file)
@@ -12,6 +12,7 @@ httplib2
 babel
 PrettyTable<0.8,>=0.7.2
 
+requests-oauthlib
 retry
 ruamel.yaml==0.17.17
 
diff --git a/tests/integration/test_common.py b/tests/integration/test_common.py
new file mode 100644 (file)
index 0000000..210b2d1
--- /dev/null
@@ -0,0 +1,91 @@
+# Copyright (C) 2021-2023 Wind River Systems, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+import json
+from urllib.parse import urlparse, urlunparse
+
+from o2common.service.command.handler import SMOClient
+
+
+def test_smo_with_oauth2():
+    # Replace these with actual values
+    client_id = 'client_id'
+    token_url = 'http://128.224.115.32:1080/mock_smo/v1/auth/token'
+    username = 'admin'
+    password = 'admin'
+    url = 'http://128.224.115.32:1080/mock_smo/v1/ocloud_observer'
+    data = {"key": "value"}
+
+    client = SMOClient(client_id=client_id, token_url=token_url,
+                       username=username, password=password,
+                       use_oauth=True)
+
+    # Fetch the token
+    client.fetch_token(client.session.verify)
+
+    # Make a POST request
+    response = client.post(url=url, data=json.dumps(data))
+
+    # Check the status code
+    assert response is True
+
+    # Check the response data if you expect any
+    # response_data = json.loads(response.text)
+    # assert response_data == expected_data
+
+    # --------------- HTTPS ---------------- #
+    parsed_token_url = urlparse(token_url)
+    parsed_token_url = parsed_token_url._replace(scheme='https')
+    token_url1 = urlunparse(parsed_token_url)
+
+    parsed_url = urlparse(url)
+    parsed_url = parsed_url._replace(scheme='https')
+    url1 = urlunparse(parsed_url)
+
+    client = SMOClient(client_id=client_id, token_url=token_url1,
+                       username=username, password=password,
+                       use_oauth=True)
+
+    # Fetch the token
+    client.fetch_token(client.session.verify)
+
+    # Make a POST request
+    response = client.post(url=url1, data=json.dumps(data))
+
+    # Check the status code
+    assert response is True
+
+
+def test_smo_client():
+    url = 'http://128.224.115.32:1080/mock_smo/v1/o2ims_inventory_observer'
+    data = {"key": "value"}
+
+    client = SMOClient()
+
+    # Make a POST request
+    response = client.post(url=url, data=json.dumps(data))
+    # Check the status code
+    assert response is True
+
+    # Check the response data if you expect any
+    # response_data = json.loads(response.text)
+    # assert response_data == expected_data
+
+    parsed_url = urlparse(url)
+    parsed_url = parsed_url._replace(scheme='https')
+    url1 = urlunparse(parsed_url)
+
+    # Make a POST request
+    response = client.post(url=url1, data=json.dumps(data))
+    # Check the status code
+    assert response is True