From 398870ca617e35c80b8c2e7af76f68313c365ce7 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Sat, 7 May 2022 14:35:20 +0200 Subject: [PATCH] Added use of Auth token An authorization token is added to each REST call (if configured). The token is read from a file. Change-Id: Ie74c01e0186b382b8e093f999f851eada1ad388f Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-743 --- config/application.yaml | 3 +- .../oran/dmaapadapter/clients/AsyncRestClient.java | 96 ++++++++-------------- .../clients/AsyncRestClientFactory.java | 14 ++-- .../oran/dmaapadapter/clients/SecurityContext.java | 76 +++++++++++++++++ .../org/oran/dmaapadapter/repository/Jobs.java | 5 +- .../dmaapadapter/tasks/DmaapTopicListener.java | 16 ++-- .../oran/dmaapadapter/tasks/JobDataConsumer.java | 4 +- .../tasks/ProducerRegstrationTask.java | 7 +- .../oran/dmaapadapter/tasks/TopicListeners.java | 6 +- .../org/oran/dmaapadapter/ApplicationTest.java | 43 +++++++++- .../org/oran/dmaapadapter/ConsumerController.java | 9 +- .../org/oran/dmaapadapter/IntegrationWithIcs.java | 6 +- .../oran/dmaapadapter/IntegrationWithKafka.java | 7 +- 13 files changed, 203 insertions(+), 89 deletions(-) create mode 100644 src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java diff --git a/config/application.yaml b/config/application.yaml index 9b4b682..f6cd665 100644 --- a/config/application.yaml +++ b/config/application.yaml @@ -57,4 +57,5 @@ app: # several redundant boostrap servers can be specified, separated by a comma ','. kafka: bootstrap-servers: localhost:9092 - + # If the file name is empty, no authorization token is used + auth-token-file: \ No newline at end of file diff --git a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java index 746fdd7..4c7c1fa 100644 --- a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java +++ b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2021 Nordix Foundation + * Copyright (C) 2022 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -35,10 +35,10 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.http.client.reactive.ReactorClientHttpConnector; import org.springframework.lang.Nullable; +import org.springframework.web.reactive.function.client.ExchangeFilterFunction; import org.springframework.web.reactive.function.client.ExchangeStrategies; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; -import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -47,7 +47,6 @@ import reactor.netty.transport.ProxyProvider; /** * Generic reactive REST client. */ -@SuppressWarnings("java:S4449") // @Add Nullable to third party api public class AsyncRestClient { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); @@ -56,60 +55,41 @@ public class AsyncRestClient { private static final AtomicInteger sequenceNumber = new AtomicInteger(); private final SslContext sslContext; private final HttpProxyConfig httpProxyConfig; + private final SecurityContext securityContext; - public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) { + public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig, + SecurityContext securityContext) { this.baseUrl = baseUrl; this.sslContext = sslContext; this.httpProxyConfig = httpProxyConfig; + this.securityContext = securityContext; } + @SuppressWarnings("java:S4449") // contentType, is not @Nullable public Mono> postForEntity(String uri, @Nullable String body, - @Nullable MediaType contentType) { - Object traceTag = createTraceTag(); - logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); - logger.trace("{} POST body: {}", traceTag, body); + @Nullable MediaType mediaType) { Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); RequestHeadersSpec request = getWebClient() // .post() // .uri(uri) // - .contentType(contentType) // + .contentType(mediaType) // .body(bodyProducer, String.class); - return retrieve(traceTag, request); - } - - public Mono post(String uri, @Nullable String body, @Nullable MediaType contentType) { - return postForEntity(uri, body, contentType) // - .map(this::toBody); + return retrieve(request); } - public Mono postWithAuthHeader(String uri, String body, String username, String password, - @Nullable MediaType mediaType) { - Object traceTag = createTraceTag(); - logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); - logger.trace("{} POST body: {}", traceTag, body); - - RequestHeadersSpec request = getWebClient() // - .post() // - .uri(uri) // - .headers(headers -> headers.setBasicAuth(username, password)) // - .contentType(mediaType) // - .bodyValue(body); - return retrieve(traceTag, request) // + public Mono post(String uri, @Nullable String body, @Nullable MediaType mediaType) { + return postForEntity(uri, body, mediaType) // .map(this::toBody); } public Mono> putForEntity(String uri, String body) { - Object traceTag = createTraceTag(); - logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); - logger.trace("{} PUT body: {}", traceTag, body); - RequestHeadersSpec request = getWebClient() // .put() // .uri(uri) // .contentType(MediaType.APPLICATION_JSON) // .bodyValue(body); - return retrieve(traceTag, request); + return retrieve(request); } public Mono put(String uri, String body) { @@ -118,10 +98,8 @@ public class AsyncRestClient { } public Mono> getForEntity(String uri) { - Object traceTag = createTraceTag(); - logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); RequestHeadersSpec request = getWebClient().get().uri(uri); - return retrieve(traceTag, request); + return retrieve(request); } public Mono get(String uri) { @@ -130,10 +108,8 @@ public class AsyncRestClient { } public Mono> deleteForEntity(String uri) { - Object traceTag = createTraceTag(); - logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); RequestHeadersSpec request = getWebClient().delete().uri(uri); - return retrieve(traceTag, request); + return retrieve(request); } public Mono delete(String uri) { @@ -141,32 +117,18 @@ public class AsyncRestClient { .map(this::toBody); } - private Mono> retrieve(Object traceTag, RequestHeadersSpec request) { - final Class clazz = String.class; + private Mono> retrieve(RequestHeadersSpec request) { + if (securityContext.isConfigured()) { + request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken())); + } return request.retrieve() // - .toEntity(clazz) // - .doOnNext(entity -> logReceivedData(traceTag, entity)) // - .doOnError(throwable -> onHttpError(traceTag, throwable)); - } - - private void logReceivedData(Object traceTag, ResponseEntity entity) { - logger.trace("{} Received: {} {}", traceTag, entity.getBody(), entity.getHeaders().getContentType()); + .toEntity(String.class); } private static Object createTraceTag() { return sequenceNumber.incrementAndGet(); } - private void onHttpError(Object traceTag, Throwable t) { - if (t instanceof WebClientResponseException) { - WebClientResponseException exception = (WebClientResponseException) t; - logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(), - exception.getResponseBodyAsString()); - } else { - logger.debug("{} HTTP error {}", traceTag, t.getMessage()); - } - } - private String toBody(ResponseEntity entity) { if (entity.getBody() == null) { return ""; @@ -199,15 +161,30 @@ public class AsyncRestClient { return httpClient; } - private WebClient buildWebClient(String baseUrl) { + public WebClient buildWebClient(String baseUrl) { + Object traceTag = createTraceTag(); + final HttpClient httpClient = buildHttpClient(); ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() // .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) // .build(); + + ExchangeFilterFunction reqLogger = ExchangeFilterFunction.ofRequestProcessor(req -> { + logger.debug("{} {} uri = '{}''", traceTag, req.method(), req.url()); + return Mono.just(req); + }); + + ExchangeFilterFunction respLogger = ExchangeFilterFunction.ofResponseProcessor(resp -> { + logger.debug("{} resp: {}", traceTag, resp.statusCode()); + return Mono.just(resp); + }); + return WebClient.builder() // .clientConnector(new ReactorClientHttpConnector(httpClient)) // .baseUrl(baseUrl) // .exchangeStrategies(exchangeStrategies) // + .filter(reqLogger) // + .filter(respLogger) // .build(); } @@ -217,5 +194,4 @@ public class AsyncRestClient { } return this.webClient; } - } diff --git a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java index a1ef1ac..dc8e2db 100644 --- a/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java +++ b/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2021 Nordix Foundation + * Copyright (C) 2022 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -50,14 +50,14 @@ import org.springframework.util.ResourceUtils; /** * Factory for a generic reactive REST client. */ -@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class AsyncRestClientFactory { private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final SslContextFactory sslContextFactory; private final HttpProxyConfig httpProxyConfig; + private final SecurityContext securityContext; - public AsyncRestClientFactory(WebClientConfig clientConfig) { + public AsyncRestClientFactory(WebClientConfig clientConfig, SecurityContext securityContext) { if (clientConfig != null) { this.sslContextFactory = new CachingSslContextFactory(clientConfig); this.httpProxyConfig = clientConfig.httpProxyConfig(); @@ -66,6 +66,7 @@ public class AsyncRestClientFactory { this.sslContextFactory = null; this.httpProxyConfig = null; } + this.securityContext = securityContext; } public AsyncRestClient createRestClientNoHttpProxy(String baseUrl) { @@ -80,12 +81,13 @@ public class AsyncRestClientFactory { if (this.sslContextFactory != null) { try { return new AsyncRestClient(baseUrl, this.sslContextFactory.createSslContext(), - useHttpProxy ? httpProxyConfig : null); + useHttpProxy ? httpProxyConfig : null, this.securityContext); } catch (Exception e) { - logger.error("Could not init SSL context, reason: {}", e.toString()); + String exceptionString = e.toString(); + logger.error("Could not init SSL context, reason: {}", exceptionString); } } - return new AsyncRestClient(baseUrl, null, httpProxyConfig); + return new AsyncRestClient(baseUrl, null, httpProxyConfig, this.securityContext); } private class SslContextFactory { diff --git a/src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java b/src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java new file mode 100644 index 0000000..83c0c42 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java @@ -0,0 +1,76 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.clients; + +import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; + +import lombok.Setter; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.stereotype.Component; + +@EnableConfigurationProperties +@ConfigurationProperties() +@Component +public class SecurityContext { + + private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + + private long tokenTimestamp = 0; + + private String authToken = ""; + + @Setter + private Path authTokenFilePath; + + public SecurityContext(@Value("${app.auth-token-file:\"\"}") String authTokenFilename) { + if (!authTokenFilename.isEmpty()) { + this.authTokenFilePath = Path.of(authTokenFilename); + } + } + + public boolean isConfigured() { + return authTokenFilePath != null; + } + + public synchronized String getBearerAuthToken() { + if (!isConfigured()) { + return ""; + } + try { + long lastModified = authTokenFilePath.toFile().lastModified(); + if (lastModified != this.tokenTimestamp) { + this.authToken = Files.readString(authTokenFilePath); + this.tokenTimestamp = lastModified; + } + } catch (Exception e) { + logger.warn("Could not read auth token file: {}, reason: {}", authTokenFilePath, e.getMessage()); + } + return this.authToken; + } + +} diff --git a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index be83296..3479720 100644 --- a/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -29,6 +29,7 @@ import java.util.Vector; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.repository.Job.Parameters; @@ -53,8 +54,8 @@ public class Jobs { private final AsyncRestClientFactory restclientFactory; private final List observers = new ArrayList<>(); - public Jobs(@Autowired ApplicationConfig applicationConfig) { - restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) { + restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext); } public synchronized Job getJob(String id) throws ServiceException { diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index 6b4f253..c34b470 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -21,11 +21,10 @@ package org.oran.dmaapadapter.tasks; import java.time.Duration; -import java.util.Collection; -import java.util.LinkedList; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.slf4j.Logger; @@ -52,8 +51,9 @@ public class DmaapTopicListener implements TopicListener { private Many output; private Disposable topicReceiverTask; - public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type) { - AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) { + AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), + securityContext); this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); this.applicationConfig = applicationConfig; this.type = type; @@ -112,14 +112,14 @@ public class DmaapTopicListener implements TopicListener { logger.trace("getFromMessageRouter {}", topicUrl); return dmaapRestClient.get(topicUrl) // .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away. - .flatMapMany(this::splitArray) // + .flatMapMany(this::splitJsonArray) // .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) // .onErrorResume(this::handleDmaapErrorResponse); // } - private Flux splitArray(String body) { - Collection messages = gson.fromJson(body, LinkedList.class); - return Flux.fromIterable(messages); + private Flux splitJsonArray(String body) { + String[] messages = gson.fromJson(body, String[].class); + return Flux.fromArray(messages); } } diff --git a/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java b/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java index 0d941d9..99904df 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java @@ -77,7 +77,7 @@ public class JobDataConsumer { public synchronized void start(Flux input) { stop(); this.errorStats.resetIrrecoverableErrors(); - this.subscription = handleReceivedMessages(input, job) // + this.subscription = handleReceivedMessage(input, job) // .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) // .onErrorResume(this::handleError) // .subscribe(this::handleConsumerSentOk, // @@ -107,7 +107,7 @@ public class JobDataConsumer { return this.subscription != null; } - private Flux handleReceivedMessages(Flux input, Job job) { + private Flux handleReceivedMessage(Flux input, Job job) { Flux result = input.map(job::filter) // .filter(t -> !t.isEmpty()); // diff --git a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index f3b663b..304eb18 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -32,6 +32,7 @@ import lombok.Getter; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; import org.oran.dmaapadapter.exceptions.ServiceException; @@ -70,8 +71,10 @@ public class ProducerRegstrationTask { private boolean isRegisteredInIcs = false; private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10; - public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) { - AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); + public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types, + @Autowired SecurityContext securityContext) { + AsyncRestClientFactory restClientFactory = + new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext); this.restClient = restClientFactory.createRestClientNoHttpProxy(""); this.applicationConfig = applicationConfig; this.types = types; diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index 5176867..685379c 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -25,6 +25,7 @@ import java.util.Map; import lombok.Getter; +import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.InfoTypes; @@ -53,7 +54,8 @@ public class TopicListeners { private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3; - public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) { + public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs, + @Autowired SecurityContext securityContext) { for (InfoType type : types.getAll()) { if (type.isKafkaTopicDefined()) { @@ -61,7 +63,7 @@ public class TopicListeners { kafkaTopicListeners.put(type.getId(), topicConsumer); } if (type.isDmaapTopicDefined()) { - DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type); + DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, securityContext); dmaapTopicListeners.put(type.getId(), topicListener); } } diff --git a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java index 8b91d0f..155e112 100644 --- a/src/test/java/org/oran/dmaapadapter/ApplicationTest.java +++ b/src/test/java/org/oran/dmaapadapter/ApplicationTest.java @@ -33,6 +33,7 @@ import java.nio.charset.Charset; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; +import java.util.Map; import org.json.JSONObject; import org.junit.jupiter.api.AfterEach; @@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; @@ -106,6 +108,9 @@ class ApplicationTest { @Autowired ProducerRegstrationTask producerRegistrationTask; + @Autowired + private SecurityContext securityContext; + private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); @LocalServerPort @@ -181,7 +186,7 @@ class ApplicationTest { .trustStorePassword(config.trustStorePassword()) // .httpProxyConfig(httpProxyConfig).build(); - AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext); return restClientFactory.createRestClientNoHttpProxy(baseUrl()); } @@ -371,7 +376,6 @@ class ApplicationTest { @Test void testJsltFiltering() throws Exception { - final String JOB_ID = "ID"; // Register producer, Register types @@ -399,6 +403,41 @@ class ApplicationTest { assertThat(receivedFiltered).contains("event"); } + @Test + void testAuthToken() throws Exception { + + // Create an auth token + final String AUTH_TOKEN = "testToken"; + Path authFile = Files.createTempFile("icsTestAuthToken", ".txt"); + Files.write(authFile, AUTH_TOKEN.getBytes()); + this.securityContext.setAuthTokenFilePath(authFile); + + final String JOB_ID = "ID"; + + // Register producer, Register types + waitForRegistration(); + + // Create a job with a PM filter + + ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp()); + + this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient()); + await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1)); + + DmaapSimulatorController.addResponse("Hello"); + + ConsumerController.TestResults consumer = this.consumerController.testResults; + await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1)); + String received = consumer.receivedBodies.get(0); + assertThat(received).isEqualTo("Hello"); + + // Check that the auth token was received by the consumer + assertThat(consumer.receivedHeaders).hasSize(1); + Map headers = consumer.receivedHeaders.get(0); + assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN); + Files.delete(authFile); + } + @Test void testJsonPathFiltering() throws Exception { final String JOB_ID = "ID"; diff --git a/src/test/java/org/oran/dmaapadapter/ConsumerController.java b/src/test/java/org/oran/dmaapadapter/ConsumerController.java index 70e89d6..d02e48b 100644 --- a/src/test/java/org/oran/dmaapadapter/ConsumerController.java +++ b/src/test/java/org/oran/dmaapadapter/ConsumerController.java @@ -31,6 +31,7 @@ import java.lang.invoke.MethodHandles; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; import org.oran.dmaapadapter.controllers.VoidResponse; import org.slf4j.Logger; @@ -40,6 +41,7 @@ import org.springframework.http.MediaType; import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestHeader; import org.springframework.web.bind.annotation.RestController; @RestController("ConsumerSimulatorController") @@ -54,6 +56,9 @@ public class ConsumerController { public List receivedBodies = Collections.synchronizedList(new ArrayList()); + public List> receivedHeaders = + Collections.synchronizedList(new ArrayList>()); + public TestResults() {} public boolean hasReceived(String str) { @@ -67,6 +72,7 @@ public class ConsumerController { public void reset() { receivedBodies.clear(); + receivedHeaders.clear(); } } @@ -78,9 +84,10 @@ public class ConsumerController { @ApiResponse(responseCode = "200", description = "OK", // content = @Content(schema = @Schema(implementation = VoidResponse.class))) // }) - public ResponseEntity postData(@RequestBody String body) { + public ResponseEntity postData(@RequestBody String body, @RequestHeader Map headers) { logger.info("Received by consumer: {}", body); testResults.receivedBodies.add(body); + testResults.receivedHeaders.add(headers); return new ResponseEntity<>(HttpStatus.OK); } diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java index 5153443..2c0dedc 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java @@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; @@ -85,6 +86,9 @@ class IntegrationWithIcs { @Autowired private ConsumerController consumerController; + @Autowired + private SecurityContext securityContext; + private static Gson gson = new GsonBuilder().create(); static class TestApplicationConfig extends ApplicationConfig { @@ -152,7 +156,7 @@ class IntegrationWithIcs { .trustStorePassword(config.trustStorePassword()) // .httpProxyConfig(httpProxyConfig).build(); - AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext); return restClientFactory.createRestClientNoHttpProxy(selfBaseUrl()); } diff --git a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 1e288e5..246bb0b 100644 --- a/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.clients.SecurityContext; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig; import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig; @@ -99,6 +100,9 @@ class IntegrationWithKafka { @Autowired private TopicListeners topicListeners; + @Autowired + private SecurityContext securityContext; + private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class); @@ -171,7 +175,7 @@ class IntegrationWithKafka { .trustStorePassword(config.trustStorePassword()) // .httpProxyConfig(httpProxyConfig).build(); - AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config); + AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext); return restClientFactory.createRestClientNoHttpProxy(baseUrl()); } @@ -237,7 +241,6 @@ class IntegrationWithKafka { .blockLast(); sender.close(); - } private void verifiedReceivedByConsumer(String... strings) { -- 2.16.6