--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
--- /dev/null
+# Copyright (C) 2022 Wind River Systems, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import http.client
+import ssl
+from o2common.helper import o2logging
+from o2common.config import config
+
+logger = o2logging.get_logger(__name__)
+
+
+def post_data(conn, path, data):
+ headers = {'Content-type': 'application/json'}
+ conn.request('POST', path, data, headers)
+ resp = conn.getresponse()
+ data = resp.read().decode('utf-8')
+ # json_data = json.loads(data)
+ if resp.status >= 200 and resp.status <= 299:
+ logger.info('Post data to SMO successed, response code {} {}, data {}'.
+ format(resp.status, resp.reason, data))
+ return True, resp.status
+ logger.error('Response code is: {}'.format(resp.status))
+ return False, resp.status
+
+
+def get_http_conn(callbackurl):
+ conn = http.client.HTTPConnection(callbackurl)
+ return conn
+
+
+# with default CA
+def get_https_conn_default(callbackurl):
+ sslctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
+ sslctx.check_hostname = True
+ sslctx.verify_mode = ssl.CERT_REQUIRED
+ sslctx.load_default_certs()
+ conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
+ return conn
+
+
+# with self signed ca
+def get_https_conn_selfsigned(callbackurl):
+ sslctx = ssl.create_default_context(
+ purpose=ssl.Purpose.SERVER_AUTH)
+ smo_ca_path = config.get_smo_ca_config_path()
+ sslctx.load_verify_locations(smo_ca_path)
+ sslctx.check_hostname = False
+ sslctx.verify_mode = ssl.CERT_REQUIRED
+ conn = http.client.HTTPSConnection(callbackurl, context=sslctx)
+ return conn
-# Copyright (C) 2021 Wind River Systems, Inc.
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import json
# import redis
# import requests
-import http.client
from urllib.parse import urlparse
# from o2common.config import config
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain import commands
from o2ims.domain.alarm_obj import AlarmSubscription, AlarmEvent2SMO
-
+import ssl
+from o2common.service.command.handler import get_https_conn_default
+from o2common.service.command.handler import get_http_conn
+from o2common.service.command.handler import get_https_conn_selfsigned
+from o2common.service.command.handler import post_data
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
logger.info('URL: {}, data: {}'.format(
sub_data['callback'], callback_data))
o = urlparse(sub_data['callback'])
- conn = http.client.HTTPConnection(o.netloc)
- headers = {'Content-type': 'application/json'}
- conn.request('POST', o.path, callback_data, headers)
- resp = conn.getresponse()
- data = resp.read().decode('utf-8')
- # json_data = json.loads(data)
- if resp.status == 202 or resp.status == 200:
- logger.info('Notify to SMO successed, response code {} {}, data {}'.
- format(resp.status, resp.reason, data))
- return
- logger.error('Response code is: {}'.format(resp.status))
+ if o.scheme == 'https':
+ conn = get_https_conn_default(o.netloc)
+ else:
+ conn = get_http_conn(o.netloc)
+ try:
+ rst, status = post_data(conn, o.path, callback_data)
+ if rst is True:
+ logger.info(
+ 'Notify alarm to SMO successed with status: {}'.format(status))
+ return
+ logger.error('Notify alarm Response code is: {}'.format(status))
+ except ssl.SSLCertVerificationError as e:
+ logger.info(
+ 'Notify alarm post data with trusted ca failed: {}'.format(e))
+ if 'self signed' in str(e):
+ conn = get_https_conn_selfsigned(o.netloc)
+ try:
+ return post_data(conn, o.path, callback_data)
+ except Exception as e:
+ logger.info(
+ 'Notify alarm with self-signed ca failed: {}'.format(e))
+ # TODO: write the status to extension db table.
+ return False
+ return False
+ except Exception as e:
+ logger.critical('Notify alarm except: {}'.format(e))
+ return False
-# Copyright (C) 2021 Wind River Systems, Inc.
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import json
# import redis
# import requests
-import http.client
from urllib.parse import urlparse
# from o2common.config import config
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2ims.domain import commands
from o2ims.domain.subscription_obj import Subscription, Message2SMO
-
+from o2common.service.command.handler import get_https_conn_default
+from o2common.service.command.handler import get_http_conn
+from o2common.service.command.handler import get_https_conn_selfsigned
+from o2common.service.command.handler import post_data
+import ssl
from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
+
# # Maybe another MQ server
# r = redis.Redis(**config.get_redis_host_and_port())
# except requests.exceptions.HTTPError as err:
# logger.error('request smo error: {}'.format(err))
o = urlparse(sub_data['callback'])
- conn = http.client.HTTPConnection(o.netloc)
- headers = {'Content-type': 'application/json'}
- conn.request('POST', o.path, callback_data, headers)
- resp = conn.getresponse()
- data = resp.read().decode('utf-8')
- # json_data = json.loads(data)
- if resp.status == 202 or resp.status == 200:
- logger.info('Notify to SMO successed, response code {} {}, data {}'.
- format(resp.status, resp.reason, data))
- return
- logger.error('Response code is: {}'.format(resp.status))
+ if o.scheme == 'https':
+ conn = get_https_conn_default(o.netloc)
+ else:
+ conn = get_http_conn(o.netloc)
+ try:
+ rst, status = post_data(conn, o.path, callback_data)
+ if rst is True:
+ logger.info(
+ 'Notify to SMO successed with status: {}'.format(status))
+ return
+ logger.error('Notify Response code is: {}'.format(status))
+ except ssl.SSLCertVerificationError as e:
+ logger.info('Notify post data with trusted ca failed: {}'.format(e))
+ if 'self signed' in str(e):
+ conn = get_https_conn_selfsigned(o.netloc)
+ try:
+ return post_data(conn, o.path, callback_data)
+ except Exception as e:
+ logger.info(
+ 'Notify post data with self-signed ca \
+ failed: {}'.format(e))
+ # TODO: write the status to extension db table.
+ return False
+ return False
+ except Exception as e:
+ logger.critical('Notify except: {}'.format(e))
+ return False
-# Copyright (C) 2021 Wind River Systems, Inc.
+# Copyright (C) 2021-2022 Wind River Systems, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
import json
# import asyncio
# import requests
-import http.client
-import ssl
+
from urllib.parse import urlparse
from retry import retry
from o2common.service.unit_of_work import AbstractUnitOfWork
from o2common.config import config, conf
-
+from o2common.service.command.handler import get_https_conn_default
+from o2common.service.command.handler import get_http_conn
+from o2common.service.command.handler import get_https_conn_selfsigned
+from o2common.service.command.handler import post_data
+import ssl
from o2ims.domain import commands
from o2ims.domain.subscription_obj import NotificationEventEnum
def register_smo(uow, ocloud_data):
- call_res = call_smo(ocloud_data)
+ call_res, status = call_smo(ocloud_data)
logger.debug('Call SMO response is {}'.format(call_res))
- if call_res is not True:
+ if call_res is True:
+ logger.info('Register to smo success response is {}'.format(status))
+ else:
raise RegIMSToSMOExp('Register o2ims to SMO failed')
# TODO: record the result for the smo register
conf.DEFAULT.smo_register_url, callback_data))
o = urlparse(conf.DEFAULT.smo_register_url)
if o.scheme == 'https':
- sslctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
- sslctx.check_hostname = True
- sslctx.verify_mode = ssl.CERT_REQUIRED
- sslctx.load_default_certs()
- conn = http.client.HTTPSConnection(o.netloc, context=sslctx)
+ conn = get_https_conn_default(o.netloc)
else:
- conn = http.client.HTTPConnection(o.netloc)
+ conn = get_http_conn(o.netloc)
try:
return post_data(conn, o.path, callback_data)
except ssl.SSLCertVerificationError as e:
- logger.info('post data except: {}'.format(e))
+ logger.info('Register to smo with trusted ca failed: {}'.format(e))
if 'self signed' in str(e):
- sslctx = ssl.create_default_context(
- purpose=ssl.Purpose.SERVER_AUTH)
- smo_ca_path = config.get_smo_ca_config_path()
- sslctx.load_verify_locations(smo_ca_path)
- sslctx.check_hostname = False
- sslctx.verify_mode = ssl.CERT_REQUIRED
- conn = http.client.HTTPSConnection(o.netloc, context=sslctx)
- return post_data(conn, o.path, callback_data)
+ conn = get_https_conn_selfsigned(o.netloc)
+ try:
+ return post_data(conn, o.path, callback_data)
+ except Exception as e:
+ logger.info(
+ 'Register to smo with self-signed ca failed: {}'.format(e))
+ # TODO: write the status to extension db table.
+ return False, None
+ return False, None
except Exception as e:
- logger.info('except: {}'.format(e))
- return False
-
-
-def post_data(conn, path, data):
- headers = {'Content-type': 'application/json'}
- conn.request('POST', path, data, headers)
- resp = conn.getresponse()
- data = resp.read().decode('utf-8')
- # json_data = json.loads(data)
- if resp.status == 202 or resp.status == 200:
- logger.info('Registrer to SMO successed, response code {} {}, data {}'.
- format(resp.status, resp.reason, data))
- return True
- logger.error('Response code is: {}'.format(resp.status))
- return False
+ logger.critical('Register to smo except: {}'.format(e))
+ return False, None