import os
import requests
-import json
import http.client
import ssl
from requests_oauthlib import OAuth2Session
for _ in range(retries):
try:
- response = self.session.post(
- url, data=json.dumps(data))
+ response = self.session.post(url, json=data)
response.raise_for_status()
return self.handle_post_data(response)
except (SSLError, RequestException) as e:
'perceivedSeverity': alarm_data['perceivedSeverity'],
'extensions': json.loads(alarm_data['extensions'])
}
- # logger.warning(callback)
- callback_data = json.dumps(callback)
logger.info('callback URL: {}'.format(sub_data['callback']))
- logger.debug('callback data: {}'.format(callback_data))
+ logger.debug('callback data: {}'.format(json.dumps(callback)))
- return notifications.send(sub_data['callback'], callback_data)
+ return notifications.send(sub_data['callback'], callback)
callback['postObjectState'] = json.dumps(obj_dict)
if msg.notificationEventType == NotificationEventEnum.DELETE:
callback.pop('objectRef')
- callback_data = json.dumps(callback)
logger.info('callback URL: {}'.format(sub_data['callback']))
- logger.debug('callback data: {}'.format(callback_data))
+ logger.debug('callback data: {}'.format(json.dumps(callback)))
- return notifications.send(sub_data['callback'], callback_data)
+ return notifications.send(sub_data['callback'], callback)
# Call SMO through the SMO callback url
# o = urlparse(sub_data['callback'])
'smo_token_algo': 'RS256'
}
- callback_data = json.dumps({
+ callback = {
'globalCloudId': reg_data['globalCloudId'],
'oCloudId': reg_data['oCloudId'],
'IMS_EP': config.get_api_url(),
'smo_token_data': smo_token_info
- })
+ }
logger.info('callback URL: {}'.format(conf.DEFAULT.smo_register_url))
- logger.debug('callback data: {}'.format(callback_data))
- return notifications.send(conf.DEFAULT.smo_register_url, callback_data)
+ logger.debug('callback data: {}'.format(json.dumps(callback)))
+ return notifications.send(conf.DEFAULT.smo_register_url, callback)