# See the License for the specific language governing permissions and
# limitations under the License.
+import json
+from flask_restx._http import HTTPStatus
from werkzeug.wrappers import Request, Response
-from o2common.helper import o2logging
+
from o2common.authmw.authprov import auth_definer
+from o2common.authmw.exceptions import AuthRequiredExp
+from o2common.authmw.exceptions import AuthFailureExp
+from o2common.helper import o2logging
logger = o2logging.get_logger(__name__)
-class AuthRequiredExp(Exception):
- def __init__(self, value):
- self.value = value
-
- def dictize(self):
- return {
- 'WWW-Authenticate': '{}'.format(self.value)}
-
-
-class AuthFailureExp(Exception):
- def __init__(self, value):
- self.value = value
-
- def dictize(self):
- return {
- 'WWW-Authenticate': '{}'.format(self.value)}
+class AuthProblemDetails():
+ def __init__(self, code: int, detail: str, path: str,
+ title=None, instance=None
+ ) -> None:
+ self.status = code
+ self.detail = detail
+ self.type = path
+ self.title = title if title is not None else self.getTitle(code)
+ self.instance = instance if instance is not None else []
+
+ def getTitle(self, code):
+ return HTTPStatus(code).phrase
+
+ def serialize(self):
+ details = {}
+ for key in dir(self):
+ if key == 'ns' or key.startswith('__') or \
+ callable(getattr(self, key)):
+ continue
+ else:
+ details[key] = getattr(self, key)
+ return json.dumps(details, indent=True)
-def _response_wrapper(environ, start_response, header):
+def _response_wrapper(environ, start_response, header, detail):
res = Response(headers=header,
- mimetype='text/plain', status=401)
+ mimetype='application/json', status=401, response=detail)
return res(environ, start_response)
-def _internal_err_response_wrapper(environ, start_response):
- res = Response(mimetype='text/plain', status=500)
+def _internal_err_response_wrapper(environ, start_response, detail):
+ res = Response(mimetype='application/json', status=500, response=detail)
return res(environ, start_response)
class authmiddleware():
-
'''
Auth WSGI middleware
'''
self.app = app
def __call__(self, environ, start_response):
- logger.info(__name__ + 'authentication middleware')
+ logger.debug(__name__ + 'authentication middleware')
req = Request(environ, populate_request=True, shallow=True)
+ auth_token = None
try:
- auth_header = req.headers['Authorization']
-
+ auth_header = req.headers.get('Authorization', None)
if auth_header:
auth_token = auth_header.split(" ")[1]
# invoke underlying auth mdw to make k8s/keystone api
ret = ad.authenticate(auth_token)
if ret is True:
- logger.info(
+ logger.debug(
"auth success with oauth token: " + auth_token)
try:
return self.app(environ, start_response)
logger.error(
'Internal exception happend \
ed {}'.format(str(ex)), exc_info=True)
+ prb = AuthProblemDetails(
+ 500, 'Internal error.', req.path)
return \
- _internal_err_response_wrapper(environ,
- start_response)
+ _internal_err_response_wrapper(
+ environ,
+ start_response, prb.serialize())
else:
raise AuthFailureExp(
'Bearer realm="Authentication Failed"')
else:
raise AuthRequiredExp('Bearer realm="Authentication Required"')
except AuthRequiredExp as ex:
- return _response_wrapper(environ, start_response, ex.dictize())
+ prb = AuthProblemDetails(401, ex.value, req.path)
+ return _response_wrapper(environ, start_response,
+ ex.dictize(), prb.serialize())
except AuthFailureExp as ex:
- return _response_wrapper(environ, start_response, ex.dictize())
+ prb = AuthProblemDetails(401, ex.value, req.path)
+ return _response_wrapper(environ, start_response,
+ ex.dictize(), prb.serialize())
except Exception as ex:
- logger.error('Internal exception happended {}'.format(
- str(ex)), exc_info=True)
- return _internal_err_response_wrapper(environ, start_response)
+ if auth_token:
+ logger.error('Internal exception happended {}'.format(
+ str(ex)), exc_info=True)
+ prb = AuthProblemDetails(500, 'Internal error.', req.path)
+ return \
+ _internal_err_response_wrapper(
+ environ, start_response, prb.serialize())
+ else:
+ logger.debug('Auth token missing or not obtained.')
+ ex = AuthRequiredExp('Bearer realm="Authentication Required"')
+ prb = AuthProblemDetails(401, ex.value, req.path)
+ return _response_wrapper(environ, start_response,
+ ex.dictize(), prb.serialize())