Merge "Added support for using oauth token for Kafka"
authorPatrik Buhr <patrik.buhr@est.tech>
Thu, 23 Mar 2023 13:00:32 +0000 (13:00 +0000)
committerGerrit Code Review <gerrit@o-ran-sc.org>
Thu, 23 Mar 2023 13:00:32 +0000 (13:00 +0000)
30 files changed:
influxlogger/config/application.yaml
influxlogger/src/main/java/org/oran/pmlog/ConsumerRegstrationTask.java
influxlogger/src/main/java/org/oran/pmlog/KafkaTopicListener.java
influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClient.java
influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClientFactory.java
influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java
influxlogger/src/main/java/org/oran/pmlog/exceptions/ServiceException.java
influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java [new file with mode: 0644]
influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java [new file with mode: 0644]
influxlogger/src/main/java/org/oran/pmlog/oauth2/SecurityContext.java [moved from pmproducer/src/main/java/org/oran/pmproducer/clients/SecurityContext.java with 90% similarity]
influxlogger/src/test/java/org/oran/pmlog/ApplicationTest.java
influxlogger/src/test/java/org/oran/pmlog/Integration.java
influxlogger/src/test/resources/jwtToken.b64 [new file with mode: 0644]
pmproducer/config/application.yaml
pmproducer/src/main/java/org/oran/pmproducer/Application.java
pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClient.java
pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClientFactory.java
pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java
pmproducer/src/main/java/org/oran/pmproducer/exceptions/ServiceException.java
pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthBearerTokenJwt.java [new file with mode: 0644]
pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java [new file with mode: 0644]
pmproducer/src/main/java/org/oran/pmproducer/oauth2/SecurityContext.java [moved from influxlogger/src/main/java/org/oran/pmlog/clients/SecurityContext.java with 85% similarity]
pmproducer/src/main/java/org/oran/pmproducer/repository/Jobs.java
pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java
pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java
pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java
pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java
pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithIcs.java
pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java
pmproducer/src/test/resources/jwtToken.b64 [new file with mode: 0644]

index 8c82f93..9da01ab 100644 (file)
@@ -84,6 +84,15 @@ app:
     max-poll-records: 500
     group-id: kafkaGroupId
     client-id: kafkaClientId
+    # Configues if oath2 tokens shall be used. If set to true, auth-token-file must also be configured
+    use-oath-token: false
+    ssl:
+      key-store-type: PEM
+      key-store-location:
+      # key password is needed if the private key is encrypted
+      key-store-password:
+      trust-store-type: PEM
+      trust-store-location:
   influx:
     url: http://localhost:8086
     access-token: xmrt1YobMTl-Nx-a8iiO6fC8xJc5BvKZLSU8U18VfAYza4N0YHTFrLy15W4Ss2bxXhgX95qagxsBJ0GCBSFveQ==
@@ -95,5 +104,7 @@ app:
     database: pm_data
   ics-base-url: https://localhost:8434
   consumer-job-id: "pmlog"
+  # If the file name is empty, no authorization token is used
+  auth-token-file:
 
 
index 573b2c3..be16a20 100644 (file)
@@ -26,8 +26,8 @@ import lombok.Getter;
 
 import org.oran.pmlog.clients.AsyncRestClient;
 import org.oran.pmlog.clients.AsyncRestClientFactory;
-import org.oran.pmlog.clients.SecurityContext;
 import org.oran.pmlog.configuration.ApplicationConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
index 390ccae..1617f1d 100644 (file)
@@ -89,6 +89,7 @@ public class KafkaTopicListener {
         consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
 
         consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + applicationConfig.getKafkaGroupId());
+        this.applicationConfig.addKafkaSecurityProps(consumerProps);
 
         return ReceiverOptions.<byte[], byte[]>create(consumerProps)
                 .subscription(Collections.singleton(this.applicationConfig.getKafkaInputTopic()));
