Added support for using oauth token for Kafka
[nonrtric/plt/ranpm.git] / influxlogger / src / main / java / org / oran / pmlog / oauth2 / OAuthKafkaAuthenticateLoginCallbackHandler.java
1 //  ============LICENSE_START===============================================
2 //  Copyright (C) 2023 Nordix Foundation. All rights reserved.
3 //  ========================================================================
4 //  Licensed under the Apache License, Version 2.0 (the "License");
5 //  you may not use this file except in compliance with the License.
6 //  You may obtain a copy of the License at
7 //
8 //       http://www.apache.org/licenses/LICENSE-2.0
9 //
10 //  Unless required by applicable law or agreed to in writing, software
11 //  distributed under the License is distributed on an "AS IS" BASIS,
12 //  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 //  See the License for the specific language governing permissions and
14 //  limitations under the License.
15 //  ============LICENSE_END=================================================
16 //
17
18 package org.oran.pmlog.oauth2;
19
20 import java.io.IOException;
21 import java.util.*;
22
23 import javax.security.auth.callback.Callback;
24 import javax.security.auth.callback.UnsupportedCallbackException;
25 import javax.security.auth.login.AppConfigurationEntry;
26
27 import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
28 import org.apache.kafka.common.security.auth.SaslExtensions;
29 import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
30 import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
31 import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
32 import org.oran.pmlog.exceptions.ServiceException;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35
36 public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
37     private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
38
39     private boolean isConfigured = false;
40
41     @Override
42     public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
43
44         if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
45             throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
46         if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
47             throw new IllegalArgumentException(
48                     String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
49                             jaasConfigEntries.size()));
50         isConfigured = true;
51     }
52
53     @Override
54     public void close() {}
55
56     @Override
57     public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
58
59         if (!this.isConfigured)
60             throw new IllegalStateException("Callback handler not configured");
61         for (Callback callback : callbacks) {
62             logger.debug("callback " + callback.toString());
63             if (callback instanceof OAuthBearerTokenCallback) {
64                 handleCallback((OAuthBearerTokenCallback) callback);
65             } else if (callback instanceof SaslExtensionsCallback) {
66                 handleCallback((SaslExtensionsCallback) callback);
67             } else {
68                 logger.error("Unsupported callback: {}", callback);
69                 throw new UnsupportedCallbackException(callback);
70             }
71         }
72     }
73
74     private void handleCallback(SaslExtensionsCallback callback) {
75         callback.extensions(SaslExtensions.empty());
76     }
77
78     private void handleCallback(OAuthBearerTokenCallback callback) {
79         try {
80             if (callback.token() != null) {
81                 throw new ServiceException("Callback had a token already");
82             }
83
84             String accessToken = SecurityContext.getInstance().getBearerAuthToken();
85             OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
86
87             callback.token(token);
88         } catch (Exception e) {
89             logger.error("Could not handle login callback: {}", e.getMessage());
90         }
91     }
92
93 }