From: PatrikBuhr Date: Thu, 11 Nov 2021 14:33:07 +0000 (+0100) Subject: NONRTRIC - Implement DMaaP mediator producer service in Java X-Git-Tag: 1.2.0~47^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=5e1623ab25b62c6c28849bfd862eba4648465922;p=nonrtric.git NONRTRIC - Implement DMaaP mediator producer service in Java Simplifications and optimizations. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-597 Change-Id: Iae092ab0840571dc497513145a18cf5b23e11e32 --- diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java index ec1541cf..6939026d 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/clients/AsyncRestClient.java @@ -67,96 +67,85 @@ public class AsyncRestClient { logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.post() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .body(bodyProducer, String.class); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(traceTag, request); } public Mono post(String uri, @Nullable String body) { return postForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono postWithAuthHeader(String uri, String body, String username, String password) { Object traceTag = createTraceTag(); logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.post() // - .uri(uri) // - .headers(headers -> headers.setBasicAuth(username, password)) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request) // - .flatMap(this::toBody); - }); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .headers(headers -> headers.setBasicAuth(username, password)) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request) // + .map(this::toBody); } public Mono> putForEntity(String uri, String body) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.put() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec request = getWebClient() // + .put() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request); } public Mono> putForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: ", traceTag); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.put() // - .uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient() // + .put() // + .uri(uri); + return retrieve(traceTag, request); } public Mono put(String uri, String body) { return putForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono> getForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.get().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient().get().uri(uri); + return retrieve(traceTag, request); } public Mono get(String uri) { return getForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono> deleteForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.delete().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient().delete().uri(uri); + return retrieve(traceTag, request); } public Mono delete(String uri) { return deleteForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } private Mono> retrieve(Object traceTag, RequestHeadersSpec request) { @@ -185,11 +174,11 @@ public class AsyncRestClient { } } - private Mono toBody(ResponseEntity entity) { + private String toBody(ResponseEntity entity) { if (entity.getBody() == null) { - return Mono.just(""); + return ""; } else { - return Mono.just(entity.getBody()); + return entity.getBody(); } } @@ -229,11 +218,11 @@ public class AsyncRestClient { .build(); } - private Mono getWebClient() { + private WebClient getWebClient() { if (this.webClient == null) { this.webClient = buildWebClient(baseUrl); } - return Mono.just(buildWebClient(baseUrl)); + return this.webClient; } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java index e4dca5b8..07f5aa72 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/controllers/ProducerCallbacksController.java @@ -82,11 +82,9 @@ public class ProducerCallbacksController { @RequestBody String body) { try { ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class); - - logger.info("Job started callback {}", request.id); - Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner, + logger.debug("Job started callback {}", request.id); + this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner, request.lastUpdated, toJobParameters(request.jobData)); - this.jobs.put(job); return new ResponseEntity<>(HttpStatus.OK); } catch (Exception e) { return ErrorResponse.create(e, HttpStatus.NOT_FOUND); @@ -123,7 +121,7 @@ public class ProducerCallbacksController { public ResponseEntity jobDeletedCallback( // @PathVariable("infoJobId") String infoJobId) { - logger.info("Job deleted callback {}", infoJobId); + logger.debug("Job deleted callback {}", infoJobId); this.jobs.remove(infoJobId); return new ResponseEntity<>(HttpStatus.OK); } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java index d1697e96..fbeb9cbc 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Job.java @@ -20,39 +20,48 @@ package org.oran.dmaapadapter.repository; +import java.time.Duration; import java.util.regex.Matcher; import java.util.regex.Pattern; import lombok.Getter; import org.immutables.gson.Gson; +import org.oran.dmaapadapter.clients.AsyncRestClient; public class Job { @Gson.TypeAdapters public static class Parameters { - public String filter; - public BufferTimeout bufferTimeout; + @Getter + private String filter; + @Getter + private BufferTimeout bufferTimeout; - public Parameters() { - } + public Parameters() {} public Parameters(String filter, BufferTimeout bufferTimeout) { this.filter = filter; this.bufferTimeout = bufferTimeout; } + } + + @Gson.TypeAdapters + public static class BufferTimeout { + public BufferTimeout(int maxSize, int maxTimeMiliseconds) { + this.maxSize = maxSize; + this.maxTimeMiliseconds = maxTimeMiliseconds; + } + + public BufferTimeout() {} - public static class BufferTimeout { - public BufferTimeout(int maxSize, int maxTimeMiliseconds) { - this.maxSize = maxSize; - this.maxTimeMiliseconds = maxTimeMiliseconds; - } + @Getter + private int maxSize; - public BufferTimeout() { - } + private int maxTimeMiliseconds; - public int maxSize; - public int maxTimeMiliseconds; + public Duration getMaxTime() { + return Duration.ofMillis(maxTimeMiliseconds); } } @@ -76,7 +85,11 @@ public class Job { private final Pattern jobDataFilter; - public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) { + @Getter + private final AsyncRestClient consumerRestClient; + + public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters, + AsyncRestClient consumerRestClient) { this.id = id; this.callbackUrl = callbackUrl; this.type = type; @@ -88,6 +101,7 @@ public class Job { } else { jobDataFilter = null; } + this.consumerRestClient = consumerRestClient; } public boolean isFilterMatch(String data) { diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java index 8a388248..e3bc61e8 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/repository/Jobs.java @@ -25,7 +25,11 @@ import java.util.HashMap; import java.util.Map; import java.util.Vector; +import org.oran.dmaapadapter.clients.AsyncRestClient; +import org.oran.dmaapadapter.clients.AsyncRestClientFactory; +import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.exceptions.ServiceException; +import org.oran.dmaapadapter.repository.Job.Parameters; import org.oran.dmaapadapter.tasks.KafkaTopicConsumers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,9 +43,12 @@ public class Jobs { private Map allJobs = new HashMap<>(); private MultiMap jobsByType = new MultiMap<>(); private final KafkaTopicConsumers kafkaConsumers; + private final AsyncRestClientFactory restclientFactory; - public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) { + public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) { this.kafkaConsumers = kafkaConsumers; + + restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); } public synchronized Job getJob(String id) throws ServiceException { @@ -56,7 +63,16 @@ public class Jobs { return allJobs.get(id); } - public synchronized void put(Job job) { + public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, + Parameters parameters) { + AsyncRestClient consumerRestClient = type.isUseHttpProxy() // + ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) // + : restclientFactory.createRestClientNoHttpProxy(callbackUrl); + Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient); + this.put(job); + } + + private synchronized void put(Job job) { logger.debug("Put job: {}", job.getId()); allJobs.put(job.getId(), job); jobsByType.put(job.getType().getId(), job.getId(), job); diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java index 7d557585..55a02abf 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicConsumer.java @@ -45,7 +45,6 @@ public class DmaapTopicConsumer { private final AsyncRestClient dmaapRestClient; private final InfiniteFlux infiniteSubmitter = new InfiniteFlux(); - private final AsyncRestClient consumerRestClient; protected final ApplicationConfig applicationConfig; protected final InfoType type; protected final Jobs jobs; @@ -85,8 +84,6 @@ public class DmaapTopicConsumer { AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); this.applicationConfig = applicationConfig; - this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") - : restclientFactory.createRestClientNoHttpProxy(""); this.type = type; this.jobs = jobs; } @@ -108,7 +105,8 @@ public class DmaapTopicConsumer { private Mono handleDmaapErrorResponse(Throwable t) { logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl()); - return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(notUsed -> Mono.empty()); + return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) // + .flatMap(notUsed -> Mono.empty()); } private Mono getFromMessageRouter(String topicUrl) { @@ -130,8 +128,8 @@ public class DmaapTopicConsumer { // Distibute the body to all jobs for this type return Flux.fromIterable(this.jobs.getJobsForType(this.type)) // - .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) - .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) // + .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) // + .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse); } } diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java index 6079edfb..55900224 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumer.java @@ -20,7 +20,6 @@ package org.oran.dmaapadapter.tasks; -import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -29,8 +28,6 @@ import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.StringDeserializer; -import org.oran.dmaapadapter.clients.AsyncRestClient; -import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; import org.oran.dmaapadapter.repository.Job; @@ -52,7 +49,6 @@ import reactor.kafka.receiver.ReceiverOptions; @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally public class KafkaTopicConsumer { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class); - private final AsyncRestClient consumerRestClient; private final ApplicationConfig applicationConfig; private final InfoType type; private final Many consumerDistributor; @@ -60,12 +56,9 @@ public class KafkaTopicConsumer { public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) { this.applicationConfig = applicationConfig; - final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10; + final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10; this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE); - AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); - this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("") - : restclientFactory.createRestClientNoHttpProxy(""); this.type = type; startKafkaTopicReceiver(); } @@ -73,22 +66,23 @@ public class KafkaTopicConsumer { private Disposable startKafkaTopicReceiver() { return KafkaReceiver.create(kafkaInputProperties()) // .receive() // - .flatMap(this::onReceivedData) // + .doOnNext(this::onReceivedData) // .subscribe(null, // - throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), // - () -> logger.warn("KafkaMessageConsumer stopped")); + throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), // + () -> logger.warn("KafkaTopicReceiver stopped")); } - private Flux onReceivedData(ConsumerRecord input) { + private void onReceivedData(ConsumerRecord input) { logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value()); consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST); - return consumerDistributor.asFlux(); } public Disposable startDistributeToConsumer(Job job) { + final int CONCURRENCY = 10; // Has to be 1 to guarantee correct order. + return getMessagesFromKafka(job) // .doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data)) - .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) // + .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) // .onErrorResume(this::handleConsumerErrorResponse) // .subscribe(null, // throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), // @@ -99,9 +93,10 @@ public class KafkaTopicConsumer { if (job.isBuffered()) { return consumerDistributor.asFlux() // .filter(job::isFilterMatch) // - .bufferTimeout(job.getParameters().bufferTimeout.maxSize, - Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) // - .flatMap(o -> Flux.just(o.toString())); + .bufferTimeout( // + job.getParameters().getBufferTimeout().getMaxSize(), // + job.getParameters().getBufferTimeout().getMaxTime()) // + .map(Object::toString); } else { return consumerDistributor.asFlux() // .filter(job::isFilterMatch); diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index e8b236c9..7b719e3c 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -95,6 +95,7 @@ public class ProducerRegstrationTask { logger.warn("Registration of producer failed {}", t.getMessage()); } + // Returns TRUE if registration is correct private Mono checkRegistration() { final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return restClient.get(url) // @@ -118,8 +119,8 @@ public class ProducerRegstrationTask { private Mono registerTypesAndProducer() { final int CONCURRENCY = 20; - final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" - + PRODUCER_ID; + final String producerUrl = + applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // diff --git a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java index 31ef970f..5ee34524 100644 --- a/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java +++ b/dmaap-adaptor-java/src/test/java/org/oran/dmaapadapter/IntegrationWithKafka.java @@ -175,8 +175,7 @@ class IntegrationWithKafka { } private Object jobParametersAsJsonObject(String filter, int maxTimeMiliseconds, int maxSize) { - Job.Parameters param = new Job.Parameters(filter, - new Job.Parameters.BufferTimeout(maxSize, maxTimeMiliseconds)); + Job.Parameters param = new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds)); String str = gson.toJson(param); return jsonObject(str); } @@ -228,8 +227,8 @@ class IntegrationWithKafka { assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1); // Create a job - this.ecsSimulatorController.addJob(consumerJobInfo(".*", 10, 1000), JOB_ID1, restClient()); - this.ecsSimulatorController.addJob(consumerJobInfo(".*Message_1.*", 0, 0), JOB_ID2, restClient()); + this.ecsSimulatorController.addJob(consumerJobInfo(null, 10, 1000), JOB_ID1, restClient()); + this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", 0, 0), JOB_ID2, restClient()); await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2)); final KafkaSender sender = KafkaSender.create(senderOptions()); diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/AsyncRestClient.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/AsyncRestClient.java index 1b8e0643..b7f23b1f 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/AsyncRestClient.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/clients/AsyncRestClient.java @@ -67,96 +67,85 @@ public class AsyncRestClient { logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); Mono bodyProducer = body != null ? Mono.just(body) : Mono.empty(); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.post() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .body(bodyProducer, String.class); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .body(bodyProducer, String.class); + return retrieve(traceTag, request); } public Mono post(String uri, @Nullable String body) { return postForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono postWithAuthHeader(String uri, String body, String username, String password) { Object traceTag = createTraceTag(); logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} POST body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.post() // - .uri(uri) // - .headers(headers -> headers.setBasicAuth(username, password)) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request) // - .flatMap(this::toBody); - }); + + RequestHeadersSpec request = getWebClient() // + .post() // + .uri(uri) // + .headers(headers -> headers.setBasicAuth(username, password)) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request) // + .map(this::toBody); } public Mono> putForEntity(String uri, String body) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: {}", traceTag, body); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.put() // - .uri(uri) // - .contentType(MediaType.APPLICATION_JSON) // - .bodyValue(body); - return retrieve(traceTag, request); - }); + + RequestHeadersSpec request = getWebClient() // + .put() // + .uri(uri) // + .contentType(MediaType.APPLICATION_JSON) // + .bodyValue(body); + return retrieve(traceTag, request); } public Mono> putForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri); logger.trace("{} PUT body: ", traceTag); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.put() // - .uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient() // + .put() // + .uri(uri); + return retrieve(traceTag, request); } public Mono put(String uri, String body) { return putForEntity(uri, body) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono> getForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.get().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient().get().uri(uri); + return retrieve(traceTag, request); } public Mono get(String uri) { return getForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } public Mono> deleteForEntity(String uri) { Object traceTag = createTraceTag(); logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri); - return getWebClient() // - .flatMap(client -> { - RequestHeadersSpec request = client.delete().uri(uri); - return retrieve(traceTag, request); - }); + RequestHeadersSpec request = getWebClient().delete().uri(uri); + return retrieve(traceTag, request); } public Mono delete(String uri) { return deleteForEntity(uri) // - .flatMap(this::toBody); + .map(this::toBody); } private Mono> retrieve(Object traceTag, RequestHeadersSpec request) { @@ -185,11 +174,11 @@ public class AsyncRestClient { } } - private Mono toBody(ResponseEntity entity) { + private String toBody(ResponseEntity entity) { if (entity.getBody() == null) { - return Mono.just(""); + return ""; } else { - return Mono.just(entity.getBody()); + return entity.getBody(); } } @@ -229,11 +218,10 @@ public class AsyncRestClient { .build(); } - private Mono getWebClient() { + private WebClient getWebClient() { if (this.webClient == null) { this.webClient = buildWebClient(baseUrl); } - return Mono.just(buildWebClient(baseUrl)); + return this.webClient; } - } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/a1e/A1eController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/a1e/A1eController.java index 9609e276..8c056fc6 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/a1e/A1eController.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/a1e/A1eController.java @@ -298,7 +298,7 @@ public class A1eController { return validatePutEiJob(eiJobId, eiJobObject) // .flatMap(this::startEiJob) // .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) // - .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK))) + .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) // .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.INTERNAL_SERVER_ERROR))); } @@ -306,7 +306,7 @@ public class A1eController { return this.producerCallbacks.startInfoSubscriptionJob(newEiJob, infoProducers) // .doOnNext(noOfAcceptingProducers -> this.logger.debug( "Started EI job {}, number of activated producers: {}", newEiJob.getId(), noOfAcceptingProducers)) // - .flatMap(noOfAcceptingProducers -> Mono.just(newEiJob)); + .map(noOfAcceptingProducers -> newEiJob); } private Mono validatePutEiJob(String eiJobId, A1eEiJobInfo eiJobInfo) { diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java index 47a4a2ec..b108380b 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java @@ -308,7 +308,7 @@ public class ConsumerController { return validatePutInfoJob(jobId, informationJobObject, performTypeCheck) // .flatMap(this::startInfoSubscriptionJob) // .doOnNext(this.infoJobs::put) // - .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK))) + .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) // .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND))); } @@ -441,7 +441,7 @@ public class ConsumerController { return this.producerCallbacks.startInfoSubscriptionJob(newInfoJob, infoProducers) // .doOnNext(noOfAcceptingProducers -> this.logger.debug("Started job {}, number of activated producers: {}", newInfoJob.getId(), noOfAcceptingProducers)) // - .flatMap(noOfAcceptingProducers -> Mono.just(newInfoJob)); + .map(noOfAcceptingProducers -> newInfoJob); } private Mono validatePutInfoJob(String jobId, ConsumerJobInfo jobInfo, boolean performTypeCheck) { diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerCallbacks.java index a97bdf66..558ae799 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1producer/ProducerCallbacks.java @@ -84,7 +84,7 @@ public class ProducerCallbacks { return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) // .flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) // .collectList() // - .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); // + .map(okResponses -> Integer.valueOf(okResponses.size())); // } /** diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index 65978e15..533199ff 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -222,8 +222,7 @@ public class InfoTypeSubscriptions { private Mono notifySubscriber(Function> notifyFunc, SubscriptionInfo subscriptionInfo) { Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1)); - return Mono.just(1) // - .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) // + return notifyFunc.apply(subscriptionInfo) // .retryWhen(retrySpec) // .onErrorResume(throwable -> { logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(), diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java index db7c29ba..08c5fc85 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java @@ -80,7 +80,7 @@ public class ProducerSupervision { })// .doOnNext(response -> handleRespondingProducer(response, producer)) .flatMap(response -> checkProducerJobs(producer)) // - .flatMap(responses -> Mono.just(producer)); + .map(responses -> producer); } private Mono checkProducerJobs(InfoProducer producer) { diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java index 44184296..8c8ce5f1 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java @@ -1028,7 +1028,7 @@ class ApplicationTest { // Test that subscriptions are removed for a unresponsive consumer // PUT a subscription with a junk callback - final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner"); + final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "/JUNK", "owner"); String body = gson.toJson(info); restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block(); assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);