max-poll-records: 500
group-id: kafkaGroupId
client-id: kafkaClientId
+ # Configues if oath2 tokens shall be used. If set to true, auth-token-file must also be configured
+ use-oath-token: false
+ ssl:
+ key-store-type: PEM
+ key-store-location:
+ # key password is needed if the private key is encrypted
+ key-store-password:
+ trust-store-type: PEM
+ trust-store-location:
influx:
url: http://localhost:8086
access-token: xmrt1YobMTl-Nx-a8iiO6fC8xJc5BvKZLSU8U18VfAYza4N0YHTFrLy15W4Ss2bxXhgX95qagxsBJ0GCBSFveQ==
database: pm_data
ics-base-url: https://localhost:8434
consumer-job-id: "pmlog"
+ # If the file name is empty, no authorization token is used
+ auth-token-file:
import org.oran.pmlog.clients.AsyncRestClient;
import org.oran.pmlog.clients.AsyncRestClientFactory;
-import org.oran.pmlog.clients.SecurityContext;
import org.oran.pmlog.configuration.ApplicationConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + applicationConfig.getKafkaGroupId());
+ this.applicationConfig.addKafkaSecurityProps(consumerProps);
return ReceiverOptions.<byte[], byte[]>create(consumerProps)
.subscription(Collections.singleton(this.applicationConfig.getKafkaInputTopic()));
import java.util.concurrent.atomic.AtomicInteger;
import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.oran.pmlog.configuration.WebClientConfig;
import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ResourceUtils;
import java.lang.invoke.MethodHandles;
import java.nio.charset.Charset;
import java.nio.file.Files;
+import java.util.Map;
import lombok.Getter;
import lombok.ToString;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.oran.pmlog.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
@Value("${app.webclient.trust-store}")
private String sslTrustStore = "";
- @Value("${app.webclient.http.proxy-host:}")
+ @Value("${app.webclient.http.proxy-host}")
private String httpProxyHost = "";
- @Value("${app.webclient.http.proxy-port:0}")
+ @Value("${app.webclient.http.proxy-port}")
private int httpProxyPort = 0;
@Getter
private String icsBaseUrl;
@Getter
- @Value("${app.consumer-job-id:shouldHaveBeenDefinedInYaml}")
+ @Value("${app.consumer-job-id}")
private String consumerJobId;
@Getter
@Value("${app.influx.org}")
private String influxOrg;
+ @Value("${app.kafka.ssl.key-store-type}")
+ private String kafkaKeyStoreType;
+
+ @Value("${app.kafka.ssl.key-store-location}")
+ private String kafkaKeyStoreLocation;
+
+ @Value("${app.kafka.ssl.key-store-password}")
+ private String kafkaKeyStorePassword;
+
+ @Value("${app.kafka.ssl.trust-store-type}")
+ private String kafkaTrustStoreType;
+
+ @Value("${app.kafka.ssl.trust-store-location}")
+ private String kafkTrustStoreLocation;
+
+ @Value("${app.kafka.use-oath-token}")
+ private boolean useOathToken;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
return this.webClientConfig;
}
+ public void addKafkaSecurityProps(Map<String, Object> props) {
+
+ if (useOathToken) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+ }
+ if (!kafkaKeyStoreLocation.isEmpty()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+ // SSL
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+ if (!kafkaKeyStorePassword.isEmpty()) {
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+ }
+ if (!kafkTrustStoreLocation.isEmpty()) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+ }
+ }
+
+ }
+
public String getConsumerJobInfo() {
try {
this.httpStatus = httpStatus;
}
+ public ServiceException(String message) {
+ this(message, HttpStatus.I_AM_A_TEAPOT);
+ }
+
}
--- /dev/null
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package org.oran.pmlog.oauth2;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+
+import lombok.ToString;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.oran.pmlog.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerTokenJwt implements OAuthBearerToken {
+ private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class);
+ private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+
+ private final String jwtTokenRaw;
+ private final JwtTokenBody tokenBody;
+
+ @ToString
+ private static class JwtTokenBody {
+ String sub = ""; // principalName
+ long exp = 0; // expirationTime
+ long iat = 0; // startTime
+ String scope = "";
+ }
+
+ public static OAuthBearerTokenJwt create(String tokenRaw)
+ throws ServiceException, JsonMappingException, JsonProcessingException {
+ String[] chunks = tokenRaw.split("\\.");
+ Base64.Decoder decoder = Base64.getUrlDecoder();
+ if (chunks.length < 2) {
+ throw new ServiceException("Could not parse JWT token: " + tokenRaw);
+
+ }
+ String payloadStr = new String(decoder.decode(chunks[1]));
+ JwtTokenBody token = gson.fromJson(payloadStr, JwtTokenBody.class);
+ logger.error("Token: {}", token);
+ return new OAuthBearerTokenJwt(token, tokenRaw);
+ }
+
+ private OAuthBearerTokenJwt(JwtTokenBody jwtTokenBody, String accessToken) {
+ super();
+ this.jwtTokenRaw = accessToken;
+ this.tokenBody = jwtTokenBody;
+ }
+
+ @Override
+ public String value() {
+ return jwtTokenRaw;
+ }
+
+ @Override
+ public Set<String> scope() {
+ Set<String> res = new HashSet<>();
+ if (!this.tokenBody.scope.isEmpty()) {
+ res.add(this.tokenBody.scope);
+ }
+ return res;
+ }
+
+ @Override
+ public long lifetimeMs() {
+ if (this.tokenBody.exp == 0) {
+ return Long.MAX_VALUE;
+ }
+ return this.tokenBody.exp * 1000;
+ }
+
+ @Override
+ public String principalName() {
+ return this.tokenBody.sub;
+ }
+
+ @Override
+ public Long startTimeMs() {
+ return this.tokenBody.iat;
+ }
+
+}
--- /dev/null
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package org.oran.pmlog.oauth2;
+
+import java.io.IOException;
+import java.util.*;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.oran.pmlog.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
+ private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
+
+ private boolean isConfigured = false;
+
+ @Override
+ public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+
+ if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+ throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+ if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+ throw new IllegalArgumentException(
+ String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+ jaasConfigEntries.size()));
+ isConfigured = true;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+
+ if (!this.isConfigured)
+ throw new IllegalStateException("Callback handler not configured");
+ for (Callback callback : callbacks) {
+ logger.debug("callback " + callback.toString());
+ if (callback instanceof OAuthBearerTokenCallback) {
+ handleCallback((OAuthBearerTokenCallback) callback);
+ } else if (callback instanceof SaslExtensionsCallback) {
+ handleCallback((SaslExtensionsCallback) callback);
+ } else {
+ logger.error("Unsupported callback: {}", callback);
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ }
+
+ private void handleCallback(SaslExtensionsCallback callback) {
+ callback.extensions(SaslExtensions.empty());
+ }
+
+ private void handleCallback(OAuthBearerTokenCallback callback) {
+ try {
+ if (callback.token() != null) {
+ throw new ServiceException("Callback had a token already");
+ }
+
+ String accessToken = SecurityContext.getInstance().getBearerAuthToken();
+ OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
+
+ callback.token(token);
+ } catch (Exception e) {
+ logger.error("Could not handle login callback: {}", e.getMessage());
+ }
+ }
+
+}
* ========================LICENSE_END===================================
*/
-package org.oran.pmproducer.clients;
+package org.oran.pmlog.oauth2;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
+import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
private String authToken = "";
+ @Getter
+ private static SecurityContext instance;
+
@Setter
private Path authTokenFilePath;
public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
+ instance = this;
if (!authTokenFilename.isEmpty()) {
this.authTokenFilePath = Path.of(authTokenFilename);
}
public synchronized String getBearerAuthToken() {
if (!isConfigured()) {
+ logger.warn("No configuration for auth token");
return "";
}
try {
long lastModified = authTokenFilePath.toFile().lastModified();
if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) {
this.authToken = Files.readString(authTokenFilePath);
+ this.authToken = this.authToken.trim();
this.tokenTimestamp = lastModified;
}
} catch (Exception e) {
import org.mockito.Mockito;
import org.oran.pmlog.clients.AsyncRestClient;
import org.oran.pmlog.clients.AsyncRestClientFactory;
-import org.oran.pmlog.clients.SecurityContext;
import org.oran.pmlog.configuration.ApplicationConfig;
import org.oran.pmlog.configuration.ConsumerJobInfo;
import org.oran.pmlog.configuration.WebClientConfig;
import org.oran.pmlog.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmlog.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
"server.ssl.key-store=./config/keystore.jks", //
"app.webclient.trust-store=./config/truststore.jks", //
"app.configuration-filepath=./src/test/resources/test_application_configuration.json", //
- "app.pm-files-path=./src/test/resources/" //
+ "app.pm-files-path=./src/test/resources/", //
+ "app.auth-token-file=src/test/resources/jwtToken.b64", //
+ "app.kafka.use-oath-token=false" //
}) //
class Integration {
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ this.applicationConfig.addKafkaSecurityProps(props);
return SenderOptions.create(props);
}
--- /dev/null
+eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
# The url used to adress this component. This is used as a callback url sent to other components.
pm-producer-base-url: https://localhost:8435
- # KAFKA boostrap servers. This is only needed if there are Information Types that uses a kafkaInputTopic
- # several redundant boostrap servers can be specified, separated by a comma ','.
+
kafka:
+ # KAFKA boostrap servers.
+ # Several redundant boostrap servers can be specified, separated by a comma ','.
bootstrap-servers: localhost:9092
# The maximum number of records returned in a single call to poll() (default 100)
max-poll-records: 500
+ # Configues if oath2 tokens shall be used. If set to true, auth-token-file must also be configured
+ use-oath-token: false
+ ssl:
+ key-store-type: PEM
+ key-store-location:
+ # key password is needed if the private key is encrypted
+ key-store-password:
+ trust-store-type: PEM
+ trust-store-location:
# If the file name is empty, no authorization token is used
auth-token-file:
pm-files-path: /tmp
private static final Logger logger = LoggerFactory.getLogger(Application.class);
- private long configFileLastModification = 0;
private static ConfigurableApplicationContext applicationContext;
public static void main(String[] args) {
thread.setDaemon(false);
thread.start();
}
+
}
import java.util.concurrent.atomic.AtomicInteger;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.MediaType;
import org.oran.pmproducer.configuration.WebClientConfig;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ResourceUtils;
import java.nio.file.Path;
import java.util.Collection;
import java.util.Collections;
+import java.util.Map;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
+import org.oran.pmproducer.oauth2.OAuthKafkaAuthenticateLoginCallbackHandler;
import org.oran.pmproducer.repository.InfoType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Value("${app.webclient.trust-store}")
private String sslTrustStore = "";
- @Value("${app.webclient.http.proxy-host:}")
+ @Value("${app.webclient.http.proxy-host}")
private String httpProxyHost = "";
- @Value("${app.webclient.http.proxy-port:0}")
+ @Value("${app.webclient.http.proxy-port}")
private int httpProxyPort = 0;
@Getter
private int kafkaMaxPollRecords;
@Getter
- @Value("${app.pm-files-path:}")
+ @Value("${app.pm-files-path}")
private String pmFilesPath;
@Getter
@Getter
@Setter
- @Value("${app.zip-output:}")
+ @Value("${app.zip-output}")
private boolean zipOutput;
+ @Value("${app.kafka.ssl.key-store-type}")
+ private String kafkaKeyStoreType;
+
+ @Value("${app.kafka.ssl.key-store-location}")
+ private String kafkaKeyStoreLocation;
+
+ @Value("${app.kafka.ssl.key-store-password}")
+ private String kafkaKeyStorePassword;
+
+ @Value("${app.kafka.ssl.trust-store-type}")
+ private String kafkaTrustStoreType;
+
+ @Value("${app.kafka.ssl.trust-store-location}")
+ private String kafkTrustStoreLocation;
+
+ @Value("${app.kafka.use-oath-token}")
+ private boolean useOathToken;
+
private WebClientConfig webClientConfig = null;
public WebClientConfig getWebClientConfig() {
logger.error("Could not load configuration file {}", getLocalConfigurationFilePath());
return Collections.emptyList();
}
+ }
+
+ public void addKafkaSecurityProps(Map<String, Object> props) {
+ if (useOathToken) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT.name);
+ props.put(SaslConfigs.SASL_MECHANISM, "OAUTHBEARER");
+ props.put(SaslConfigs.SASL_LOGIN_CALLBACK_HANDLER_CLASS,
+ OAuthKafkaAuthenticateLoginCallbackHandler.class.getName());
+ props.put(SaslConfigs.SASL_JAAS_CONFIG,
+ "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required unsecuredLoginStringClaim_sub=\"alice\"; ");
+ }
+ if (!kafkaKeyStoreLocation.isEmpty()) {
+ props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_SSL.name);
+ // SSL
+ props.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, kafkaKeyStoreType);
+ props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, kafkaKeyStoreLocation);
+ if (!kafkaKeyStorePassword.isEmpty()) {
+ props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, kafkaKeyStorePassword);
+ }
+ if (!kafkTrustStoreLocation.isEmpty()) {
+ props.put(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG, kafkaTrustStoreType);
+ props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, kafkTrustStoreLocation);
+ }
+ }
}
}
this.httpStatus = httpStatus;
}
+ public ServiceException(String message) {
+ super(message);
+ this.httpStatus = HttpStatus.BAD_REQUEST;
+ }
+
}
--- /dev/null
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package org.oran.pmproducer.oauth2;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+
+import java.util.Base64;
+import java.util.HashSet;
+import java.util.Set;
+
+import lombok.ToString;
+
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerToken;
+import org.oran.pmproducer.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthBearerTokenJwt implements OAuthBearerToken {
+ private static final Logger logger = LoggerFactory.getLogger(OAuthBearerTokenJwt.class);
+ private static final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
+
+ private final String jwtTokenRaw;
+ private final JwtTokenBody tokenBody;
+
+ @ToString
+ private static class JwtTokenBody {
+ String sub = ""; // principalName
+ long exp = 0; // expirationTime
+ long iat = 0; // startTime
+ String scope = "";
+ }
+
+ public static OAuthBearerTokenJwt create(String tokenRaw)
+ throws ServiceException, JsonMappingException, JsonProcessingException {
+ String[] chunks = tokenRaw.split("\\.");
+ Base64.Decoder decoder = Base64.getUrlDecoder();
+ if (chunks.length < 2) {
+ throw new ServiceException("Could not parse JWT token: " + tokenRaw);
+
+ }
+ String payloadStr = new String(decoder.decode(chunks[1]));
+ JwtTokenBody token = gson.fromJson(payloadStr, JwtTokenBody.class);
+ logger.error("Token: {}", token);
+ return new OAuthBearerTokenJwt(token, tokenRaw);
+ }
+
+ private OAuthBearerTokenJwt(JwtTokenBody jwtTokenBody, String accessToken) {
+ super();
+ this.jwtTokenRaw = accessToken;
+ this.tokenBody = jwtTokenBody;
+ }
+
+ @Override
+ public String value() {
+ return jwtTokenRaw;
+ }
+
+ @Override
+ public Set<String> scope() {
+ Set<String> res = new HashSet<>();
+ if (!this.tokenBody.scope.isEmpty()) {
+ res.add(this.tokenBody.scope);
+ }
+ return res;
+ }
+
+ @Override
+ public long lifetimeMs() {
+ if (this.tokenBody.exp == 0) {
+ return Long.MAX_VALUE;
+ }
+ return this.tokenBody.exp * 1000;
+ }
+
+ @Override
+ public String principalName() {
+ return this.tokenBody.sub;
+ }
+
+ @Override
+ public Long startTimeMs() {
+ return this.tokenBody.iat;
+ }
+
+}
--- /dev/null
+// ============LICENSE_START===============================================
+// Copyright (C) 2023 Nordix Foundation. All rights reserved.
+// ========================================================================
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+// ============LICENSE_END=================================================
+//
+
+package org.oran.pmproducer.oauth2;
+
+import java.io.IOException;
+import java.util.*;
+
+import javax.security.auth.callback.Callback;
+import javax.security.auth.callback.UnsupportedCallbackException;
+import javax.security.auth.login.AppConfigurationEntry;
+
+import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
+import org.apache.kafka.common.security.auth.SaslExtensions;
+import org.apache.kafka.common.security.auth.SaslExtensionsCallback;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule;
+import org.apache.kafka.common.security.oauthbearer.OAuthBearerTokenCallback;
+import org.oran.pmproducer.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class OAuthKafkaAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
+ private final Logger logger = LoggerFactory.getLogger(OAuthKafkaAuthenticateLoginCallbackHandler.class);
+
+ private boolean isConfigured = false;
+
+ @Override
+ public void configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
+
+ if (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
+ throw new IllegalArgumentException(String.format("Unexpected SASL mechanism: %s", saslMechanism));
+ if (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == null)
+ throw new IllegalArgumentException(
+ String.format("Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)",
+ jaasConfigEntries.size()));
+ isConfigured = true;
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
+
+ if (!this.isConfigured)
+ throw new IllegalStateException("Callback handler not configured");
+ for (Callback callback : callbacks) {
+ logger.debug("callback " + callback.toString());
+ if (callback instanceof OAuthBearerTokenCallback) {
+ handleCallback((OAuthBearerTokenCallback) callback);
+ } else if (callback instanceof SaslExtensionsCallback) {
+ handleCallback((SaslExtensionsCallback) callback);
+ } else {
+ logger.error("Unsupported callback: {}", callback);
+ throw new UnsupportedCallbackException(callback);
+ }
+ }
+ }
+
+ private void handleCallback(SaslExtensionsCallback callback) {
+ callback.extensions(SaslExtensions.empty());
+ }
+
+ private void handleCallback(OAuthBearerTokenCallback callback) {
+ try {
+ if (callback.token() != null) {
+ throw new ServiceException("Callback had a token already", null);
+ }
+
+ String accessToken = SecurityContext.getInstance().getBearerAuthToken();
+ OAuthBearerTokenJwt token = OAuthBearerTokenJwt.create(accessToken);
+
+ callback.token(token);
+ } catch (Exception e) {
+ logger.error("Could not handle login callback: {}", e.getMessage());
+ }
+ }
+
+}
* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2022 Nordix Foundation
+ * Copyright (C) 2023 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* ========================LICENSE_END===================================
*/
-package org.oran.pmlog.clients;
+package org.oran.pmproducer.oauth2;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.Path;
+import lombok.Getter;
import lombok.Setter;
import org.slf4j.Logger;
private String authToken = "";
+ @Getter
+ private static SecurityContext instance;
+
@Setter
private Path authTokenFilePath;
- public SecurityContext(@Value("${app.auth-token-file:}") String authTokenFilename) {
+ public SecurityContext(@Value("${app.auth-token-file}") String authTokenFilename) {
+ instance = this;
if (!authTokenFilename.isEmpty()) {
this.authTokenFilePath = Path.of(authTokenFilename);
}
public synchronized String getBearerAuthToken() {
if (!isConfigured()) {
+ logger.warn("No configuration for auth token");
return "";
}
try {
long lastModified = authTokenFilePath.toFile().lastModified();
if (tokenTimestamp == 0 || lastModified != this.tokenTimestamp) {
this.authToken = Files.readString(authTokenFilePath);
+ this.authToken = this.authToken.trim();
this.tokenTimestamp = lastModified;
}
} catch (Exception e) {
import lombok.Getter;
-import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.exceptions.ServiceException;
import org.oran.pmproducer.filter.FilterFactory;
import org.oran.pmproducer.filter.FilteredData;
import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.oran.pmproducer.repository.Job.Parameters;
import org.oran.pmproducer.repository.Job.Parameters.KafkaDeliveryInfo;
import org.oran.pmproducer.tasks.TopicListener.DataFromTopic;
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
private Map<String, JobGroup> jobGroups = new HashMap<>(); // Key is Topic
- private final AsyncRestClientFactory restclientFactory;
private final List<Observer> observers = new ArrayList<>();
private final ApplicationConfig appConfig;
public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext,
@Autowired ApplicationConfig appConfig) {
- restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
this.appConfig = appConfig;
}
SenderOptions<byte[], byte[]> senderOptions = senderOptions(applConfig, jobGroup.getDeliveryInfo());
this.sender = KafkaSender.create(senderOptions);
+
}
public void start(Flux<TopicListener.DataFromTopic> input) {
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+ config.addKafkaSecurityProps(props);
+
return SenderOptions.create(props);
}
import org.oran.pmproducer.clients.AsyncRestClient;
import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.controllers.ProducerCallbacksController;
import org.oran.pmproducer.exceptions.ServiceException;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.oran.pmproducer.r1.ConsumerJobInfo;
import org.oran.pmproducer.r1.ProducerInfoTypeInfo;
import org.oran.pmproducer.r1.ProducerRegistrationInfo;
}
private ReceiverOptions<byte[], byte[]> kafkaInputProperties(String clientId) {
- Map<String, Object> consumerProps = new HashMap<>();
+ Map<String, Object> props = new HashMap<>();
if (this.applicationConfig.getKafkaBootStrapServers().isEmpty()) {
logger.error("No kafka boostrap server is setup");
}
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
- consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
- consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
- consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.applicationConfig.getKafkaBootStrapServers());
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaGroupId);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
+ props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
- consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, clientId + "_" + kafkaGroupId);
+ this.applicationConfig.addKafkaSecurityProps(props);
- return ReceiverOptions.<byte[], byte[]>create(consumerProps)
+ return ReceiverOptions.<byte[], byte[]>create(props)
.subscription(Collections.singleton(this.type.getKafkaInputTopic()));
}
import org.mockito.ArgumentCaptor;
import org.oran.pmproducer.clients.AsyncRestClient;
import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.configuration.WebClientConfig;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.pmproducer.filter.PmReport;
import org.oran.pmproducer.filter.PmReportFilter;
import org.oran.pmproducer.filter.PmReportFilter.FilterData;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.oran.pmproducer.r1.ConsumerJobInfo;
import org.oran.pmproducer.r1.ProducerJobInfo;
import org.oran.pmproducer.repository.InfoType;
return "https://localhost:" + this.applicationConfig.getLocalServerHttpPort();
}
- private String quote(String str) {
- final String q = "\"";
- return q + str.replace(q, "\\\"") + q;
- }
-
private Object toJson(String json) {
try {
return JsonParser.parseString(json).getAsJsonObject();
import org.junit.jupiter.api.Test;
import org.oran.pmproducer.clients.AsyncRestClient;
import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.configuration.WebClientConfig;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.oran.pmproducer.r1.ConsumerJobInfo;
import org.oran.pmproducer.repository.Jobs;
import org.oran.pmproducer.tasks.ProducerRegstrationTask;
import org.junit.jupiter.api.Test;
import org.oran.pmproducer.clients.AsyncRestClient;
import org.oran.pmproducer.clients.AsyncRestClientFactory;
-import org.oran.pmproducer.clients.SecurityContext;
import org.oran.pmproducer.configuration.ApplicationConfig;
import org.oran.pmproducer.configuration.WebClientConfig;
import org.oran.pmproducer.configuration.WebClientConfig.HttpProxyConfig;
import org.oran.pmproducer.controllers.ProducerCallbacksController.StatisticsCollection;
import org.oran.pmproducer.datastore.DataStore;
import org.oran.pmproducer.filter.PmReportFilter;
+import org.oran.pmproducer.oauth2.SecurityContext;
import org.oran.pmproducer.r1.ConsumerJobInfo;
import org.oran.pmproducer.repository.InfoType;
import org.oran.pmproducer.repository.InfoTypes;
"app.pm-files-path=./src/test/resources/", //
"app.s3.locksBucket=ropfilelocks", //
"app.pm-files-path=/tmp/dmaapadaptor", //
- "app.s3.bucket=dmaaptest" //
-}) //
+ "app.s3.bucket=dmaaptest", //
+ "app.auth-token-file=src/test/resources/jwtToken.b64", //
+ "app.kafka.use-oath-token=false"}) //
class IntegrationWithKafka {
final static String PM_TYPE_ID = "PmDataOverKafka";
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+
+ // Security
+ this.applicationConfig.addKafkaSecurityProps(props);
+
return SenderOptions.create(props);
}
--- /dev/null
+eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c