index 106b68e..43961e8 100644 (file)
@@ -29,6 +29,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
index b082188..a1d1c12 100644 (file)
@@ -43,6 +43,7 @@ import javax.net.ssl.KeyManagerFactory;
 
 import org.oran.pmlog.configuration.WebClientConfig;
 import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.ResourceUtils;
index 84c5083..fc613c5 100644 (file)
@@ -23,10 +23,16 @@ package org.oran.pmlog.configuration;
 import java.lang.invoke.MethodHandles;
 import java.nio.charset.Charset;
 import java.nio.file.Files;
+import java.util.Map;
 
 import lombok.Getter;
 import lombok.ToString;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.oran.pmlog.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Value;
@@ -61,10 +67,10 @@ public class ApplicationConfig {
     @Value("${app.webclient.trust-store}")
     private String sslTrustStore = "";
 
-    @Value("${app.webclient.http.proxy-host:}")
+    @Value("${app.webclient.http.proxy-host}")
     private String httpProxyHost = "";
 
-    @Value("${app.webclient.http.proxy-port:0}")
+    @Value("${app.webclient.http.proxy-port}")
     private int httpProxyPort = 0;
 
     @Getter
@@ -102,7 +108,7 @@ public class ApplicationConfig {
     private String icsBaseUrl;
 
     @Getter
-    @Value("${app.consumer-job-id:shouldHaveBeenDefinedInYaml}")
+    @Value("${app.consumer-job-id}")
     private String consumerJobId;
 
     @Getter
@@ -125,6 +131,24 @@ public class ApplicationConfig {
     @Value("${app.influx.org}")
     private String influxOrg;
 
+    @Value("${app.kafka.ssl.key-store-type}")
+    private String kafkaKeyStoreType;
+
+    @Value("${app.kafka.ssl.key-store-location}")
+    private String kafkaKeyStoreLocation;
+
+    @Value("${app.kafka.ssl.key-store-password}")
+    private String kafkaKeyStorePassword;
+
+    @Value("${app.kafka.ssl.trust-store-type}")
+    private String kafkaTrustStoreType;
+
+    @Value("${app.kafka.ssl.trust-store-location}")
+    private String kafkTrustStoreLocation;
+
+    @Value("${app.kafka.use-oath-token}")
+    private boolean useOathToken;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
@@ -148,6 +172,32 @@ public class ApplicationConfig {
         return this.webClientConfig;
     }
 
+    public void addKafkaSecurityProps(Map<String, Object> props) {
+
+        if (useOathToken) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+            props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+            props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+                    OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+            props.put(SaslConfigs.SASL_JAAS_CONFIG,
+                    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+        }
+        if (!kafkaKeyStoreLocation.isEmpty()) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+            // SSL
+            props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+            if (!kafkaKeyStorePassword.isEmpty()) {
+                props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+            }
+            if (!kafkTrustStoreLocation.isEmpty()) {
+                props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+            }
+        }
+
+    }
+
     public String getConsumerJobInfo() {
 
         try {
index f05d955..f7e4999 100644 (file)
@@ -36,4 +36,8 @@ public class ServiceException extends Exception {
         this.httpStatus = httpStatus;
     }
 
+    public ServiceException(String message) {
+        this(message, HttpStatus.I_AM_A_TEAPOT);
+    }
+
 }
diff --git a/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java b/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java
new file mode 100644 (file)
index 0000000..841b8c6
--- /dev/null
@@ -0,0 +1,101 @@
+//  ============LICENSE_START===============================================
+//  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+//  ========================================================================
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//  ============LICENSE_END=================================================
+//
+
+package org.oran.pmlog.oauth2;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+
+import lombok.ToString;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.oran.pmlog.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerTokenJwt implements OAuthBearerToken {
+    private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class);
+    private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+
+    private final String jwtTokenRaw;
+    private final JwtTokenBody tokenBody;
+
+    @ToString
+    private static class JwtTokenBody {
+        String sub = ""; // principalName
+        long exp = 0; // expirationTime
+        long iat = 0; // startTime
+        String scope = "";
+    }
+
+    public static OAuthBearerTokenJwt create(String tokenRaw)
+            throws ServiceException, JsonMappingException, JsonProcessingException {
+        String[] chunks = tokenRaw.split("\\.");
+        Base64.Decoder decoder = Base64.getUrlDecoder();
+        if (chunks.length < 2) {
+            throw new ServiceException("Could not parse JWT token: " + tokenRaw);
+
+        }
+        String payloadStr = new String(decoder.decode(chunks[1]));
+        JwtTokenBody token = gson.fromJson(payloadStr, JwtTokenBody.class);
+        logger.error("Token: {}", token);
+        return new OAuthBearerTokenJwt(token, tokenRaw);
+    }
+
+    private OAuthBearerTokenJwt(JwtTokenBody jwtTokenBody, String accessToken) {
+        super();
+        this.jwtTokenRaw = accessToken;
+        this.tokenBody = jwtTokenBody;
+    }
+
+    @Override
+    public String value() {
+        return jwtTokenRaw;
+    }
+
+    @Override
+    public Set<String> scope() {
+        Set<String> res = new HashSet<>();
+        if (!this.tokenBody.scope.isEmpty()) {
+            res.add(this.tokenBody.scope);
+        }
+        return res;
+    }
+
+    @Override
+    public long lifetimeMs() {
+        if (this.tokenBody.exp == 0) {
+            return Long.MAX_VALUE;
+        }
+        return this.tokenBody.exp * 1000;
+    }
+
+    @Override
+    public String principalName() {
+        return this.tokenBody.sub;
+    }
+
+    @Override
+    public Long startTimeMs() {
+        return this.tokenBody.iat;
+    }
+
+}
diff --git a/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java b/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java
new file mode 100644 (file)
index 0000000..b48f222
--- /dev/null
@@ -0,0 +1,93 @@
+//  ============LICENSE_START===============================================
+//  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+//  ========================================================================
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//  ============LICENSE_END=================================================
+//
+
+package org.oran.pmlog.oauth2;
+
+import java.io.IOException;
+import java.util.*;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.oran.pmlog.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
+    private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
+
+    private boolean isConfigured = false;
+
+    @Override
+    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(
+                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+                            jaasConfigEntries.size()));
+        isConfigured = true;
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+
+        if (!this.isConfigured)
+            throw new IllegalStateException("Callback handler not configured");
+        for (Callback callback : callbacks) {
+            logger.debug("callback " + callback.toString());
+            if (callback instanceof OAuthBearerTokenCallback) {
+                handleCallback((OAuthBearerTokenCallback) callback);
+            } else if (callback instanceof SaslExtensionsCallback) {
+                handleCallback((SaslExtensionsCallback) callback);
+            } else {
+                logger.error("Unsupported callback: {}", callback);
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    private void handleCallback(SaslExtensionsCallback callback) {
+        callback.extensions(SaslExtensions.empty());
+    }
+
+    private void handleCallback(OAuthBearerTokenCallback callback) {
+        try {
+            if (callback.token() != null) {
+                throw new ServiceException("Callback had a token already");
+            }
+
+            String accessToken = SecurityContext.getInstance().getBearerAuthToken();
+            OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
+
+            callback.token(token);
+        } catch (Exception e) {
+            logger.error("Could not handle login callback: {}", e.getMessage());
+        }
+    }
+
+}
  * ========================LICENSE_END===================================
  */
 
-package org.oran.pmproducer.clients;
+package org.oran.pmlog.oauth2;
 
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
+import lombok.Getter;
 import lombok.Setter;
 
 import org.slf4j.Logger;
@@ -44,10 +45,14 @@ public class SecurityContext {
 
     private String authToken = "";
 
+    @Getter
+    private static SecurityContext instance;
+
     @Setter
     private Path authTokenFilePath;
 
     public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
+        instance = this;
         if (!authTokenFilename.isEmpty()) {
             this.authTokenFilePath = Path.of(authTokenFilename);
         }
@@ -59,12 +64,14 @@ public class SecurityContext {
 
     public synchronized String getBearerAuthToken() {
         if (!isConfigured()) {
+            logger.warn("No configuration for auth token");
             return "";
         }
         try {
             long lastModified = authTokenFilePath.toFile().lastModified();
             if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) {
                 this.authToken = Files.readString(authTokenFilePath);
+                this.authToken = this.authToken.trim();
                 this.tokenTimestamp = lastModified;
             }
         } catch (Exception e) {
index da12e0a..46de821 100644 (file)
@@ -43,11 +43,11 @@ import org.junit.jupiter.api.TestMethodOrder;
 import org.mockito.Mockito;
 import org.oran.pmlog.clients.AsyncRestClient;
 import org.oran.pmlog.clients.AsyncRestClientFactory;
-import org.oran.pmlog.clients.SecurityContext;
 import org.oran.pmlog.configuration.ApplicationConfig;
 import org.oran.pmlog.configuration.ConsumerJobInfo;
 import org.oran.pmlog.configuration.WebClientConfig;
 import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
index 840298d..1fad28d 100644 (file)
@@ -58,7 +58,9 @@ import reactor.kafka.sender.SenderRecord;
         "server.ssl.key-store=./config/keystore.jks", //
         "app.webclient.trust-store=./config/truststore.jks", //
         "app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
-        "app.pm-files-path=./src/test/resources/" //
+        "app.pm-files-path=./src/test/resources/", //
+        "app.auth-token-file=src/test/resources/jwtToken.b64", //
+        "app.kafka.use-oath-token=false" //
 }) //
 class Integration {
 
@@ -114,6 +116,7 @@ class Integration {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+        this.applicationConfig.addKafkaSecurityProps(props);
         return SenderOptions.create(props);
     }
 
diff --git a/influxlogger/src/test/resources/jwtToken.b64 b/influxlogger/src/test/resources/jwtToken.b64
new file mode 100644 (file)
index 0000000..b19a883
--- /dev/null
@@ -0,0 +1 @@
+eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
index 27885f1..0e611b5 100644 (file)
@@ -80,12 +80,22 @@ app:
 
   # The url used to adress this component. This is used as a callback url sent to other components.
   pm-producer-base-url: https://localhost:8435
-  # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic
-  # several redundant boostrap servers can be specified, separated by a comma ','.
+
   kafka:
+    # KAFKA boostrap servers.
+    # Several redundant boostrap servers can be specified, separated by a comma ','.
     bootstrap-servers: localhost:9092
     # The maximum number of records returned in a single call to poll() (default 100)
     max-poll-records: 500
+    # Configues if oath2 tokens shall be used. If set to true, auth-token-file must also be configured
+    use-oath-token: false
+    ssl:
+      key-store-type: PEM
+      key-store-location:
+      # key password is needed if the private key is encrypted
+      key-store-password:
+      trust-store-type: PEM
+      trust-store-location:
   # If the file name is empty, no authorization token is used
   auth-token-file:
   pm-files-path: /tmp
index e92bd47..0ffb73f 100644 (file)
@@ -37,7 +37,6 @@ public class Application {
 
     private static final Logger logger = LoggerFactory.getLogger(Application.class);
 
-    private long configFileLastModification = 0;
     private static ConfigurableApplicationContext applicationContext;
 
     public static void main(String[] args) {
@@ -67,4 +66,5 @@ public class Application {
         thread.setDaemon(false);
         thread.start();
     }
+
 }
index ac76436..1fc0620 100644 (file)
@@ -29,6 +29,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.http.MediaType;
index 783da3a..7d50fa0 100644 (file)
@@ -43,6 +43,7 @@ import javax.net.ssl.KeyManagerFactory;
 
 import org.oran.pmproducer.configuration.WebClientConfig;
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.util.ResourceUtils;
index 18fcd8f..a70243b 100644 (file)
@@ -26,12 +26,18 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.Map;
 
 import lombok.Getter;
 import lombok.Setter;
 import lombok.ToString;
 
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
 import org.oran.pmproducer.repository.InfoType;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -69,10 +75,10 @@ public class ApplicationConfig {
     @Value("${app.webclient.trust-store}")
     private String sslTrustStore = "";
 
-    @Value("${app.webclient.http.proxy-host:}")
+    @Value("${app.webclient.http.proxy-host}")
     private String httpProxyHost = "";
 
-    @Value("${app.webclient.http.proxy-port:0}")
+    @Value("${app.webclient.http.proxy-port}")
     private int httpProxyPort = 0;
 
     @Getter
@@ -97,7 +103,7 @@ public class ApplicationConfig {
     private int kafkaMaxPollRecords;
 
     @Getter
-    @Value("${app.pm-files-path:}")
+    @Value("${app.pm-files-path}")
     private String pmFilesPath;
 
     @Getter
@@ -122,9 +128,27 @@ public class ApplicationConfig {
 
     @Getter
     @Setter
-    @Value("${app.zip-output:}")
+    @Value("${app.zip-output}")
     private boolean zipOutput;
 
+    @Value("${app.kafka.ssl.key-store-type}")
+    private String kafkaKeyStoreType;
+
+    @Value("${app.kafka.ssl.key-store-location}")
+    private String kafkaKeyStoreLocation;
+
+    @Value("${app.kafka.ssl.key-store-password}")
+    private String kafkaKeyStorePassword;
+
+    @Value("${app.kafka.ssl.trust-store-type}")
+    private String kafkaTrustStoreType;
+
+    @Value("${app.kafka.ssl.trust-store-location}")
+    private String kafkTrustStoreLocation;
+
+    @Value("${app.kafka.use-oath-token}")
+    private boolean useOathToken;
+
     private WebClientConfig webClientConfig = null;
 
     public WebClientConfig getWebClientConfig() {
@@ -167,7 +191,31 @@ public class ApplicationConfig {
             logger.error("Could not load configuration file {}", getLocalConfigurationFilePath());
             return Collections.emptyList();
         }
+    }
+
+    public void addKafkaSecurityProps(Map<String, Object> props) {
 
+        if (useOathToken) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+            props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+            props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+                    OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+            props.put(SaslConfigs.SASL_JAAS_CONFIG,
+                    "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+        }
+        if (!kafkaKeyStoreLocation.isEmpty()) {
+            props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+            // SSL
+            props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+            props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+            if (!kafkaKeyStorePassword.isEmpty()) {
+                props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+            }
+            if (!kafkTrustStoreLocation.isEmpty()) {
+                props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+                props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+            }
+        }
     }
 
 }
index 55e2a48..5a8fa36 100644 (file)
@@ -36,4 +36,9 @@ public class ServiceException extends Exception {
         this.httpStatus = httpStatus;
     }
 
+    public ServiceException(String message) {
+        super(message);
+        this.httpStatus = HttpStatus.BAD_REQUEST;
+    }
+
 }
diff --git a/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthBearerTokenJwt.java b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthBearerTokenJwt.java
new file mode 100644 (file)
index 0000000..7ee2053
--- /dev/null
@@ -0,0 +1,101 @@
+//  ============LICENSE_START===============================================
+//  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+//  ========================================================================
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//  ============LICENSE_END=================================================
+//
+
+package org.oran.pmproducer.oauth2;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+
+import lombok.ToString;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.oran.pmproducer.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerTokenJwt implements OAuthBearerToken {
+    private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class);
+    private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+
+    private final String jwtTokenRaw;
+    private final JwtTokenBody tokenBody;
+
+    @ToString
+    private static class JwtTokenBody {
+        String sub = ""; // principalName
+        long exp = 0; // expirationTime
+        long iat = 0; // startTime
+        String scope = "";
+    }
+
+    public static OAuthBearerTokenJwt create(String tokenRaw)
+            throws ServiceException, JsonMappingException, JsonProcessingException {
+        String[] chunks = tokenRaw.split("\\.");
+        Base64.Decoder decoder = Base64.getUrlDecoder();
+        if (chunks.length < 2) {
+            throw new ServiceException("Could not parse JWT token: " + tokenRaw);
+
+        }
+        String payloadStr = new String(decoder.decode(chunks[1]));
+        JwtTokenBody token = gson.fromJson(payloadStr, JwtTokenBody.class);
+        logger.error("Token: {}", token);
+        return new OAuthBearerTokenJwt(token, tokenRaw);
+    }
+
+    private OAuthBearerTokenJwt(JwtTokenBody jwtTokenBody, String accessToken) {
+        super();
+        this.jwtTokenRaw = accessToken;
+        this.tokenBody = jwtTokenBody;
+    }
+
+    @Override
+    public String value() {
+        return jwtTokenRaw;
+    }
+
+    @Override
+    public Set<String> scope() {
+        Set<String> res = new HashSet<>();
+        if (!this.tokenBody.scope.isEmpty()) {
+            res.add(this.tokenBody.scope);
+        }
+        return res;
+    }
+
+    @Override
+    public long lifetimeMs() {
+        if (this.tokenBody.exp == 0) {
+            return Long.MAX_VALUE;
+        }
+        return this.tokenBody.exp * 1000;
+    }
+
+    @Override
+    public String principalName() {
+        return this.tokenBody.sub;
+    }
+
+    @Override
+    public Long startTimeMs() {
+        return this.tokenBody.iat;
+    }
+
+}
diff --git a/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java
new file mode 100644 (file)
index 0000000..b209da3
--- /dev/null
@@ -0,0 +1,93 @@
+//  ============LICENSE_START===============================================
+//  Copyright (C) 2023 Nordix Foundation. All rights reserved.
+//  ========================================================================
+//  Licensed under the Apache License, Version 2.0 (the "License");
+//  you may not use this file except in compliance with the License.
+//  You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//  Unless required by applicable law or agreed to in writing, software
+//  distributed under the License is distributed on an "AS IS" BASIS,
+//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//  See the License for the specific language governing permissions and
+//  limitations under the License.
+//  ============LICENSE_END=================================================
+//
+
+package org.oran.pmproducer.oauth2;
+
+import java.io.IOException;
+import java.util.*;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.oran.pmproducer.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
+    private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
+
+    private boolean isConfigured = false;
+
+    @Override
+    public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+
+        if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+            throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+        if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+            throw new IllegalArgumentException(
+                    String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+                            jaasConfigEntries.size()));
+        isConfigured = true;
+    }
+
+    @Override
+    public void close() {}
+
+    @Override
+    public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+
+        if (!this.isConfigured)
+            throw new IllegalStateException("Callback handler not configured");
+        for (Callback callback : callbacks) {
+            logger.debug("callback " + callback.toString());
+            if (callback instanceof OAuthBearerTokenCallback) {
+                handleCallback((OAuthBearerTokenCallback) callback);
+            } else if (callback instanceof SaslExtensionsCallback) {
+                handleCallback((SaslExtensionsCallback) callback);
+            } else {
+                logger.error("Unsupported callback: {}", callback);
+                throw new UnsupportedCallbackException(callback);
+            }
+        }
+    }
+
+    private void handleCallback(SaslExtensionsCallback callback) {
+        callback.extensions(SaslExtensions.empty());
+    }
+
+    private void handleCallback(OAuthBearerTokenCallback callback) {
+        try {
+            if (callback.token() != null) {
+                throw new ServiceException("Callback had a token already", null);
+            }
+
+            String accessToken = SecurityContext.getInstance().getBearerAuthToken();
+            OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
+
+            callback.token(token);
+        } catch (Exception e) {
+            logger.error("Could not handle login callback: {}", e.getMessage());
+        }
+    }
+
+}
@@ -2,7 +2,7 @@
  * ========================LICENSE_START=================================
  * O-RAN-SC
  * %%
- * Copyright (C) 2022 Nordix Foundation
+ * Copyright (C) 2023 Nordix Foundation
  * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * ========================LICENSE_END===================================
  */
 
-package org.oran.pmlog.clients;
+package org.oran.pmproducer.oauth2;
 
 import java.lang.invoke.MethodHandles;
 import java.nio.file.Files;
 import java.nio.file.Path;
 
+import lombok.Getter;
 import lombok.Setter;
 
 import org.slf4j.Logger;
@@ -44,10 +45,14 @@ public class SecurityContext {
 
     private String authToken = "";
 
+    @Getter
+    private static SecurityContext instance;
+
     @Setter
     private Path authTokenFilePath;
 
-    public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
+    public SecurityContext(@Value("${app.auth-token-file}") String authTokenFilename) {
+        instance = this;
         if (!authTokenFilename.isEmpty()) {
             this.authTokenFilePath = Path.of(authTokenFilename);
         }
@@ -59,12 +64,14 @@ public class SecurityContext {
 
     public synchronized String getBearerAuthToken() {
         if (!isConfigured()) {
+            logger.warn("No configuration for auth token");
             return "";
         }
         try {
             long lastModified = authTokenFilePath.toFile().lastModified();
             if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) {
                 this.authToken = Files.readString(authTokenFilePath);
+                this.authToken = this.authToken.trim();
                 this.tokenTimestamp = lastModified;
             }
         } catch (Exception e) {
index 5b257bf..2b48b62 100644 (file)
@@ -29,13 +29,12 @@ import java.util.Vector;
 
 import lombok.Getter;
 
-import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
 import org.oran.pmproducer.configuration.ApplicationConfig;
 import org.oran.pmproducer.exceptions.ServiceException;
 import org.oran.pmproducer.filter.FilterFactory;
 import org.oran.pmproducer.filter.FilteredData;
 import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.oran.pmproducer.repository.Job.Parameters;
 import org.oran.pmproducer.repository.Job.Parameters.KafkaDeliveryInfo;
 import org.oran.pmproducer.tasks.TopicListener.DataFromTopic;
@@ -117,13 +116,11 @@ public class Jobs {
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
     private Map<String, JobGroup> jobGroups = new HashMap<>(); // Key is Topic
-    private final AsyncRestClientFactory restclientFactory;
     private final List<Observer> observers = new ArrayList<>();
     private final ApplicationConfig appConfig;
 
     public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext,
             @Autowired ApplicationConfig appConfig) {
-        restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
         this.appConfig = appConfig;
     }
 
index d9f6632..f2d7b53 100644 (file)
@@ -93,6 +93,7 @@ public class JobDataDistributor {
 
         SenderOptions<byte[], byte[]> senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo());
         this.sender = KafkaSender.create(senderOptions);
+
     }
 
     public void start(Flux<TopicListener.DataFromTopic> input) {
@@ -150,6 +151,9 @@ public class JobDataDistributor {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        config.addKafkaSecurityProps(props);
+
         return SenderOptions.create(props);
     }
 
index 5b10241..d4b29be 100644 (file)
@@ -32,10 +32,10 @@ import lombok.Getter;
 
 import org.oran.pmproducer.clients.AsyncRestClient;
 import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
 import org.oran.pmproducer.configuration.ApplicationConfig;
 import org.oran.pmproducer.controllers.ProducerCallbacksController;
 import org.oran.pmproducer.exceptions.ServiceException;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.oran.pmproducer.r1.ConsumerJobInfo;
 import org.oran.pmproducer.r1.ProducerInfoTypeInfo;
 import org.oran.pmproducer.r1.ProducerRegistrationInfo;
index 351fcc6..3dd2475 100644 (file)
@@ -155,21 +155,22 @@ public class TopicListener {
     }
 
     private ReceiverOptions<byte[], byte[]> kafkaInputProperties(String clientId) {
-        Map<String, Object> consumerProps = new HashMap<>();
+        Map<String, Object> props = new HashMap<>();
         if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
             logger.error("No kafka boostrap server is setup");
         }
 
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
-        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+        props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
 
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
+        this.applicationConfig.addKafkaSecurityProps(props);
 
-        return ReceiverOptions.<byte[], byte[]>create(consumerProps)
+        return ReceiverOptions.<byte[], byte[]>create(props)
                 .subscription(Collections.singleton(this.type.getKafkaInputTopic()));
     }
 
index dd38ffc..04e8eed 100644 (file)
@@ -53,7 +53,6 @@ import org.junit.jupiter.api.TestMethodOrder;
 import org.mockito.ArgumentCaptor;
 import org.oran.pmproducer.clients.AsyncRestClient;
 import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
 import org.oran.pmproducer.configuration.ApplicationConfig;
 import org.oran.pmproducer.configuration.WebClientConfig;
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
@@ -64,6 +63,7 @@ import org.oran.pmproducer.filter.FilteredData;
 import org.oran.pmproducer.filter.PmReport;
 import org.oran.pmproducer.filter.PmReportFilter;
 import org.oran.pmproducer.filter.PmReportFilter.FilterData;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.oran.pmproducer.r1.ConsumerJobInfo;
 import org.oran.pmproducer.r1.ProducerJobInfo;
 import org.oran.pmproducer.repository.InfoType;
@@ -235,11 +235,6 @@ class ApplicationTest {
         return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
     }
 
-    private String quote(String str) {
-        final String q = "\"";
-        return q + str.replace(q, "\\\"") + q;
-    }
-
     private Object toJson(String json) {
         try {
             return JsonParser.parseString(json).getAsJsonObject();
index 8ce5547..de79b60 100644 (file)
@@ -30,11 +30,11 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.oran.pmproducer.clients.AsyncRestClient;
 import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
 import org.oran.pmproducer.configuration.ApplicationConfig;
 import org.oran.pmproducer.configuration.WebClientConfig;
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
 import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.oran.pmproducer.r1.ConsumerJobInfo;
 import org.oran.pmproducer.repository.Jobs;
 import org.oran.pmproducer.tasks.ProducerRegstrationTask;
index 39363f8..70922d8 100644 (file)
@@ -44,7 +44,6 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.oran.pmproducer.clients.AsyncRestClient;
 import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
 import org.oran.pmproducer.configuration.ApplicationConfig;
 import org.oran.pmproducer.configuration.WebClientConfig;
 import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
@@ -52,6 +51,7 @@ import org.oran.pmproducer.controllers.ProducerCallbacksController;
 import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
 import org.oran.pmproducer.datastore.DataStore;
 import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
 import org.oran.pmproducer.r1.ConsumerJobInfo;
 import org.oran.pmproducer.repository.InfoType;
 import org.oran.pmproducer.repository.InfoTypes;
@@ -87,8 +87,9 @@ import reactor.kafka.sender.SenderRecord;
         "app.pm-files-path=./src/test/resources/", //
         "app.s3.locksBucket=ropfilelocks", //
         "app.pm-files-path=/tmp/dmaapadaptor", //
-        "app.s3.bucket=dmaaptest" //
-}) //
+        "app.s3.bucket=dmaaptest", //
+        "app.auth-token-file=src/test/resources/jwtToken.b64", //
+        "app.kafka.use-oath-token=false"}) //
 class IntegrationWithKafka {
 
     final static String PM_TYPE_ID = "PmDataOverKafka";
@@ -321,6 +322,10 @@ class IntegrationWithKafka {
         props.put(ProducerConfig.ACKS_CONFIG, "all");
         props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
         props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+        // Security
+        this.applicationConfig.addKafkaSecurityProps(props);
+
         return SenderOptions.create(props);
     }
 
diff --git a/pmproducer/src/test/resources/jwtToken.b64 b/pmproducer/src/test/resources/jwtToken.b64
new file mode 100644 (file)
index 0000000..b19a883
--- /dev/null
@@ -0,0 +1 @@
+eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c