NONRTRIC - Implement DMaaP mediator producer service in Java 36/7036/2
authorPatrikBuhr <patrik.buhr@est.tech>
Thu, 11 Nov 2021 14:33:07 +0000 (15:33 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 11 Nov 2021 16:05:23 +0000 (17:05 +0100)
Simplifications and optimizations.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Iae092ab0840571dc497513145a18cf5b23e11e32

15 files changed:
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java
dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java
dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/AsyncRestClient.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/a1e/A1eController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java

index ec1541c..6939026 100644 (file)
@@ -67,96 +67,85 @@ public class AsyncRestClient {
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.post() //
-                            .uri(uri) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .body(bodyProducer, String.class);
-                    return retrieve(traceTag, request);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .body(bodyProducer, String.class);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> post(String uri, @Nullable String body) {
         return postForEntity(uri, body) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.post() //
-                            .uri(uri) //
-                            .headers(headers -> headers.setBasicAuth(username, password)) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .bodyValue(body);
-                    return retrieve(traceTag, request) //
-                            .flatMap(this::toBody);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .post() //
+                .uri(uri) //
+                .headers(headers -> headers.setBasicAuth(username, password)) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .bodyValue(body);
+        return retrieve(traceTag, request) //
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: {}", traceTag, body);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.put() //
-                            .uri(uri) //
-                            .contentType(MediaType.APPLICATION_JSON) //
-                            .bodyValue(body);
-                    return retrieve(traceTag, request);
-                });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+                .put() //
+                .uri(uri) //
+                .contentType(MediaType.APPLICATION_JSON) //
+                .bodyValue(body);
+        return retrieve(traceTag, request);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: <empty>", traceTag);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.put() //
-                            .uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient() //
+                .put() //
+                .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> put(String uri, String body) {
         return putForEntity(uri, body) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.get().uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> get(String uri) {
         return getForEntity(uri) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-                .flatMap(client -> {
-                    RequestHeadersSpec<?> request = client.delete().uri(uri);
-                    return retrieve(traceTag, request);
-                });
+        RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> delete(String uri) {
         return deleteForEntity(uri) //
-                .flatMap(this::toBody);
+                .map(this::toBody);
     }
 
     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
@@ -185,11 +174,11 @@ public class AsyncRestClient {
         }
     }
 
-    private Mono<String> toBody(ResponseEntity<String> entity) {
+    private String toBody(ResponseEntity<String> entity) {
         if (entity.getBody() == null) {
-            return Mono.just("");
+            return "";
         } else {
-            return Mono.just(entity.getBody());
+            return entity.getBody();
         }
     }
 
@@ -229,11 +218,11 @@ public class AsyncRestClient {
                 .build();
     }
 
-    private Mono<WebClient> getWebClient() {
+    private WebClient getWebClient() {
         if (this.webClient == null) {
             this.webClient = buildWebClient(baseUrl);
         }
-        return Mono.just(buildWebClient(baseUrl));
+        return this.webClient;
     }
 
 }
index e4dca5b..07f5aa7 100644 (file)
@@ -82,11 +82,9 @@ public class ProducerCallbacksController {
             @RequestBody String body) {
         try {
             ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class);
-
-            logger.info("Job started callback {}", request.id);
-            Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner,
+            logger.debug("Job started callback {}", request.id);
+            this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
                     request.lastUpdated, toJobParameters(request.jobData));
-            this.jobs.put(job);
             return new ResponseEntity<>(HttpStatus.OK);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
@@ -123,7 +121,7 @@ public class ProducerCallbacksController {
     public ResponseEntity<Object> jobDeletedCallback( //
             @PathVariable("infoJobId") String infoJobId) {
 
-        logger.info("Job deleted callback {}", infoJobId);
+        logger.debug("Job deleted callback {}", infoJobId);
         this.jobs.remove(infoJobId);
         return new ResponseEntity<>(HttpStatus.OK);
     }
index d1697e9..fbeb9cb 100644 (file)
 
 package org.oran.dmaapadapter.repository;
 
+import java.time.Duration;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import lombok.Getter;
 
 import org.immutables.gson.Gson;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
 
 public class Job {
 
     @Gson.TypeAdapters
     public static class Parameters {
-        public String filter;
-        public BufferTimeout bufferTimeout;
+        @Getter
+        private String filter;
+        @Getter
+        private BufferTimeout bufferTimeout;
 
-        public Parameters() {
-        }
+        public Parameters() {}
 
         public Parameters(String filter, BufferTimeout bufferTimeout) {
             this.filter = filter;
             this.bufferTimeout = bufferTimeout;
         }
+    }
+
+    @Gson.TypeAdapters
+    public static class BufferTimeout {
+        public BufferTimeout(int maxSize, int maxTimeMiliseconds) {
+            this.maxSize = maxSize;
+            this.maxTimeMiliseconds = maxTimeMiliseconds;
+        }
+
+        public BufferTimeout() {}
 
-        public static class BufferTimeout {
-            public BufferTimeout(int maxSize, int maxTimeMiliseconds) {
-                this.maxSize = maxSize;
-                this.maxTimeMiliseconds = maxTimeMiliseconds;
-            }
+        @Getter
+        private int maxSize;
 
-            public BufferTimeout() {
-            }
+        private int maxTimeMiliseconds;
 
-            public int maxSize;
-            public int maxTimeMiliseconds;
+        public Duration getMaxTime() {
+            return Duration.ofMillis(maxTimeMiliseconds);
         }
     }
 
@@ -76,7 +85,11 @@ public class Job {
 
     private final Pattern jobDataFilter;
 
-    public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) {
+    @Getter
+    private final AsyncRestClient consumerRestClient;
+
+    public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
+            AsyncRestClient consumerRestClient) {
         this.id = id;
         this.callbackUrl = callbackUrl;
         this.type = type;
@@ -88,6 +101,7 @@ public class Job {
         } else {
             jobDataFilter = null;
         }
+        this.consumerRestClient = consumerRestClient;
     }
 
     public boolean isFilterMatch(String data) {
index 8a38824..e3bc61e 100644 (file)
@@ -25,7 +25,11 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Vector;
 
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.repository.Job.Parameters;
 import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -39,9 +43,12 @@ public class Jobs {
     private Map<String, Job> allJobs = new HashMap<>();
     private MultiMap<Job> jobsByType = new MultiMap<>();
     private final KafkaTopicConsumers kafkaConsumers;
+    private final AsyncRestClientFactory restclientFactory;
 
-    public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+    public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) {
         this.kafkaConsumers = kafkaConsumers;
+
+        restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
     }
 
     public synchronized Job getJob(String id) throws ServiceException {
@@ -56,7 +63,16 @@ public class Jobs {
         return allJobs.get(id);
     }
 
-    public synchronized void put(Job job) {
+    public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
+            Parameters parameters) {
+        AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
+                ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
+                : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
+        Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+        this.put(job);
+    }
+
+    private synchronized void put(Job job) {
         logger.debug("Put job: {}", job.getId());
         allJobs.put(job.getId(), job);
         jobsByType.put(job.getType().getId(), job.getId(), job);
index 7d55758..55a02ab 100644 (file)
@@ -45,7 +45,6 @@ public class DmaapTopicConsumer {
 
     private final AsyncRestClient dmaapRestClient;
     private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
-    private final AsyncRestClient consumerRestClient;
     protected final ApplicationConfig applicationConfig;
     protected final InfoType type;
     protected final Jobs jobs;
@@ -85,8 +84,6 @@ public class DmaapTopicConsumer {
         AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
         this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
         this.applicationConfig = applicationConfig;
-        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
-                : restclientFactory.createRestClientNoHttpProxy("");
         this.type = type;
         this.jobs = jobs;
     }
@@ -108,7 +105,8 @@ public class DmaapTopicConsumer {
 
     private Mono<String> handleDmaapErrorResponse(Throwable t) {
         logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
-        return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(notUsed -> Mono.empty());
+        return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
+                .flatMap(notUsed -> Mono.empty());
     }
 
     private Mono<String> getFromMessageRouter(String topicUrl) {
@@ -130,8 +128,8 @@ public class DmaapTopicConsumer {
 
         // Distibute the body to all jobs for this type
         return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
-                .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
-                .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+                .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
+                .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse);
     }
 }
index 6079edf..5590022 100644 (file)
@@ -20,7 +20,6 @@
 
 package org.oran.dmaapadapter.tasks;
 
-import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -29,8 +28,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.IntegerDeserializer;
 import org.apache.kafka.common.serialization.StringDeserializer;
-import org.oran.dmaapadapter.clients.AsyncRestClient;
-import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.repository.InfoType;
 import org.oran.dmaapadapter.repository.Job;
@@ -52,7 +49,6 @@ import reactor.kafka.receiver.ReceiverOptions;
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
 public class KafkaTopicConsumer {
     private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
-    private final AsyncRestClient consumerRestClient;
     private final ApplicationConfig applicationConfig;
     private final InfoType type;
     private final Many<String> consumerDistributor;
@@ -60,12 +56,9 @@ public class KafkaTopicConsumer {
     public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
         this.applicationConfig = applicationConfig;
 
-        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
+        final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
         this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
 
-        AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
-        this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
-                : restclientFactory.createRestClientNoHttpProxy("");
         this.type = type;
         startKafkaTopicReceiver();
     }
@@ -73,22 +66,23 @@ public class KafkaTopicConsumer {
     private Disposable startKafkaTopicReceiver() {
         return KafkaReceiver.create(kafkaInputProperties()) //
                 .receive() //
-                .flatMap(this::onReceivedData) //
+                .doOnNext(this::onReceivedData) //
                 .subscribe(null, //
-                        throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
-                        () -> logger.warn("KafkaMessageConsumer stopped"));
+                        throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), //
+                        () -> logger.warn("KafkaTopicReceiver stopped"));
     }
 
-    private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
+    private void onReceivedData(ConsumerRecord<Integer, String> input) {
         logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
         consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
-        return consumerDistributor.asFlux();
     }
 
     public Disposable startDistributeToConsumer(Job job) {
+        final int CONCURRENCY = 10; // Has to be 1 to guarantee correct order.
+
         return getMessagesFromKafka(job) //
                 .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
-                .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
+                .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
                 .onErrorResume(this::handleConsumerErrorResponse) //
                 .subscribe(null, //
                         throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
@@ -99,9 +93,10 @@ public class KafkaTopicConsumer {
         if (job.isBuffered()) {
             return consumerDistributor.asFlux() //
                     .filter(job::isFilterMatch) //
-                    .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
-                            Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
-                    .flatMap(o -> Flux.just(o.toString()));
+                    .bufferTimeout( //
+                            job.getParameters().getBufferTimeout().getMaxSize(), //
+                            job.getParameters().getBufferTimeout().getMaxTime()) //
+                    .map(Object::toString);
         } else {
             return consumerDistributor.asFlux() //
                     .filter(job::isFilterMatch);
index e8b236c..7b719e3 100644 (file)
@@ -95,6 +95,7 @@ public class ProducerRegstrationTask {
         logger.warn("Registration of producer failed {}", t.getMessage());
     }
 
+    // Returns TRUE if registration is correct
     private Mono<Boolean> checkRegistration() {
         final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
         return restClient.get(url) //
@@ -118,8 +119,8 @@ public class ProducerRegstrationTask {
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
-        final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
-                + PRODUCER_ID;
+        final String producerUrl =
+                applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
index 31ef970..5ee3452 100644 (file)
@@ -175,8 +175,7 @@ class IntegrationWithKafka {
     }
 
     private Object jobParametersAsJsonObject(String filter, int maxTimeMiliseconds, int maxSize) {
-        Job.Parameters param = new Job.Parameters(filter,
-                new Job.Parameters.BufferTimeout(maxSize, maxTimeMiliseconds));
+        Job.Parameters param = new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds));
         String str = gson.toJson(param);
         return jsonObject(str);
     }
@@ -228,8 +227,8 @@ class IntegrationWithKafka {
         assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
 
         // Create a job
-        this.ecsSimulatorController.addJob(consumerJobInfo(".*", 10, 1000), JOB_ID1, restClient());
-        this.ecsSimulatorController.addJob(consumerJobInfo(".*Message_1.*", 0, 0), JOB_ID2, restClient());
+        this.ecsSimulatorController.addJob(consumerJobInfo(null, 10, 1000), JOB_ID1, restClient());
+        this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", 0, 0), JOB_ID2, restClient());
         await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
 
         final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
index 1b8e064..b7f23b1 100644 (file)
@@ -67,96 +67,85 @@ public class AsyncRestClient {
         logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
         Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.post() //
-                    .uri(uri) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .body(bodyProducer, String.class);
-                return retrieve(traceTag, request);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .post() //
+            .uri(uri) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .body(bodyProducer, String.class);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> post(String uri, @Nullable String body) {
         return postForEntity(uri, body) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
         Object traceTag = createTraceTag();
         logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} POST body: {}", traceTag, body);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.post() //
-                    .uri(uri) //
-                    .headers(headers -> headers.setBasicAuth(username, password)) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .bodyValue(body);
-                return retrieve(traceTag, request) //
-                    .flatMap(this::toBody);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .post() //
+            .uri(uri) //
+            .headers(headers -> headers.setBasicAuth(username, password)) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .bodyValue(body);
+        return retrieve(traceTag, request) //
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: {}", traceTag, body);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.put() //
-                    .uri(uri) //
-                    .contentType(MediaType.APPLICATION_JSON) //
-                    .bodyValue(body);
-                return retrieve(traceTag, request);
-            });
+
+        RequestHeadersSpec<?> request = getWebClient() //
+            .put() //
+            .uri(uri) //
+            .contentType(MediaType.APPLICATION_JSON) //
+            .bodyValue(body);
+        return retrieve(traceTag, request);
     }
 
     public Mono<ResponseEntity<String>> putForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
         logger.trace("{} PUT body: <empty>", traceTag);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.put() //
-                    .uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient() //
+            .put() //
+            .uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> put(String uri, String body) {
         return putForEntity(uri, body) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> getForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.get().uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> get(String uri) {
         return getForEntity(uri) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
         Object traceTag = createTraceTag();
         logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
-        return getWebClient() //
-            .flatMap(client -> {
-                RequestHeadersSpec<?> request = client.delete().uri(uri);
-                return retrieve(traceTag, request);
-            });
+        RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+        return retrieve(traceTag, request);
     }
 
     public Mono<String> delete(String uri) {
         return deleteForEntity(uri) //
-            .flatMap(this::toBody);
+            .map(this::toBody);
     }
 
     private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
@@ -185,11 +174,11 @@ public class AsyncRestClient {
         }
     }
 
-    private Mono<String> toBody(ResponseEntity<String> entity) {
+    private String toBody(ResponseEntity<String> entity) {
         if (entity.getBody() == null) {
-            return Mono.just("");
+            return "";
         } else {
-            return Mono.just(entity.getBody());
+            return entity.getBody();
         }
     }
 
@@ -229,11 +218,10 @@ public class AsyncRestClient {
             .build();
     }
 
-    private Mono<WebClient> getWebClient() {
+    private WebClient getWebClient() {
         if (this.webClient == null) {
             this.webClient = buildWebClient(baseUrl);
         }
-        return Mono.just(buildWebClient(baseUrl));
+        return this.webClient;
     }
-
 }
index 9609e27..8c056fc 100644 (file)
@@ -298,7 +298,7 @@ public class A1eController {
         return validatePutEiJob(eiJobId, eiJobObject) //
             .flatMap(this::startEiJob) //
             .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
-            .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+            .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
             .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.INTERNAL_SERVER_ERROR)));
     }
 
@@ -306,7 +306,7 @@ public class A1eController {
         return this.producerCallbacks.startInfoSubscriptionJob(newEiJob, infoProducers) //
             .doOnNext(noOfAcceptingProducers -> this.logger.debug(
                 "Started EI job {}, number of activated producers: {}", newEiJob.getId(), noOfAcceptingProducers)) //
-            .flatMap(noOfAcceptingProducers -> Mono.just(newEiJob));
+            .map(noOfAcceptingProducers -> newEiJob);
     }
 
     private Mono<InfoJob> validatePutEiJob(String eiJobId, A1eEiJobInfo eiJobInfo) {
index 47a4a2e..b108380 100644 (file)
@@ -308,7 +308,7 @@ public class ConsumerController {
         return validatePutInfoJob(jobId, informationJobObject, performTypeCheck) //
             .flatMap(this::startInfoSubscriptionJob) //
             .doOnNext(this.infoJobs::put) //
-            .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+            .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
             .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
     }
 
@@ -441,7 +441,7 @@ public class ConsumerController {
         return this.producerCallbacks.startInfoSubscriptionJob(newInfoJob, infoProducers) //
             .doOnNext(noOfAcceptingProducers -> this.logger.debug("Started job {}, number of activated producers: {}",
                 newInfoJob.getId(), noOfAcceptingProducers)) //
-            .flatMap(noOfAcceptingProducers -> Mono.just(newInfoJob));
+            .map(noOfAcceptingProducers -> newInfoJob);
     }
 
     private Mono<InfoJob> validatePutInfoJob(String jobId, ConsumerJobInfo jobInfo, boolean performTypeCheck) {
index a97bdf6..558ae79 100644 (file)
@@ -84,7 +84,7 @@ public class ProducerCallbacks {
         return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) //
             .flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) //
             .collectList() //
-            .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+            .map(okResponses -> Integer.valueOf(okResponses.size())); //
     }
 
     /**
index 65978e1..533199f 100644 (file)
@@ -222,8 +222,7 @@ public class InfoTypeSubscriptions {
     private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
         SubscriptionInfo subscriptionInfo) {
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
-        return Mono.just(1) //
-            .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
+        return notifyFunc.apply(subscriptionInfo) //
             .retryWhen(retrySpec) //
             .onErrorResume(throwable -> {
                 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
index db7c29b..08c5fc8 100644 (file)
@@ -80,7 +80,7 @@ public class ProducerSupervision {
             })//
             .doOnNext(response -> handleRespondingProducer(response, producer))
             .flatMap(response -> checkProducerJobs(producer)) //
-            .flatMap(responses -> Mono.just(producer));
+            .map(responses -> producer);
     }
 
     private Mono<?> checkProducerJobs(InfoProducer producer) {
index 4418429..8c8ce5f 100644 (file)
@@ -1028,7 +1028,7 @@ class ApplicationTest {
         // Test that subscriptions are removed for a unresponsive consumer
 
         // PUT a subscription with a junk callback
-        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
+        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "/JUNK", "owner");
         String body = gson.toJson(info);
         restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
         assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);