From 298969556b0f84de745a67e994a590d8b2a3de13 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Tue, 21 Mar 2023 10:16:11 +0100 Subject: [PATCH] Added support for using oauth token for Kafka in pmproducer and in influxlogger OAUTHBEARER is supported as SASL mechanism. This can be done over SSL or plain text. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-853 Change-Id: I96afd34457da1a7115d417e7a11cc3072d284e00 --- influxlogger/config/application.yaml | 11 +++ .../org/oran/pmlog/ConsumerRegstrationTask.java | 2 +- .../java/org/oran/pmlog/KafkaTopicListener.java | 1 + .../org/oran/pmlog/clients/AsyncRestClient.java | 1 + .../oran/pmlog/clients/AsyncRestClientFactory.java | 1 + .../pmlog/configuration/ApplicationConfig.java | 56 +++++++++++- .../oran/pmlog/exceptions/ServiceException.java | 4 + .../org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java | 101 +++++++++++++++++++++ ...OAuthKafkaAuthenticateLoginCallbackHandler.java | 93 +++++++++++++++++++ .../org/oran/pmlog/oauth2}/SecurityContext.java | 9 +- .../test/java/org/oran/pmlog/ApplicationTest.java | 2 +- .../src/test/java/org/oran/pmlog/Integration.java | 5 +- influxlogger/src/test/resources/jwtToken.b64 | 1 + pmproducer/config/application.yaml | 14 ++- .../main/java/org/oran/pmproducer/Application.java | 2 +- .../oran/pmproducer/clients/AsyncRestClient.java | 1 + .../pmproducer/clients/AsyncRestClientFactory.java | 1 + .../configuration/ApplicationConfig.java | 56 +++++++++++- .../pmproducer/exceptions/ServiceException.java | 5 + .../pmproducer/oauth2/OAuthBearerTokenJwt.java | 101 +++++++++++++++++++++ ...OAuthKafkaAuthenticateLoginCallbackHandler.java | 93 +++++++++++++++++++ .../oran/pmproducer/oauth2}/SecurityContext.java | 13 ++- .../java/org/oran/pmproducer/repository/Jobs.java | 5 +- .../oran/pmproducer/tasks/JobDataDistributor.java | 4 + .../pmproducer/tasks/ProducerRegstrationTask.java | 2 +- .../org/oran/pmproducer/tasks/TopicListener.java | 19 ++-- .../java/org/oran/pmproducer/ApplicationTest.java | 7 +- .../org/oran/pmproducer/IntegrationWithIcs.java | 2 +- .../org/oran/pmproducer/IntegrationWithKafka.java | 11 ++- pmproducer/src/test/resources/jwtToken.b64 | 1 + 30 files changed, 583 insertions(+), 41 deletions(-) create mode 100644 influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java create mode 100644 influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java rename {pmproducer/src/main/java/org/oran/pmproducer/clients => influxlogger/src/main/java/org/oran/pmlog/oauth2}/SecurityContext.java (90%) create mode 100644 influxlogger/src/test/resources/jwtToken.b64 create mode 100644 pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthBearerTokenJwt.java create mode 100644 pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java rename {influxlogger/src/main/java/org/oran/pmlog/clients => pmproducer/src/main/java/org/oran/pmproducer/oauth2}/SecurityContext.java (85%) create mode 100644 pmproducer/src/test/resources/jwtToken.b64 diff --git a/influxlogger/config/application.yaml b/influxlogger/config/application.yaml index 8c82f93..9da01ab 100644 --- a/influxlogger/config/application.yaml +++ b/influxlogger/config/application.yaml @@ -84,6 +84,15 @@ app: max-poll-records: 500 group-id: kafkaGroupId client-id: kafkaClientId + # Configues if oath2 tokens shall be used. If set to true, auth-token-file must also be configured + use-oath-token: false + ssl: + key-store-type: PEM + key-store-location: + # key password is needed if the private key is encrypted + key-store-password: + trust-store-type: PEM + trust-store-location: influx: url: http://localhost:8086 access-token: xmrt1YobMTl-Nx-a8iiO6fC8xJc5BvKZLSU8U18VfAYza4N0YHTFrLy15W4Ss2bxXhgX95qagxsBJ0GCBSFveQ== @@ -95,5 +104,7 @@ app: database: pm_data ics-base-url: https://localhost:8434 consumer-job-id: "pmlog" + # If the file name is empty, no authorization token is used + auth-token-file: diff --git a/influxlogger/src/main/java/org/oran/pmlog/ConsumerRegstrationTask.java b/influxlogger/src/main/java/org/oran/pmlog/ConsumerRegstrationTask.java index 573b2c3..be16a20 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/ConsumerRegstrationTask.java +++ b/influxlogger/src/main/java/org/oran/pmlog/ConsumerRegstrationTask.java @@ -26,8 +26,8 @@ import lombok.Getter; import org.oran.pmlog.clients.AsyncRestClient; import org.oran.pmlog.clients.AsyncRestClientFactory; -import org.oran.pmlog.clients.SecurityContext; import org.oran.pmlog.configuration.ApplicationConfig; +import org.oran.pmlog.oauth2.SecurityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/influxlogger/src/main/java/org/oran/pmlog/KafkaTopicListener.java b/influxlogger/src/main/java/org/oran/pmlog/KafkaTopicListener.java index 390ccae..1617f1d 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/KafkaTopicListener.java +++ b/influxlogger/src/main/java/org/oran/pmlog/KafkaTopicListener.java @@ -89,6 +89,7 @@ public class KafkaTopicListener { consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + applicationConfig.getKafkaGroupId()); + this.applicationConfig.addKafkaSecurityProps(consumerProps); return ReceiverOptions.create(consumerProps) .subscription(Collections.singleton(this.applicationConfig.getKafkaInputTopic())); diff --git a/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClient.java b/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClient.java index 106b68e..43961e8 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClient.java +++ b/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClient.java @@ -29,6 +29,7 @@ import java.lang.invoke.MethodHandles; import java.util.concurrent.atomic.AtomicInteger; import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.pmlog.oauth2.SecurityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; diff --git a/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClientFactory.java b/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClientFactory.java index b082188..a1d1c12 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClientFactory.java +++ b/influxlogger/src/main/java/org/oran/pmlog/clients/AsyncRestClientFactory.java @@ -43,6 +43,7 @@ import javax.net.ssl.KeyManagerFactory; import org.oran.pmlog.configuration.WebClientConfig; import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.pmlog.oauth2.SecurityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ResourceUtils; diff --git a/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java b/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java index 84c5083..fc613c5 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java +++ b/influxlogger/src/main/java/org/oran/pmlog/configuration/ApplicationConfig.java @@ -23,10 +23,16 @@ package org.oran.pmlog.configuration; import java.lang.invoke.MethodHandles; import java.nio.charset.Charset; import java.nio.file.Files; +import java.util.Map; import lombok.Getter; import lombok.ToString; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; +import org.oran.pmlog.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Value; @@ -61,10 +67,10 @@ public class ApplicationConfig { @Value("${app.webclient.trust-store}") private String sslTrustStore = ""; - @Value("${app.webclient.http.proxy-host:}") + @Value("${app.webclient.http.proxy-host}") private String httpProxyHost = ""; - @Value("${app.webclient.http.proxy-port:0}") + @Value("${app.webclient.http.proxy-port}") private int httpProxyPort = 0; @Getter @@ -102,7 +108,7 @@ public class ApplicationConfig { private String icsBaseUrl; @Getter - @Value("${app.consumer-job-id:shouldHaveBeenDefinedInYaml}") + @Value("${app.consumer-job-id}") private String consumerJobId; @Getter @@ -125,6 +131,24 @@ public class ApplicationConfig { @Value("${app.influx.org}") private String influxOrg; + @Value("${app.kafka.ssl.key-store-type}") + private String kafkaKeyStoreType; + + @Value("${app.kafka.ssl.key-store-location}") + private String kafkaKeyStoreLocation; + + @Value("${app.kafka.ssl.key-store-password}") + private String kafkaKeyStorePassword; + + @Value("${app.kafka.ssl.trust-store-type}") + private String kafkaTrustStoreType; + + @Value("${app.kafka.ssl.trust-store-location}") + private String kafkTrustStoreLocation; + + @Value("${app.kafka.use-oath-token}") + private boolean useOathToken; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { @@ -148,6 +172,32 @@ public class ApplicationConfig { return this.webClientConfig; } + public void addKafkaSecurityProps(Map props) { + + if (useOathToken) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); + props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, + OAuthKafkaAuthenticateLoginCallbackHandler.class.getName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; "); + } + if (!kafkaKeyStoreLocation.isEmpty()) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name); + // SSL + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation); + if (!kafkaKeyStorePassword.isEmpty()) { + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword); + } + if (!kafkTrustStoreLocation.isEmpty()) { + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation); + } + } + + } + public String getConsumerJobInfo() { try { diff --git a/influxlogger/src/main/java/org/oran/pmlog/exceptions/ServiceException.java b/influxlogger/src/main/java/org/oran/pmlog/exceptions/ServiceException.java index f05d955..f7e4999 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/exceptions/ServiceException.java +++ b/influxlogger/src/main/java/org/oran/pmlog/exceptions/ServiceException.java @@ -36,4 +36,8 @@ public class ServiceException extends Exception { this.httpStatus = httpStatus; } + public ServiceException(String message) { + this(message, HttpStatus.I_AM_A_TEAPOT); + } + } diff --git a/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java b/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java new file mode 100644 index 0000000..841b8c6 --- /dev/null +++ b/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthBearerTokenJwt.java @@ -0,0 +1,101 @@ +// ============LICENSE_START=============================================== +// Copyright (C) 2023 Nordix Foundation. All rights reserved. +// ======================================================================== +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============LICENSE_END================================================= +// + +package org.oran.pmlog.oauth2; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; + +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; + +import lombok.ToString; + +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.oran.pmlog.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerTokenJwt implements OAuthBearerToken { + private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class); + private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); + + private final String jwtTokenRaw; + private final JwtTokenBody tokenBody; + + @ToString + private static class JwtTokenBody { + String sub = ""; // principalName + long exp = 0; // expirationTime + long iat = 0; // startTime + String scope = ""; + } + + public static OAuthBearerTokenJwt create(String tokenRaw) + throws ServiceException, JsonMappingException, JsonProcessingException { + String[] chunks = tokenRaw.split("\\."); + Base64.Decoder decoder = Base64.getUrlDecoder(); + if (chunks.length < 2) { + throw new ServiceException("Could not parse JWT token: " + tokenRaw); + + } + String payloadStr = new String(decoder.decode(chunks[1])); + JwtTokenBody token = gson.fromJson(payloadStr, JwtTokenBody.class); + logger.error("Token: {}", token); + return new OAuthBearerTokenJwt(token, tokenRaw); + } + + private OAuthBearerTokenJwt(JwtTokenBody jwtTokenBody, String accessToken) { + super(); + this.jwtTokenRaw = accessToken; + this.tokenBody = jwtTokenBody; + } + + @Override + public String value() { + return jwtTokenRaw; + } + + @Override + public Set scope() { + Set res = new HashSet<>(); + if (!this.tokenBody.scope.isEmpty()) { + res.add(this.tokenBody.scope); + } + return res; + } + + @Override + public long lifetimeMs() { + if (this.tokenBody.exp == 0) { + return Long.MAX_VALUE; + } + return this.tokenBody.exp * 1000; + } + + @Override + public String principalName() { + return this.tokenBody.sub; + } + + @Override + public Long startTimeMs() { + return this.tokenBody.iat; + } + +} diff --git a/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java b/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java new file mode 100644 index 0000000..b48f222 --- /dev/null +++ b/influxlogger/src/main/java/org/oran/pmlog/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java @@ -0,0 +1,93 @@ +// ============LICENSE_START=============================================== +// Copyright (C) 2023 Nordix Foundation. All rights reserved. +// ======================================================================== +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============LICENSE_END================================================= +// + +package org.oran.pmlog.oauth2; + +import java.io.IOException; +import java.util.*; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; + +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.oran.pmlog.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler { + private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class); + + private boolean isConfigured = false; + + @Override + public void configure(Map map, String saslMechanism, List jaasConfigEntries) { + + if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) + throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); + if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) + throw new IllegalArgumentException( + String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", + jaasConfigEntries.size())); + isConfigured = true; + } + + @Override + public void close() {} + + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + + if (!this.isConfigured) + throw new IllegalStateException("Callback handler not configured"); + for (Callback callback : callbacks) { + logger.debug("callback " + callback.toString()); + if (callback instanceof OAuthBearerTokenCallback) { + handleCallback((OAuthBearerTokenCallback) callback); + } else if (callback instanceof SaslExtensionsCallback) { + handleCallback((SaslExtensionsCallback) callback); + } else { + logger.error("Unsupported callback: {}", callback); + throw new UnsupportedCallbackException(callback); + } + } + } + + private void handleCallback(SaslExtensionsCallback callback) { + callback.extensions(SaslExtensions.empty()); + } + + private void handleCallback(OAuthBearerTokenCallback callback) { + try { + if (callback.token() != null) { + throw new ServiceException("Callback had a token already"); + } + + String accessToken = SecurityContext.getInstance().getBearerAuthToken(); + OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken); + + callback.token(token); + } catch (Exception e) { + logger.error("Could not handle login callback: {}", e.getMessage()); + } + } + +} diff --git a/pmproducer/src/main/java/org/oran/pmproducer/clients/SecurityContext.java b/influxlogger/src/main/java/org/oran/pmlog/oauth2/SecurityContext.java similarity index 90% rename from pmproducer/src/main/java/org/oran/pmproducer/clients/SecurityContext.java rename to influxlogger/src/main/java/org/oran/pmlog/oauth2/SecurityContext.java index 5b77447..82d44a7 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/clients/SecurityContext.java +++ b/influxlogger/src/main/java/org/oran/pmlog/oauth2/SecurityContext.java @@ -18,12 +18,13 @@ * ========================LICENSE_END=================================== */ -package org.oran.pmproducer.clients; +package org.oran.pmlog.oauth2; import java.lang.invoke.MethodHandles; import java.nio.file.Files; import java.nio.file.Path; +import lombok.Getter; import lombok.Setter; import org.slf4j.Logger; @@ -44,10 +45,14 @@ public class SecurityContext { private String authToken = ""; + @Getter + private static SecurityContext instance; + @Setter private Path authTokenFilePath; public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) { + instance = this; if (!authTokenFilename.isEmpty()) { this.authTokenFilePath = Path.of(authTokenFilename); } @@ -59,12 +64,14 @@ public class SecurityContext { public synchronized String getBearerAuthToken() { if (!isConfigured()) { + logger.warn("No configuration for auth token"); return ""; } try { long lastModified = authTokenFilePath.toFile().lastModified(); if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) { this.authToken = Files.readString(authTokenFilePath); + this.authToken = this.authToken.trim(); this.tokenTimestamp = lastModified; } } catch (Exception e) { diff --git a/influxlogger/src/test/java/org/oran/pmlog/ApplicationTest.java b/influxlogger/src/test/java/org/oran/pmlog/ApplicationTest.java index da12e0a..46de821 100644 --- a/influxlogger/src/test/java/org/oran/pmlog/ApplicationTest.java +++ b/influxlogger/src/test/java/org/oran/pmlog/ApplicationTest.java @@ -43,11 +43,11 @@ import org.junit.jupiter.api.TestMethodOrder; import org.mockito.Mockito; import org.oran.pmlog.clients.AsyncRestClient; import org.oran.pmlog.clients.AsyncRestClientFactory; -import org.oran.pmlog.clients.SecurityContext; import org.oran.pmlog.configuration.ApplicationConfig; import org.oran.pmlog.configuration.ConsumerJobInfo; import org.oran.pmlog.configuration.WebClientConfig; import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.pmlog.oauth2.SecurityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; diff --git a/influxlogger/src/test/java/org/oran/pmlog/Integration.java b/influxlogger/src/test/java/org/oran/pmlog/Integration.java index 840298d..1fad28d 100644 --- a/influxlogger/src/test/java/org/oran/pmlog/Integration.java +++ b/influxlogger/src/test/java/org/oran/pmlog/Integration.java @@ -58,7 +58,9 @@ import reactor.kafka.sender.SenderRecord; "server.ssl.key-store=./config/keystore.jks", // "app.webclient.trust-store=./config/truststore.jks", // "app.configuration-filepath=./src/test/resources/test_application_configuration.json", // - "app.pm-files-path=./src/test/resources/" // + "app.pm-files-path=./src/test/resources/", // + "app.auth-token-file=src/test/resources/jwtToken.b64", // + "app.kafka.use-oath-token=false" // }) // class Integration { @@ -114,6 +116,7 @@ class Integration { props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + this.applicationConfig.addKafkaSecurityProps(props); return SenderOptions.create(props); } diff --git a/influxlogger/src/test/resources/jwtToken.b64 b/influxlogger/src/test/resources/jwtToken.b64 new file mode 100644 index 0000000..b19a883 --- /dev/null +++ b/influxlogger/src/test/resources/jwtToken.b64 @@ -0,0 +1 @@ +eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c diff --git a/pmproducer/config/application.yaml b/pmproducer/config/application.yaml index 27885f1..0e611b5 100644 --- a/pmproducer/config/application.yaml +++ b/pmproducer/config/application.yaml @@ -80,12 +80,22 @@ app: # The url used to adress this component. This is used as a callback url sent to other components. pm-producer-base-url: https://localhost:8435 - # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic - # several redundant boostrap servers can be specified, separated by a comma ','. + kafka: + # KAFKA boostrap servers. + # Several redundant boostrap servers can be specified, separated by a comma ','. bootstrap-servers: localhost:9092 # The maximum number of records returned in a single call to poll() (default 100) max-poll-records: 500 + # Configues if oath2 tokens shall be used. If set to true, auth-token-file must also be configured + use-oath-token: false + ssl: + key-store-type: PEM + key-store-location: + # key password is needed if the private key is encrypted + key-store-password: + trust-store-type: PEM + trust-store-location: # If the file name is empty, no authorization token is used auth-token-file: pm-files-path: /tmp diff --git a/pmproducer/src/main/java/org/oran/pmproducer/Application.java b/pmproducer/src/main/java/org/oran/pmproducer/Application.java index e92bd47..0ffb73f 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/Application.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/Application.java @@ -37,7 +37,6 @@ public class Application { private static final Logger logger = LoggerFactory.getLogger(Application.class); - private long configFileLastModification = 0; private static ConfigurableApplicationContext applicationContext; public static void main(String[] args) { @@ -67,4 +66,5 @@ public class Application { thread.setDaemon(false); thread.start(); } + } diff --git a/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClient.java b/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClient.java index ac76436..1fc0620 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClient.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClient.java @@ -29,6 +29,7 @@ import java.lang.invoke.MethodHandles; import java.util.concurrent.atomic.AtomicInteger; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.pmproducer.oauth2.SecurityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.MediaType; diff --git a/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClientFactory.java b/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClientFactory.java index 783da3a..7d50fa0 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClientFactory.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/clients/AsyncRestClientFactory.java @@ -43,6 +43,7 @@ import javax.net.ssl.KeyManagerFactory; import org.oran.pmproducer.configuration.WebClientConfig; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.pmproducer.oauth2.SecurityContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.util.ResourceUtils; diff --git a/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java b/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java index 18fcd8f..a70243b 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/configuration/ApplicationConfig.java @@ -26,12 +26,18 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.Collection; import java.util.Collections; +import java.util.Map; import lombok.Getter; import lombok.Setter; import lombok.ToString; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.config.SaslConfigs; +import org.apache.kafka.common.config.SslConfigs; +import org.apache.kafka.common.security.auth.SecurityProtocol; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; +import org.oran.pmproducer.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler; import org.oran.pmproducer.repository.InfoType; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,10 +75,10 @@ public class ApplicationConfig { @Value("${app.webclient.trust-store}") private String sslTrustStore = ""; - @Value("${app.webclient.http.proxy-host:}") + @Value("${app.webclient.http.proxy-host}") private String httpProxyHost = ""; - @Value("${app.webclient.http.proxy-port:0}") + @Value("${app.webclient.http.proxy-port}") private int httpProxyPort = 0; @Getter @@ -97,7 +103,7 @@ public class ApplicationConfig { private int kafkaMaxPollRecords; @Getter - @Value("${app.pm-files-path:}") + @Value("${app.pm-files-path}") private String pmFilesPath; @Getter @@ -122,9 +128,27 @@ public class ApplicationConfig { @Getter @Setter - @Value("${app.zip-output:}") + @Value("${app.zip-output}") private boolean zipOutput; + @Value("${app.kafka.ssl.key-store-type}") + private String kafkaKeyStoreType; + + @Value("${app.kafka.ssl.key-store-location}") + private String kafkaKeyStoreLocation; + + @Value("${app.kafka.ssl.key-store-password}") + private String kafkaKeyStorePassword; + + @Value("${app.kafka.ssl.trust-store-type}") + private String kafkaTrustStoreType; + + @Value("${app.kafka.ssl.trust-store-location}") + private String kafkTrustStoreLocation; + + @Value("${app.kafka.use-oath-token}") + private boolean useOathToken; + private WebClientConfig webClientConfig = null; public WebClientConfig getWebClientConfig() { @@ -167,7 +191,31 @@ public class ApplicationConfig { logger.error("Could not load configuration file {}", getLocalConfigurationFilePath()); return Collections.emptyList(); } + } + + public void addKafkaSecurityProps(Map props) { + if (useOathToken) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name); + props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER"); + props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS, + OAuthKafkaAuthenticateLoginCallbackHandler.class.getName()); + props.put(SaslConfigs.SASL_JAAS_CONFIG, + "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; "); + } + if (!kafkaKeyStoreLocation.isEmpty()) { + props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name); + // SSL + props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType); + props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation); + if (!kafkaKeyStorePassword.isEmpty()) { + props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword); + } + if (!kafkTrustStoreLocation.isEmpty()) { + props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType); + props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation); + } + } } } diff --git a/pmproducer/src/main/java/org/oran/pmproducer/exceptions/ServiceException.java b/pmproducer/src/main/java/org/oran/pmproducer/exceptions/ServiceException.java index 55e2a48..5a8fa36 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/exceptions/ServiceException.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/exceptions/ServiceException.java @@ -36,4 +36,9 @@ public class ServiceException extends Exception { this.httpStatus = httpStatus; } + public ServiceException(String message) { + super(message); + this.httpStatus = HttpStatus.BAD_REQUEST; + } + } diff --git a/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthBearerTokenJwt.java b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthBearerTokenJwt.java new file mode 100644 index 0000000..7ee2053 --- /dev/null +++ b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthBearerTokenJwt.java @@ -0,0 +1,101 @@ +// ============LICENSE_START=============================================== +// Copyright (C) 2023 Nordix Foundation. All rights reserved. +// ======================================================================== +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============LICENSE_END================================================= +// + +package org.oran.pmproducer.oauth2; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonMappingException; + +import java.util.Base64; +import java.util.HashSet; +import java.util.Set; + +import lombok.ToString; + +import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken; +import org.oran.pmproducer.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthBearerTokenJwt implements OAuthBearerToken { + private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class); + private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create(); + + private final String jwtTokenRaw; + private final JwtTokenBody tokenBody; + + @ToString + private static class JwtTokenBody { + String sub = ""; // principalName + long exp = 0; // expirationTime + long iat = 0; // startTime + String scope = ""; + } + + public static OAuthBearerTokenJwt create(String tokenRaw) + throws ServiceException, JsonMappingException, JsonProcessingException { + String[] chunks = tokenRaw.split("\\."); + Base64.Decoder decoder = Base64.getUrlDecoder(); + if (chunks.length < 2) { + throw new ServiceException("Could not parse JWT token: " + tokenRaw); + + } + String payloadStr = new String(decoder.decode(chunks[1])); + JwtTokenBody token = gson.fromJson(payloadStr, JwtTokenBody.class); + logger.error("Token: {}", token); + return new OAuthBearerTokenJwt(token, tokenRaw); + } + + private OAuthBearerTokenJwt(JwtTokenBody jwtTokenBody, String accessToken) { + super(); + this.jwtTokenRaw = accessToken; + this.tokenBody = jwtTokenBody; + } + + @Override + public String value() { + return jwtTokenRaw; + } + + @Override + public Set scope() { + Set res = new HashSet<>(); + if (!this.tokenBody.scope.isEmpty()) { + res.add(this.tokenBody.scope); + } + return res; + } + + @Override + public long lifetimeMs() { + if (this.tokenBody.exp == 0) { + return Long.MAX_VALUE; + } + return this.tokenBody.exp * 1000; + } + + @Override + public String principalName() { + return this.tokenBody.sub; + } + + @Override + public Long startTimeMs() { + return this.tokenBody.iat; + } + +} diff --git a/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java new file mode 100644 index 0000000..b209da3 --- /dev/null +++ b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/OAuthKafkaAuthenticateLoginCallbackHandler.java @@ -0,0 +1,93 @@ +// ============LICENSE_START=============================================== +// Copyright (C) 2023 Nordix Foundation. All rights reserved. +// ======================================================================== +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// ============LICENSE_END================================================= +// + +package org.oran.pmproducer.oauth2; + +import java.io.IOException; +import java.util.*; + +import javax.security.auth.callback.Callback; +import javax.security.auth.callback.UnsupportedCallbackException; +import javax.security.auth.login.AppConfigurationEntry; + +import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler; +import org.apache.kafka.common.security.auth.SaslExtensions; +import org.apache.kafka.common.security.auth.SaslExtensionsCallback; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule; +import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback; +import org.oran.pmproducer.exceptions.ServiceException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler { + private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class); + + private boolean isConfigured = false; + + @Override + public void configure(Map map, String saslMechanism, List jaasConfigEntries) { + + if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) + throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism)); + if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null) + throw new IllegalArgumentException( + String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)", + jaasConfigEntries.size())); + isConfigured = true; + } + + @Override + public void close() {} + + @Override + public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { + + if (!this.isConfigured) + throw new IllegalStateException("Callback handler not configured"); + for (Callback callback : callbacks) { + logger.debug("callback " + callback.toString()); + if (callback instanceof OAuthBearerTokenCallback) { + handleCallback((OAuthBearerTokenCallback) callback); + } else if (callback instanceof SaslExtensionsCallback) { + handleCallback((SaslExtensionsCallback) callback); + } else { + logger.error("Unsupported callback: {}", callback); + throw new UnsupportedCallbackException(callback); + } + } + } + + private void handleCallback(SaslExtensionsCallback callback) { + callback.extensions(SaslExtensions.empty()); + } + + private void handleCallback(OAuthBearerTokenCallback callback) { + try { + if (callback.token() != null) { + throw new ServiceException("Callback had a token already", null); + } + + String accessToken = SecurityContext.getInstance().getBearerAuthToken(); + OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken); + + callback.token(token); + } catch (Exception e) { + logger.error("Could not handle login callback: {}", e.getMessage()); + } + } + +} diff --git a/influxlogger/src/main/java/org/oran/pmlog/clients/SecurityContext.java b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/SecurityContext.java similarity index 85% rename from influxlogger/src/main/java/org/oran/pmlog/clients/SecurityContext.java rename to pmproducer/src/main/java/org/oran/pmproducer/oauth2/SecurityContext.java index 5aeb4a3..936d4a9 100644 --- a/influxlogger/src/main/java/org/oran/pmlog/clients/SecurityContext.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/oauth2/SecurityContext.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2022 Nordix Foundation + * Copyright (C) 2023 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,12 +18,13 @@ * ========================LICENSE_END=================================== */ -package org.oran.pmlog.clients; +package org.oran.pmproducer.oauth2; import java.lang.invoke.MethodHandles; import java.nio.file.Files; import java.nio.file.Path; +import lombok.Getter; import lombok.Setter; import org.slf4j.Logger; @@ -44,10 +45,14 @@ public class SecurityContext { private String authToken = ""; + @Getter + private static SecurityContext instance; + @Setter private Path authTokenFilePath; - public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) { + public SecurityContext(@Value("${app.auth-token-file}") String authTokenFilename) { + instance = this; if (!authTokenFilename.isEmpty()) { this.authTokenFilePath = Path.of(authTokenFilename); } @@ -59,12 +64,14 @@ public class SecurityContext { public synchronized String getBearerAuthToken() { if (!isConfigured()) { + logger.warn("No configuration for auth token"); return ""; } try { long lastModified = authTokenFilePath.toFile().lastModified(); if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) { this.authToken = Files.readString(authTokenFilePath); + this.authToken = this.authToken.trim(); this.tokenTimestamp = lastModified; } } catch (Exception e) { diff --git a/pmproducer/src/main/java/org/oran/pmproducer/repository/Jobs.java b/pmproducer/src/main/java/org/oran/pmproducer/repository/Jobs.java index 5b257bf..2b48b62 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/repository/Jobs.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/repository/Jobs.java @@ -29,13 +29,12 @@ import java.util.Vector; import lombok.Getter; -import org.oran.pmproducer.clients.AsyncRestClientFactory; -import org.oran.pmproducer.clients.SecurityContext; import org.oran.pmproducer.configuration.ApplicationConfig; import org.oran.pmproducer.exceptions.ServiceException; import org.oran.pmproducer.filter.FilterFactory; import org.oran.pmproducer.filter.FilteredData; import org.oran.pmproducer.filter.PmReportFilter; +import org.oran.pmproducer.oauth2.SecurityContext; import org.oran.pmproducer.repository.Job.Parameters; import org.oran.pmproducer.repository.Job.Parameters.KafkaDeliveryInfo; import org.oran.pmproducer.tasks.TopicListener.DataFromTopic; @@ -117,13 +116,11 @@ public class Jobs { private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); private Map jobGroups = new HashMap<>(); // Key is Topic - private final AsyncRestClientFactory restclientFactory; private final List observers = new ArrayList<>(); private final ApplicationConfig appConfig; public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext, @Autowired ApplicationConfig appConfig) { - restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext); this.appConfig = appConfig; } diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java index d9f6632..f2d7b53 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/JobDataDistributor.java @@ -93,6 +93,7 @@ public class JobDataDistributor { SenderOptions senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo()); this.sender = KafkaSender.create(senderOptions); + } public void start(Flux input) { @@ -150,6 +151,9 @@ public class JobDataDistributor { props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + + config.addKafkaSecurityProps(props); + return SenderOptions.create(props); } diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java index 4bd95a1..14789f4 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/ProducerRegstrationTask.java @@ -32,10 +32,10 @@ import lombok.Getter; import org.oran.pmproducer.clients.AsyncRestClient; import org.oran.pmproducer.clients.AsyncRestClientFactory; -import org.oran.pmproducer.clients.SecurityContext; import org.oran.pmproducer.configuration.ApplicationConfig; import org.oran.pmproducer.controllers.ProducerCallbacksController; import org.oran.pmproducer.exceptions.ServiceException; +import org.oran.pmproducer.oauth2.SecurityContext; import org.oran.pmproducer.r1.ConsumerJobInfo; import org.oran.pmproducer.r1.ProducerInfoTypeInfo; import org.oran.pmproducer.r1.ProducerRegistrationInfo; diff --git a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java index 351fcc6..3dd2475 100644 --- a/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java +++ b/pmproducer/src/main/java/org/oran/pmproducer/tasks/TopicListener.java @@ -155,21 +155,22 @@ public class TopicListener { } private ReceiverOptions kafkaInputProperties(String clientId) { - Map consumerProps = new HashMap<>(); + Map props = new HashMap<>(); if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) { logger.error("No kafka boostrap server is setup"); } - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); - consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); - consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); - consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); - consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers()); + props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId); + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class); + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId); - consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId); + this.applicationConfig.addKafkaSecurityProps(props); - return ReceiverOptions.create(consumerProps) + return ReceiverOptions.create(props) .subscription(Collections.singleton(this.type.getKafkaInputTopic())); } diff --git a/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java b/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java index 8cba2ca..095a96c 100644 --- a/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java +++ b/pmproducer/src/test/java/org/oran/pmproducer/ApplicationTest.java @@ -53,7 +53,6 @@ import org.junit.jupiter.api.TestMethodOrder; import org.mockito.ArgumentCaptor; import org.oran.pmproducer.clients.AsyncRestClient; import org.oran.pmproducer.clients.AsyncRestClientFactory; -import org.oran.pmproducer.clients.SecurityContext; import org.oran.pmproducer.configuration.ApplicationConfig; import org.oran.pmproducer.configuration.WebClientConfig; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; @@ -64,6 +63,7 @@ import org.oran.pmproducer.filter.FilteredData; import org.oran.pmproducer.filter.PmReport; import org.oran.pmproducer.filter.PmReportFilter; import org.oran.pmproducer.filter.PmReportFilter.FilterData; +import org.oran.pmproducer.oauth2.SecurityContext; import org.oran.pmproducer.r1.ConsumerJobInfo; import org.oran.pmproducer.r1.ProducerJobInfo; import org.oran.pmproducer.repository.InfoType; @@ -235,11 +235,6 @@ class ApplicationTest { return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort(); } - private String quote(String str) { - final String q = "\""; - return q + str.replace(q, "\\\"") + q; - } - private Object toJson(String json) { try { return JsonParser.parseString(json).getAsJsonObject(); diff --git a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithIcs.java b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithIcs.java index 8ce5547..de79b60 100644 --- a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithIcs.java +++ b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithIcs.java @@ -30,11 +30,11 @@ import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.oran.pmproducer.clients.AsyncRestClient; import org.oran.pmproducer.clients.AsyncRestClientFactory; -import org.oran.pmproducer.clients.SecurityContext; import org.oran.pmproducer.configuration.ApplicationConfig; import org.oran.pmproducer.configuration.WebClientConfig; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; import org.oran.pmproducer.filter.PmReportFilter; +import org.oran.pmproducer.oauth2.SecurityContext; import org.oran.pmproducer.r1.ConsumerJobInfo; import org.oran.pmproducer.repository.Jobs; import org.oran.pmproducer.tasks.ProducerRegstrationTask; diff --git a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java index 39363f8..70922d8 100644 --- a/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java +++ b/pmproducer/src/test/java/org/oran/pmproducer/IntegrationWithKafka.java @@ -44,7 +44,6 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.oran.pmproducer.clients.AsyncRestClient; import org.oran.pmproducer.clients.AsyncRestClientFactory; -import org.oran.pmproducer.clients.SecurityContext; import org.oran.pmproducer.configuration.ApplicationConfig; import org.oran.pmproducer.configuration.WebClientConfig; import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig; @@ -52,6 +51,7 @@ import org.oran.pmproducer.controllers.ProducerCallbacksController; import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection; import org.oran.pmproducer.datastore.DataStore; import org.oran.pmproducer.filter.PmReportFilter; +import org.oran.pmproducer.oauth2.SecurityContext; import org.oran.pmproducer.r1.ConsumerJobInfo; import org.oran.pmproducer.repository.InfoType; import org.oran.pmproducer.repository.InfoTypes; @@ -87,8 +87,9 @@ import reactor.kafka.sender.SenderRecord; "app.pm-files-path=./src/test/resources/", // "app.s3.locksBucket=ropfilelocks", // "app.pm-files-path=/tmp/dmaapadaptor", // - "app.s3.bucket=dmaaptest" // -}) // + "app.s3.bucket=dmaaptest", // + "app.auth-token-file=src/test/resources/jwtToken.b64", // + "app.kafka.use-oath-token=false"}) // class IntegrationWithKafka { final static String PM_TYPE_ID = "PmDataOverKafka"; @@ -321,6 +322,10 @@ class IntegrationWithKafka { props.put(ProducerConfig.ACKS_CONFIG, "all"); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class); + + // Security + this.applicationConfig.addKafkaSecurityProps(props); + return SenderOptions.create(props); } diff --git a/pmproducer/src/test/resources/jwtToken.b64 b/pmproducer/src/test/resources/jwtToken.b64 new file mode 100644 index 0000000..b19a883 --- /dev/null +++ b/pmproducer/src/test/resources/jwtToken.b64 @@ -0,0 +1 @@ +eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c -- 2.16.6