2 * ========================LICENSE_START=================================
4 * Copyright (C) 2023 Nordix Foundation
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ========================LICENSE_END===================================
20 package org.oran.pmproducer.oauth2;
22 import java.io.IOException;
25 import javax.security.auth.callback.Callback;
26 import javax.security.auth.callback.UnsupportedCallbackException;
27 import javax.security.auth.login.AppConfigurationEntry;
29 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
30 import org.apache.kafka.common.security.auth.SaslExtensions;
31 import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
32 import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
33 import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
34 import org.oran.pmproducer.exceptions.ServiceException;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
38 public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
39 private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
41 private boolean isConfigured = false;
44 public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
46 if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
47 throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
48 if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
49 throw new IllegalArgumentException(
50 String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
51 jaasConfigEntries.size()));
57 /* This method is intentionally left empty.
58 Close functionality will be implemented later. */
62 public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
64 if (!this.isConfigured)
65 throw new IllegalStateException("Callback handler not configured");
66 for (Callback callback : callbacks) {
67 logger.debug("callback {}", callback);
68 if (callback instanceof OAuthBearerTokenCallback oAuthBearerTokenCallback) {
69 handleCallback(oAuthBearerTokenCallback);
70 } else if (callback instanceof SaslExtensionsCallback saslExtensionsCallback) {
71 handleCallback(saslExtensionsCallback);
73 logger.error("Unsupported callback: {}", callback);
74 throw new UnsupportedCallbackException(callback);
79 private void handleCallback(SaslExtensionsCallback callback) {
80 callback.extensions(SaslExtensions.empty());
83 private void handleCallback(OAuthBearerTokenCallback callback) {
85 if (callback.token() != null) {
86 throw new ServiceException("Callback had a token already", null);
89 String accessToken = SecurityContext.getInstance().getBearerAuthToken();
90 OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
92 callback.token(token);
93 } catch (Exception e) {
94 logger.error("Could not handle login callback: {}", e.getMessage());
98 public boolean isConfigured() {