Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / datafilecollector / src / main / java / org / oran / datafile / oauth2 / OAuthKafkaAuthenticateLoginCallbackHandler.java
diff --git a/datafilecollector/src/main/java/org/oran/datafile/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java b/datafilecollector/src/main/java/org/oran/datafile/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java
new file mode 100644 (file)
index 0000000..54911dc
--- /dev/null
@@ -0,0 +1,93 @@
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package org.oran.datafile.oauth2;
+
+import java.io.IOException;
+import java.util.*;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.oran.datafile.exceptions.DatafileTaskException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
+    private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
+
+    private boolean isConfigured = false;
+
+    @Override
+    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(String.format(
+                "Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", jaasConfigEntries.size()));
+        isConfigured = true;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+
+        if (!this.isConfigured)
+            throw new IllegalStateException("Callback handler not configured");
+        for (Callback callback : callbacks) {
+            logger.debug("callback " + callback.toString());
+            if (callback instanceof OAuthBearerTokenCallback) {
+                handleCallback((OAuthBearerTokenCallback) callback);
+            } else if (callback instanceof SaslExtensionsCallback) {
+                handleCallback((SaslExtensionsCallback) callback);
+            } else {
+                logger.error("Unsupported callback: {}", callback);
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    private void handleCallback(SaslExtensionsCallback callback) {
+        callback.extensions(SaslExtensions.empty());
+    }
+
+    private void handleCallback(OAuthBearerTokenCallback callback) {
+        try {
+            if (callback.token() != null) {
+                throw new DatafileTaskException("Callback had a token already");
+            }
+
+            String accessToken = SecurityContext.getInstance().getBearerAuthToken();
+            OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
+
+            callback.token(token);
+        } catch (Exception e) {
+            logger.error("Could not handle login callback: {}", e.getMessage());
+        }
+    }
+
+}