Added use of Auth token 12/8212/1
authorPatrikBuhr <patrik.buhr@est.tech>
Sat, 7 May 2022 12:35:20 +0000 (14:35 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Mon, 9 May 2022 09:24:41 +0000 (11:24 +0200)
An authorization token is added to each REST call (if configured).
The token is read from a file.

Change-Id: Ie74c01e0186b382b8e093f999f851eada1ad388f
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-743

13 files changed:
config/application.yaml
src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
src/main/java/org/oran/dmaapadapter/clients/AsyncRestClientFactory.java
src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java [new file with mode: 0644]
src/main/java/org/oran/dmaapadapter/repository/Jobs.java
src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java
src/main/java/org/oran/dmaapadapter/tasks/JobDataConsumer.java
src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java
src/test/java/org/oran/dmaapadapter/ApplicationTest.java
src/test/java/org/oran/dmaapadapter/ConsumerController.java
src/test/java/org/oran/dmaapadapter/IntegrationWithIcs.java
src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java

index 9b4b682..f6cd665 100644 (file)
@@ -57,4 +57,5 @@ app:
   # several redundant boostrap servers can be specified, separated by a comma ','.
   kafka:
     bootstrap-servers: localhost:9092
-
+  # If the file name is empty, no authorization token is used
+  auth-token-file:
\ No newline at end of file
index 746fdd7..4c7c1fa 100644 (file)
@@ -2,7 +2,7 @@
  * ========================LICENSE_START=================================
  * O-RAN-SC
  * %%
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2022 Nordix Foundation
  * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -35,10 +35,10 @@ import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.http.client.reactive.ReactorClientHttpConnector;
 import org.springframework.lang.Nullable;
+import org.springframework.web.reactive.function.client.ExchangeFilterFunction;
 import org.springframework.web.reactive.function.client.ExchangeStrategies;
 import org.springframework.web.reactive.function.client.WebClient;
 import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
-import org.springframework.web.reactive.function.client.WebClientResponseException;
 
 import reactor.core.publisher.Mono;
 import reactor.netty.http.client.HttpClient;
@@ -47,7 +47,6 @@ import reactor.netty.transport.ProxyProvider;
 /**
  * Generic reactive REST client.
  */
-@SuppressWarnings("java:S4449") // @Add Nullable to third party api
 public class AsyncRestClient {
 
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@@ -56,60 +55,41 @@ public class AsyncRestClient {
     private static final AtomicInteger sequenceNumber = new AtomicInteger();
     private final SslContext sslContext;
     private final HttpProxyConfig httpProxyConfig;
+    private final SecurityContext securityContext;
 
-    public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig) {
+    public AsyncRestClient(String baseUrl, @Nullable SslContext sslContext, @Nullable HttpProxyConfig httpProxyConfig,
+            SecurityContext securityContext) {
         this.baseUrl = baseUrl;
         this.sslContext = sslContext;
         this.httpProxyConfig = httpProxyConfig;
+        this.securityContext = securityContext;
     }
 
+    @SuppressWarnings("java:S4449") // contentType, is not @Nullable
     public Mono<ResponseEntity<String>> postForEntity(String uri, @Nullable String body,
-            @Nullable MediaType contentType) {
-        Object traceTag = createTraceTag();
-        logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
-        logger.trace("{} POST body: {}", traceTag, body);
+            @Nullable MediaType mediaType) {
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
 
         RequestHeadersSpec<?> request = getWebClient() //
                 .post() //
                 .uri(uri) //
-                .contentType(contentType) //
+                .contentType(mediaType) //
                 .body(bodyProducer, String.class);
-        return retrieve(traceTag, request);
-    }
-
-    public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType contentType) {
-        return postForEntity(uri, body, contentType) //
-                .map(this::toBody);
+        return retrieve(request);
     }
 
-    public Mono<String> postWithAuthHeader(String uri, String body, String username, String password,
-            @Nullable MediaType mediaType) {
-        Object traceTag = createTraceTag();
-        logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
-        logger.trace("{} POST body: {}", traceTag, body);
-
-        RequestHeadersSpec<?> request = getWebClient() //
-                .post() //
-                .uri(uri) //
-                .headers(headers -> headers.setBasicAuth(username, password)) //
-                .contentType(mediaType) //
-                .bodyValue(body);
-        return retrieve(traceTag, request) //
+    public Mono<String> post(String uri, @Nullable String body, @Nullable MediaType mediaType) {
+        return postForEntity(uri, body, mediaType) //
                 .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
-        Object traceTag = createTraceTag();
-        logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
-        logger.trace("{} PUT body: {}", traceTag, body);
-
         RequestHeadersSpec<?> request = getWebClient() //
                 .put() //
                 .uri(uri) //
                 .contentType(MediaType.APPLICATION_JSON) //
                 .bodyValue(body);
-        return retrieve(traceTag, request);
+        return retrieve(request);
     }
 
     public Mono<String> put(String uri, String body) {
@@ -118,10 +98,8 @@ public class AsyncRestClient {
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
-        Object traceTag = createTraceTag();
-        logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
         RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
-        return retrieve(traceTag, request);
+        return retrieve(request);
     }
 
     public Mono<String> get(String uri) {
@@ -130,10 +108,8 @@ public class AsyncRestClient {
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
-        Object traceTag = createTraceTag();
-        logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
         RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
-        return retrieve(traceTag, request);
+        return retrieve(request);
     }
 
     public Mono<String> delete(String uri) {
@@ -141,32 +117,18 @@ public class AsyncRestClient {
                 .map(this::toBody);
     }
 
-    private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
-        final Class<String> clazz = String.class;
+    private Mono<ResponseEntity<String>> retrieve(RequestHeadersSpec<?> request) {
+        if (securityContext.isConfigured()) {
+            request.headers(h -> h.setBearerAuth(securityContext.getBearerAuthToken()));
+        }
         return request.retrieve() //
-                .toEntity(clazz) //
-                .doOnNext(entity -> logReceivedData(traceTag, entity)) //
-                .doOnError(throwable -> onHttpError(traceTag, throwable));
-    }
-
-    private void logReceivedData(Object traceTag, ResponseEntity<String> entity) {
-        logger.trace("{} Received: {} {}", traceTag, entity.getBody(), entity.getHeaders().getContentType());
+                .toEntity(String.class);
     }
 
     private static Object createTraceTag() {
         return sequenceNumber.incrementAndGet();
     }
 
-    private void onHttpError(Object traceTag, Throwable t) {
-        if (t instanceof WebClientResponseException) {
-            WebClientResponseException exception = (WebClientResponseException) t;
-            logger.debug("{} HTTP error status = '{}', body '{}'", traceTag, exception.getStatusCode(),
-                    exception.getResponseBodyAsString());
-        } else {
-            logger.debug("{} HTTP error {}", traceTag, t.getMessage());
-        }
-    }
-
     private String toBody(ResponseEntity<String> entity) {
         if (entity.getBody() == null) {
             return "";
@@ -199,15 +161,30 @@ public class AsyncRestClient {
         return httpClient;
     }
 
-    private WebClient buildWebClient(String baseUrl) {
+    public WebClient buildWebClient(String baseUrl) {
+        Object traceTag = createTraceTag();
+
         final HttpClient httpClient = buildHttpClient();
         ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder() //
                 .codecs(configurer -> configurer.defaultCodecs().maxInMemorySize(-1)) //
                 .build();
+
+        ExchangeFilterFunction reqLogger = ExchangeFilterFunction.ofRequestProcessor(req -> {
+            logger.debug("{} {} uri = '{}''", traceTag, req.method(), req.url());
+            return Mono.just(req);
+        });
+
+        ExchangeFilterFunction respLogger = ExchangeFilterFunction.ofResponseProcessor(resp -> {
+            logger.debug("{} resp: {}", traceTag, resp.statusCode());
+            return Mono.just(resp);
+        });
+
         return WebClient.builder() //
                 .clientConnector(new ReactorClientHttpConnector(httpClient)) //
                 .baseUrl(baseUrl) //
                 .exchangeStrategies(exchangeStrategies) //
+                .filter(reqLogger) //
+                .filter(respLogger) //
                 .build();
     }
 
@@ -217,5 +194,4 @@ public class AsyncRestClient {
         }
         return this.webClient;
     }
-
 }
index a1ef1ac..dc8e2db 100644 (file)
@@ -2,7 +2,7 @@
  * ========================LICENSE_START=================================
  * O-RAN-SC
  * %%
- * Copyright (C) 2021 Nordix Foundation
+ * Copyright (C) 2022 Nordix Foundation
  * %%
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -50,14 +50,14 @@ import org.springframework.util.ResourceUtils;
 /**
  * Factory for a generic reactive REST client.
  */
-@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class AsyncRestClientFactory {
     private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
     private final SslContextFactory sslContextFactory;
     private final HttpProxyConfig httpProxyConfig;
+    private final SecurityContext securityContext;
 
-    public AsyncRestClientFactory(WebClientConfig clientConfig) {
+    public AsyncRestClientFactory(WebClientConfig clientConfig, SecurityContext securityContext) {
         if (clientConfig != null) {
             this.sslContextFactory = new CachingSslContextFactory(clientConfig);
             this.httpProxyConfig = clientConfig.httpProxyConfig();
@@ -66,6 +66,7 @@ public class AsyncRestClientFactory {
             this.sslContextFactory = null;
             this.httpProxyConfig = null;
         }
+        this.securityContext = securityContext;
     }
 
     public AsyncRestClient createRestClientNoHttpProxy(String baseUrl) {
@@ -80,12 +81,13 @@ public class AsyncRestClientFactory {
         if (this.sslContextFactory != null) {
             try {
                 return new AsyncRestClient(baseUrl, this.sslContextFactory.createSslContext(),
-                        useHttpProxy ? httpProxyConfig : null);
+                        useHttpProxy ? httpProxyConfig : null, this.securityContext);
             } catch (Exception e) {
-                logger.error("Could not init SSL context, reason: {}", e.toString());
+                String exceptionString = e.toString();
+                logger.error("Could not init SSL context, reason: {}", exceptionString);
             }
         }
-        return new AsyncRestClient(baseUrl, null, httpProxyConfig);
+        return new AsyncRestClient(baseUrl, null, httpProxyConfig, this.securityContext);
     }
 
     private class SslContextFactory {
diff --git a/src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java b/src/main/java/org/oran/dmaapadapter/clients/SecurityContext.java
new file mode 100644 (file)
index 0000000..83c0c42
--- /dev/null
@@ -0,0 +1,76 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.clients;
+
+import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+
+import lombok.Setter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.stereotype.Component;
+
+@EnableConfigurationProperties
+@ConfigurationProperties()
+@Component
+public class SecurityContext {
+
+    private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    private long tokenTimestamp = 0;
+
+    private String authToken = "";
+
+    @Setter
+    private Path authTokenFilePath;
+
+    public SecurityContext(@Value("${app.auth-token-file:\"\"}") String authTokenFilename) {
+        if (!authTokenFilename.isEmpty()) {
+            this.authTokenFilePath = Path.of(authTokenFilename);
+        }
+    }
+
+    public boolean isConfigured() {
+        return authTokenFilePath != null;
+    }
+
+    public synchronized String getBearerAuthToken() {
+        if (!isConfigured()) {
+            return "";
+        }
+        try {
+            long lastModified = authTokenFilePath.toFile().lastModified();
+            if (lastModified != this.tokenTimestamp) {
+                this.authToken = Files.readString(authTokenFilePath);
+                this.tokenTimestamp = lastModified;
+            }
+        } catch (Exception e) {
+            logger.warn("Could not read auth token file: {}, reason: {}", authTokenFilePath, e.getMessage());
+        }
+        return this.authToken;
+    }
+
+}
index be83296..3479720 100644 (file)
@@ -29,6 +29,7 @@ import java.util.Vector;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.repository.Job.Parameters;
@@ -53,8 +54,8 @@ public class Jobs {
     private final AsyncRestClientFactory restclientFactory;
     private final List<Observer> observers = new ArrayList<>();
 
-    public Jobs(@Autowired ApplicationConfig applicationConfig) {
-        restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+    public Jobs(@Autowired ApplicationConfig applicationConfig, @Autowired SecurityContext securityContext) {
+        restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
     }
 
     public synchronized Job getJob(String id) throws ServiceException {
index 6b4f253..c34b470 100644 (file)
 package org.oran.dmaapadapter.tasks;
 
 import java.time.Duration;
-import java.util.Collection;
-import java.util.LinkedList;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.slf4j.Logger;
@@ -52,8 +51,9 @@ public class DmaapTopicListener implements TopicListener {
     private Many<String> output;
     private Disposable topicReceiverTask;
 
-    public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type) {
-        AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+    public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, SecurityContext securityContext) {
+        AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig(),
+                securityContext);
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
         this.applicationConfig = applicationConfig;
         this.type = type;
@@ -112,14 +112,14 @@ public class DmaapTopicListener implements TopicListener {
         logger.trace("getFromMessageRouter {}", topicUrl);
         return dmaapRestClient.get(topicUrl) //
                 .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
-                .flatMapMany(this::splitArray) //
+                .flatMapMany(this::splitJsonArray) //
                 .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
                 .onErrorResume(this::handleDmaapErrorResponse); //
     }
 
-    private Flux<String> splitArray(String body) {
-        Collection<String> messages = gson.fromJson(body, LinkedList.class);
-        return Flux.fromIterable(messages);
+    private Flux<String> splitJsonArray(String body) {
+        String[] messages = gson.fromJson(body, String[].class);
+        return Flux.fromArray(messages);
     }
 
 }
index 0d941d9..99904df 100644 (file)
@@ -77,7 +77,7 @@ public class JobDataConsumer {
     public synchronized void start(Flux<String> input) {
         stop();
         this.errorStats.resetIrrecoverableErrors();
-        this.subscription = handleReceivedMessages(input, job) //
+        this.subscription = handleReceivedMessage(input, job) //
                 .flatMap(this::postToClient, job.getParameters().getMaxConcurrency()) //
                 .onErrorResume(this::handleError) //
                 .subscribe(this::handleConsumerSentOk, //
@@ -107,7 +107,7 @@ public class JobDataConsumer {
         return this.subscription != null;
     }
 
-    private Flux<String> handleReceivedMessages(Flux<String> input, Job job) {
+    private Flux<String> handleReceivedMessage(Flux<String> input, Job job) {
         Flux<String> result = input.map(job::filter) //
                 .filter(t -> !t.isEmpty()); //
 
index f3b663b..304eb18 100644 (file)
@@ -32,6 +32,7 @@ import lombok.Getter;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
 import org.oran.dmaapadapter.exceptions.ServiceException;
@@ -70,8 +71,10 @@ public class ProducerRegstrationTask {
     private boolean isRegisteredInIcs = false;
     private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10;
 
-    public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
-        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
+    public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types,
+            @Autowired SecurityContext securityContext) {
+        AsyncRestClientFactory restClientFactory =
+                new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
         this.applicationConfig = applicationConfig;
         this.types = types;
index 5176867..685379c 100644 (file)
@@ -25,6 +25,7 @@ import java.util.Map;
 
 import lombok.Getter;
 
+import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.InfoTypes;
@@ -53,7 +54,8 @@ public class TopicListeners {
 
     private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
 
-    public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs) {
+    public TopicListeners(@Autowired ApplicationConfig appConfig, @Autowired InfoTypes types, @Autowired Jobs jobs,
+            @Autowired SecurityContext securityContext) {
 
         for (InfoType type : types.getAll()) {
             if (type.isKafkaTopicDefined()) {
@@ -61,7 +63,7 @@ public class TopicListeners {
                 kafkaTopicListeners.put(type.getId(), topicConsumer);
             }
             if (type.isDmaapTopicDefined()) {
-                DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type);
+                DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, securityContext);
                 dmaapTopicListeners.put(type.getId(), topicListener);
             }
         }
index 8b91d0f..155e112 100644 (file)
@@ -33,6 +33,7 @@ import java.nio.charset.Charset;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.Map;
 
 import org.json.JSONObject;
 import org.junit.jupiter.api.AfterEach;
@@ -41,6 +42,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
 import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
@@ -106,6 +108,9 @@ class ApplicationTest {
     @Autowired
     ProducerRegstrationTask producerRegistrationTask;
 
+    @Autowired
+    private SecurityContext securityContext;
+
     private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
     @LocalServerPort
@@ -181,7 +186,7 @@ class ApplicationTest {
                 .trustStorePassword(config.trustStorePassword()) //
                 .httpProxyConfig(httpProxyConfig).build();
 
-        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
         return restClientFactory.createRestClientNoHttpProxy(baseUrl());
     }
 
@@ -371,7 +376,6 @@ class ApplicationTest {
 
     @Test
     void testJsltFiltering() throws Exception {
-
         final String JOB_ID = "ID";
 
         // Register producer, Register types
@@ -399,6 +403,41 @@ class ApplicationTest {
         assertThat(receivedFiltered).contains("event");
     }
 
+    @Test
+    void testAuthToken() throws Exception {
+
+        // Create an auth token
+        final String AUTH_TOKEN = "testToken";
+        Path authFile = Files.createTempFile("icsTestAuthToken", ".txt");
+        Files.write(authFile, AUTH_TOKEN.getBytes());
+        this.securityContext.setAuthTokenFilePath(authFile);
+
+        final String JOB_ID = "ID";
+
+        // Register producer, Register types
+        waitForRegistration();
+
+        // Create a job with a PM filter
+
+        ConsumerJobInfo jobInfo = consumerJobInfo("DmaapInformationType", JOB_ID, jsonObjectRegexp());
+
+        this.icsSimulatorController.addJob(jobInfo, JOB_ID, restClient());
+        await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+        DmaapSimulatorController.addResponse("Hello");
+
+        ConsumerController.TestResults consumer = this.consumerController.testResults;
+        await().untilAsserted(() -> assertThat(consumer.receivedHeaders).hasSize(1));
+        String received = consumer.receivedBodies.get(0);
+        assertThat(received).isEqualTo("Hello");
+
+        // Check that the auth token was received by the consumer
+        assertThat(consumer.receivedHeaders).hasSize(1);
+        Map<String, String> headers = consumer.receivedHeaders.get(0);
+        assertThat(headers).containsEntry("authorization", "Bearer " + AUTH_TOKEN);
+        Files.delete(authFile);
+    }
+
     @Test
     void testJsonPathFiltering() throws Exception {
         final String JOB_ID = "ID";
index 70e89d6..d02e48b 100644 (file)
@@ -31,6 +31,7 @@ import java.lang.invoke.MethodHandles;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 import org.oran.dmaapadapter.controllers.VoidResponse;
 import org.slf4j.Logger;
@@ -40,6 +41,7 @@ import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.bind.annotation.PostMapping;
 import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestHeader;
 import org.springframework.web.bind.annotation.RestController;
 
 @RestController("ConsumerSimulatorController")
@@ -54,6 +56,9 @@ public class ConsumerController {
 
         public List<String> receivedBodies = Collections.synchronizedList(new ArrayList<String>());
 
+        public List<Map<String, String>> receivedHeaders =
+                Collections.synchronizedList(new ArrayList<Map<String, String>>());
+
         public TestResults() {}
 
         public boolean hasReceived(String str) {
@@ -67,6 +72,7 @@ public class ConsumerController {
 
         public void reset() {
             receivedBodies.clear();
+            receivedHeaders.clear();
         }
     }
 
@@ -78,9 +84,10 @@ public class ConsumerController {
             @ApiResponse(responseCode = "200", description = "OK", //
                     content = @Content(schema = @Schema(implementation = VoidResponse.class))) //
     })
-    public ResponseEntity<Object> postData(@RequestBody String body) {
+    public ResponseEntity<Object> postData(@RequestBody String body, @RequestHeader Map<String, String> headers) {
         logger.info("Received by consumer: {}", body);
         testResults.receivedBodies.add(body);
+        testResults.receivedHeaders.add(headers);
         return new ResponseEntity<>(HttpStatus.OK);
     }
 
index 5153443..2c0dedc 100644 (file)
@@ -36,6 +36,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
 import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
@@ -85,6 +86,9 @@ class IntegrationWithIcs {
     @Autowired
     private ConsumerController consumerController;
 
+    @Autowired
+    private SecurityContext securityContext;
+
     private static Gson gson = new GsonBuilder().create();
 
     static class TestApplicationConfig extends ApplicationConfig {
@@ -152,7 +156,7 @@ class IntegrationWithIcs {
                 .trustStorePassword(config.trustStorePassword()) //
                 .httpProxyConfig(httpProxyConfig).build();
 
-        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
         return restClientFactory.createRestClientNoHttpProxy(selfBaseUrl());
     }
 
index 1e288e5..246bb0b 100644 (file)
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.clients.SecurityContext;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.configuration.ImmutableHttpProxyConfig;
 import org.oran.dmaapadapter.configuration.ImmutableWebClientConfig;
@@ -99,6 +100,9 @@ class IntegrationWithKafka {
     @Autowired
     private TopicListeners topicListeners;
 
+    @Autowired
+    private SecurityContext securityContext;
+
     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
     private static final Logger logger = LoggerFactory.getLogger(IntegrationWithKafka.class);
@@ -171,7 +175,7 @@ class IntegrationWithKafka {
                 .trustStorePassword(config.trustStorePassword()) //
                 .httpProxyConfig(httpProxyConfig).build();
 
-        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config);
+        AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config, securityContext);
         return restClientFactory.createRestClientNoHttpProxy(baseUrl());
     }
 
@@ -237,7 +241,6 @@ class IntegrationWithKafka {
                 .blockLast();
 
         sender.close();
-
     }
 
     private void verifiedReceivedByConsumer(String... strings) {