An authorization token is added to each REST call (if configured).
The token is read from a file.
Change-Id: Ie74c01e0186b382b8e093f999f851eada1ad388f
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743
# several redundant boostrap servers can be specified, separated by a comma ','.
kafka:
bootstrap-servers: localhost:9092
-
+ # If the file name is empty, no authorization token is used
+ auth-token-file:
\ No newline at end of file
* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2022 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
import org.springframework.lang.Nullable;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
import org.springframework.web.reactive.function.client.ExchangeStrategies;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
/**
* Generic reactive REST client.
*/
-@SuppressWarnings("java:S4449") // @Add Nullable to third party api
public class AsyncRestClient {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private static final AtomicInteger sequenceNumber = new AtomicInteger();
private final SslContext sslContext;
private final HttpProxyConfig httpProxyConfig;
+ private final SecurityContext securityContext;
- public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) {
+ public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig,
+ SecurityContext securityContext) {
this.baseUrl = baseUrl;
this.sslContext = sslContext;
this.httpProxyConfig = httpProxyConfig;
+ this.securityContext = securityContext;
}
+ @SuppressWarnings("java:S4449") // contentType, is not @Nullable
public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body,
- @Nullable MediaType contentType) {
- Object traceTag = createTraceTag();
- logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
- logger.trace("{} POST body: {}", traceTag, body);
+ @Nullable MediaType mediaType) {
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
RequestHeadersSpec<?> request = getWebClient() //
.post() //
.uri(uri) //
- .contentType(contentType) //
+ .contentType(mediaType) //
.body(bodyProducer, String.class);
- return retrieve(traceTag, request);
- }
-
- public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType contentType) {
- return postForEntity(uri, body, contentType) //
- .map(this::toBody);
+ return retrieve(request);
}
- public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
- @Nullable MediaType mediaType) {
- Object traceTag = createTraceTag();
- logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
- logger.trace("{} POST body: {}", traceTag, body);
-
- RequestHeadersSpec<?> request = getWebClient() //
- .post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(mediaType) //
- .bodyValue(body);
- return retrieve(traceTag, request) //
+ public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType mediaType) {
+ return postForEntity(uri, body, mediaType) //
.map(this::toBody);
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
- Object traceTag = createTraceTag();
- logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
- logger.trace("{} PUT body: {}", traceTag, body);
-
RequestHeadersSpec<?> request = getWebClient() //
.put() //
.uri(uri) //
.contentType(MediaType.APPLICATION_JSON) //
.bodyValue(body);
- return retrieve(traceTag, request);
+ return retrieve(request);
}
public Mono<String> put(String uri, String body) {
}
public Mono<ResponseEntity<String>> getForEntity(String uri) {
- Object traceTag = createTraceTag();
- logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
- return retrieve(traceTag, request);
+ return retrieve(request);
}
public Mono<String> get(String uri) {
}
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
- Object traceTag = createTraceTag();
- logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
- return retrieve(traceTag, request);
+ return retrieve(request);
}
public Mono<String> delete(String uri) {
.map(this::toBody);
}
- private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
- final Class<String> clazz = String.class;
+ private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
+ if (securityContext.isConfigured()) {
+ request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
+ }
return request.retrieve() //
- .toEntity(clazz) //
- .doOnNext(entity -> logReceivedData(traceTag, entity)) //
- .doOnError(throwable -> onHttpError(traceTag, throwable));
- }
-
- private void logReceivedData(Object traceTag, ResponseEntity<String> entity) {
- logger.trace("{} Received: {} {}", traceTag, entity.getBody(), entity.getHeaders().getContentType());
+ .toEntity(String.class);
}
private static Object createTraceTag() {
return sequenceNumber.incrementAndGet();
}
- private void onHttpError(Object traceTag, Throwable t) {
- if (t instanceof WebClientResponseException) {
- WebClientResponseException exception = (WebClientResponseException) t;
- logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
- exception.getResponseBodyAsString());
- } else {
- logger.debug("{} HTTP error {}", traceTag, t.getMessage());
- }
- }
-
private String toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
return "";
return httpClient;
}
- private WebClient buildWebClient(String baseUrl) {
+ public WebClient buildWebClient(String baseUrl) {
+ Object traceTag = createTraceTag();
+
final HttpClient httpClient = buildHttpClient();
ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //
.codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //
.build();
+
+ ExchangeFilterFunction reqLogger = ExchangeFilterFunction.ofRequestProcessor(req -> {
+ logger.debug("{} {} uri = '{}''", traceTag, req.method(), req.url());
+ return Mono.just(req);
+ });
+
+ ExchangeFilterFunction respLogger = ExchangeFilterFunction.ofResponseProcessor(resp -> {
+ logger.debug("{} resp: {}", traceTag, resp.statusCode());
+ return Mono.just(resp);
+ });
+
return WebClient.builder() //
.clientConnector(new ReactorClientHttpConnector(httpClient)) //
.baseUrl(baseUrl) //
.exchangeStrategies(exchangeStrategies) //
+ .filter(reqLogger) //
+ .filter(respLogger) //
.build();
}
}
return this.webClient;
}
-
}
* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2022 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
/**
* Factory for a generic reactive REST client.
*/
-@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class AsyncRestClientFactory {
private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final SslContextFactory sslContextFactory;
private final HttpProxyConfig httpProxyConfig;
+ private final SecurityContext securityContext;
- public AsyncRestClientFactory(WebClientConfig clientConfig) {
+ public AsyncRestClientFactory(WebClientConfig clientConfig, SecurityContext securityContext) {
if (clientConfig != null) {
this.sslContextFactory = new CachingSslContextFactory(clientConfig);
this.httpProxyConfig = clientConfig.httpProxyConfig();
this.sslContextFactory = null;
this.httpProxyConfig = null;
}
+ this.securityContext = securityContext;
}
public AsyncRestClient createRestClientNoHttpProxy(String baseUrl) {
if (this.sslContextFactory != null) {
try {
return new AsyncRestClient(baseUrl, this.sslContextFactory.createSslContext(),
- useHttpProxy ? httpProxyConfig : null);
+ useHttpProxy ? httpProxyConfig : null, this.securityContext);
} catch (Exception e) {
- logger.error("Could not init SSL context, reason: {}", e.toString());
+ String exceptionString = e.toString();
+ logger.error("Could not init SSL context, reason: {}", exceptionString);
}
}
- return new AsyncRestClient(baseUrl, null, httpProxyConfig);
+ return new AsyncRestClient(baseUrl, null, httpProxyConfig, this.securityContext);
}
private class SslContextFactory {
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.clients;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import lombok.Setter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@EnableConfigurationProperties
+@ConfigurationProperties()
+@Component
+public class SecurityContext {
+
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ private long tokenTimestamp = 0;
+
+ private String authToken = "";
+
+ @Setter
+ private Path authTokenFilePath;
+
+ public SecurityContext(@Value("${app.auth-token-file:\"\"}") String authTokenFilename) {
+ if (!authTokenFilename.isEmpty()) {
+ this.authTokenFilePath = Path.of(authTokenFilename);
+ }
+ }
+
+ public boolean isConfigured() {
+ return authTokenFilePath != null;
+ }
+
+ public synchronized String getBearerAuthToken() {
+ if (!isConfigured()) {
+ return "";
+ }
+ try {
+ long lastModified = authTokenFilePath.toFile().lastModified();
+ if (lastModified != this.tokenTimestamp) {
+ this.authToken = Files.readString(authTokenFilePath);
+ this.tokenTimestamp = lastModified;
+ }
+ } catch (Exception e) {
+ logger.warn("Could not read auth token file: {}, reason: {}", authTokenFilePath, e.getMessage());
+ }
+ return this.authToken;
+ }
+
+}
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.repository.Job.Parameters;
private final AsyncRestClientFactory restclientFactory;
private final List<Observer> observers = new ArrayList<>();
- public Jobs(@Autowired ApplicationConfig applicationConfig) {
- restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+ public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) {
+ restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
}
public synchronized Job getJob(String id) throws ServiceException {
package org.oran.dmaapadapter.tasks;
import java.time.Duration;
-import java.util.Collection;
-import java.util.LinkedList;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.slf4j.Logger;
private Many<String> output;
private Disposable topicReceiverTask;
- public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type) {
- AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+ public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
+ AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(),
+ securityContext);
this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
this.applicationConfig = applicationConfig;
this.type = type;
logger.trace("getFromMessageRouter {}", topicUrl);
return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
- .flatMapMany(this::splitArray) //
+ .flatMapMany(this::splitJsonArray) //
.doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
}
- private Flux<String> splitArray(String body) {
- Collection<String> messages = gson.fromJson(body, LinkedList.class);
- return Flux.fromIterable(messages);
+ private Flux<String> splitJsonArray(String body) {
+ String[] messages = gson.fromJson(body, String[].class);
+ return Flux.fromArray(messages);
}
}
public synchronized void start(Flux<String> input) {
stop();
this.errorStats.resetIrrecoverableErrors();
- this.subscription = handleReceivedMessages(input, job) //
+ this.subscription = handleReceivedMessage(input, job) //
.flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
.subscribe(this::handleConsumerSentOk, //
return this.subscription != null;
}
- private Flux<String> handleReceivedMessages(Flux<String> input, Job job) {
+ private Flux<String> handleReceivedMessage(Flux<String> input, Job job) {
Flux<String> result = input.map(job::filter) //
.filter(t -> !t.isEmpty()); //
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
import org.oran.dmaapadapter.exceptions.ServiceException;
private boolean isRegisteredInIcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10;
- public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
- AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+ public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types,
+ @Autowired SecurityContext securityContext) {
+ AsyncRestClientFactory restClientFactory =
+ new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
this.restClient = restClientFactory.createRestClientNoHttpProxy("");
this.applicationConfig = applicationConfig;
this.types = types;
import lombok.Getter;
+import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
- public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
+ public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
+ @Autowired SecurityContext securityContext) {
for (InfoType type : types.getAll()) {
if (type.isKafkaTopicDefined()) {
kafkaTopicListeners.put(type.getId(), topicConsumer);
}
if (type.isDmaapTopicDefined()) {
- DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type);
+ DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, securityContext);
dmaapTopicListeners.put(type.getId(), topicListener);
}
}
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
+import java.util.Map;
import org.json.JSONObject;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
@Autowired
ProducerRegstrationTask producerRegistrationTask;
+ @Autowired
+ private SecurityContext securityContext;
+
private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
@LocalServerPort
.trustStorePassword(config.trustStorePassword()) //
.httpProxyConfig(httpProxyConfig).build();
- AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+ AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
return restClientFactory.createRestClientNoHttpProxy(baseUrl());
}
@Test
void testJsltFiltering() throws Exception {
-
final String JOB_ID = "ID";
// Register producer, Register types
assertThat(receivedFiltered).contains("event");
}
+ @Test
+ void testAuthToken() throws Exception {
+
+ // Create an auth token
+ final String AUTH_TOKEN = "testToken";
+ Path authFile = Files.createTempFile("icsTestAuthToken", ".txt");
+ Files.write(authFile, AUTH_TOKEN.getBytes());
+ this.securityContext.setAuthTokenFilePath(authFile);
+
+ final String JOB_ID = "ID";
+
+ // Register producer, Register types
+ waitForRegistration();
+
+ // Create a job with a PM filter
+
+ ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp());
+
+ this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ DmaapSimulatorController.addResponse("Hello");
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
+ String received = consumer.receivedBodies.get(0);
+ assertThat(received).isEqualTo("Hello");
+
+ // Check that the auth token was received by the consumer
+ assertThat(consumer.receivedHeaders).hasSize(1);
+ Map<String, String> headers = consumer.receivedHeaders.get(0);
+ assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
+ Files.delete(authFile);
+ }
+
@Test
void testJsonPathFiltering() throws Exception {
final String JOB_ID = "ID";
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.oran.dmaapadapter.controllers.VoidResponse;
import org.slf4j.Logger;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RestController;
@RestController("ConsumerSimulatorController")
public List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
+ public List<Map<String, String>> receivedHeaders =
+ Collections.synchronizedList(new ArrayList<Map<String, String>>());
+
public TestResults() {}
public boolean hasReceived(String str) {
public void reset() {
receivedBodies.clear();
+ receivedHeaders.clear();
}
}
@ApiResponse(responseCode = "200", description = "OK", //
content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
})
- public ResponseEntity<Object> postData(@RequestBody String body) {
+ public ResponseEntity<Object> postData(@RequestBody String body, @RequestHeader Map<String, String> headers) {
logger.info("Received by consumer: {}", body);
testResults.receivedBodies.add(body);
+ testResults.receivedHeaders.add(headers);
return new ResponseEntity<>(HttpStatus.OK);
}
import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
@Autowired
private ConsumerController consumerController;
+ @Autowired
+ private SecurityContext securityContext;
+
private static Gson gson = new GsonBuilder().create();
static class TestApplicationConfig extends ApplicationConfig {
.trustStorePassword(config.trustStorePassword()) //
.httpProxyConfig(httpProxyConfig).build();
- AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+ AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
return restClientFactory.createRestClientNoHttpProxy(selfBaseUrl());
}
import org.junit.jupiter.api.extension.ExtendWith;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
@Autowired
private TopicListeners topicListeners;
+ @Autowired
+ private SecurityContext securityContext;
+
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
.trustStorePassword(config.trustStorePassword()) //
.httpProxyConfig(httpProxyConfig).build();
- AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+ AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
return restClientFactory.createRestClientNoHttpProxy(baseUrl());
}
.blockLast();
sender.close();
-
}
private void verifiedReceivedByConsumer(String... strings) {