Simplifications and optimizations.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Iae092ab0840571dc497513145a18cf5b23e11e32
logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .body(bodyProducer, String.class);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(traceTag, request);
}
public Mono<String> post(String uri, @Nullable String body) {
return postForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request) //
- .flatMap(this::toBody);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request) //
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request);
}
public Mono<ResponseEntity<String>> putForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: <empty>", traceTag);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> put(String uri, String body) {
return putForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> getForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.get().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> get(String uri) {
return getForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.delete().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> delete(String uri) {
return deleteForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
}
}
- private Mono<String> toBody(ResponseEntity<String> entity) {
+ private String toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
- return Mono.just("");
+ return "";
} else {
- return Mono.just(entity.getBody());
+ return entity.getBody();
}
}
.build();
}
- private Mono<WebClient> getWebClient() {
+ private WebClient getWebClient() {
if (this.webClient == null) {
this.webClient = buildWebClient(baseUrl);
}
- return Mono.just(buildWebClient(baseUrl));
+ return this.webClient;
}
}
@RequestBody String body) {
try {
ProducerJobInfo request = gson.fromJson(body, ProducerJobInfo.class);
-
- logger.info("Job started callback {}", request.id);
- Job job = new Job(request.id, request.targetUri, types.getType(request.typeId), request.owner,
+ logger.debug("Job started callback {}", request.id);
+ this.jobs.addJob(request.id, request.targetUri, types.getType(request.typeId), request.owner,
request.lastUpdated, toJobParameters(request.jobData));
- this.jobs.put(job);
return new ResponseEntity<>(HttpStatus.OK);
} catch (Exception e) {
return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
public ResponseEntity<Object> jobDeletedCallback( //
@PathVariable("infoJobId") String infoJobId) {
- logger.info("Job deleted callback {}", infoJobId);
+ logger.debug("Job deleted callback {}", infoJobId);
this.jobs.remove(infoJobId);
return new ResponseEntity<>(HttpStatus.OK);
}
package org.oran.dmaapadapter.repository;
+import java.time.Duration;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import lombok.Getter;
import org.immutables.gson.Gson;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
public class Job {
@Gson.TypeAdapters
public static class Parameters {
- public String filter;
- public BufferTimeout bufferTimeout;
+ @Getter
+ private String filter;
+ @Getter
+ private BufferTimeout bufferTimeout;
- public Parameters() {
- }
+ public Parameters() {}
public Parameters(String filter, BufferTimeout bufferTimeout) {
this.filter = filter;
this.bufferTimeout = bufferTimeout;
}
+ }
+
+ @Gson.TypeAdapters
+ public static class BufferTimeout {
+ public BufferTimeout(int maxSize, int maxTimeMiliseconds) {
+ this.maxSize = maxSize;
+ this.maxTimeMiliseconds = maxTimeMiliseconds;
+ }
+
+ public BufferTimeout() {}
- public static class BufferTimeout {
- public BufferTimeout(int maxSize, int maxTimeMiliseconds) {
- this.maxSize = maxSize;
- this.maxTimeMiliseconds = maxTimeMiliseconds;
- }
+ @Getter
+ private int maxSize;
- public BufferTimeout() {
- }
+ private int maxTimeMiliseconds;
- public int maxSize;
- public int maxTimeMiliseconds;
+ public Duration getMaxTime() {
+ return Duration.ofMillis(maxTimeMiliseconds);
}
}
private final Pattern jobDataFilter;
- public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters) {
+ @Getter
+ private final AsyncRestClient consumerRestClient;
+
+ public Job(String id, String callbackUrl, InfoType type, String owner, String lastUpdated, Parameters parameters,
+ AsyncRestClient consumerRestClient) {
this.id = id;
this.callbackUrl = callbackUrl;
this.type = type;
} else {
jobDataFilter = null;
}
+ this.consumerRestClient = consumerRestClient;
}
public boolean isFilterMatch(String data) {
import java.util.Map;
import java.util.Vector;
+import org.oran.dmaapadapter.clients.AsyncRestClient;
+import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
+import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.exceptions.ServiceException;
+import org.oran.dmaapadapter.repository.Job.Parameters;
import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private Map<String, Job> allJobs = new HashMap<>();
private MultiMap<Job> jobsByType = new MultiMap<>();
private final KafkaTopicConsumers kafkaConsumers;
+ private final AsyncRestClientFactory restclientFactory;
- public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers) {
+ public Jobs(@Autowired KafkaTopicConsumers kafkaConsumers, @Autowired ApplicationConfig applicationConfig) {
this.kafkaConsumers = kafkaConsumers;
+
+ restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
}
public synchronized Job getJob(String id) throws ServiceException {
return allJobs.get(id);
}
- public synchronized void put(Job job) {
+ public void addJob(String id, String callbackUrl, InfoType type, String owner, String lastUpdated,
+ Parameters parameters) {
+ AsyncRestClient consumerRestClient = type.isUseHttpProxy() //
+ ? restclientFactory.createRestClientUseHttpProxy(callbackUrl) //
+ : restclientFactory.createRestClientNoHttpProxy(callbackUrl);
+ Job job = new Job(id, callbackUrl, type, owner, lastUpdated, parameters, consumerRestClient);
+ this.put(job);
+ }
+
+ private synchronized void put(Job job) {
logger.debug("Put job: {}", job.getId());
allJobs.put(job.getId(), job);
jobsByType.put(job.getType().getId(), job.getId(), job);
private final AsyncRestClient dmaapRestClient;
private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
- private final AsyncRestClient consumerRestClient;
protected final ApplicationConfig applicationConfig;
protected final InfoType type;
protected final Jobs jobs;
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
this.applicationConfig = applicationConfig;
- this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
- : restclientFactory.createRestClientNoHttpProxy("");
this.type = type;
this.jobs = jobs;
}
private Mono<String> handleDmaapErrorResponse(Throwable t) {
logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
- return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES).flatMap(notUsed -> Mono.empty());
+ return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
+ .flatMap(notUsed -> Mono.empty());
}
private Mono<String> getFromMessageRouter(String topicUrl) {
// Distribute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
- .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
- .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+ .doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl())) //
+ .flatMap(job -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}
}
package org.oran.dmaapadapter.tasks;
-import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
-import org.oran.dmaapadapter.clients.AsyncRestClient;
-import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.Job;
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
public class KafkaTopicConsumer {
private static final Logger logger = LoggerFactory.getLogger(KafkaTopicConsumer.class);
- private final AsyncRestClient consumerRestClient;
private final ApplicationConfig applicationConfig;
private final InfoType type;
private final Many<String> consumerDistributor;
public KafkaTopicConsumer(ApplicationConfig applicationConfig, InfoType type) {
this.applicationConfig = applicationConfig;
- final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 10;
+ final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10;
this.consumerDistributor = Sinks.many().multicast().onBackpressureBuffer(CONSUMER_BACKPRESSURE_BUFFER_SIZE);
- AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
- this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
- : restclientFactory.createRestClientNoHttpProxy("");
this.type = type;
startKafkaTopicReceiver();
}
private Disposable startKafkaTopicReceiver() {
return KafkaReceiver.create(kafkaInputProperties()) //
.receive() //
- .flatMap(this::onReceivedData) //
+ .doOnNext(this::onReceivedData) //
.subscribe(null, //
- throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("KafkaMessageConsumer stopped"));
+ throwable -> logger.error("KafkaTopicReceiver error: {}", throwable.getMessage()), //
+ () -> logger.warn("KafkaTopicReceiver stopped"));
}
- private Flux<String> onReceivedData(ConsumerRecord<Integer, String> input) {
+ private void onReceivedData(ConsumerRecord<Integer, String> input) {
logger.debug("Received from kafka topic: {} :{}", this.type.getKafkaInputTopic(), input.value());
consumerDistributor.emitNext(input.value(), Sinks.EmitFailureHandler.FAIL_FAST);
- return consumerDistributor.asFlux();
}
public Disposable startDistributeToConsumer(Job job) {
+ final int CONCURRENCY = 10; // NOTE: with concurrency > 1 delivery order is NOT guaranteed; set to 1 if strict ordering is required.
+
return getMessagesFromKafka(job) //
.doOnNext(data -> logger.debug("Sending to consumer {} {} {}", job.getId(), job.getCallbackUrl(), data))
- .flatMap(body -> consumerRestClient.post(job.getCallbackUrl(), body)) //
+ .flatMap(body -> job.getConsumerRestClient().post("", body), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse) //
.subscribe(null, //
throwable -> logger.error("KafkaMessageConsumer error: {}", throwable.getMessage()), //
if (job.isBuffered()) {
return consumerDistributor.asFlux() //
.filter(job::isFilterMatch) //
- .bufferTimeout(job.getParameters().bufferTimeout.maxSize,
- Duration.ofMillis(job.getParameters().bufferTimeout.maxTimeMiliseconds)) //
- .flatMap(o -> Flux.just(o.toString()));
+ .bufferTimeout( //
+ job.getParameters().getBufferTimeout().getMaxSize(), //
+ job.getParameters().getBufferTimeout().getMaxTime()) //
+ .map(Object::toString);
} else {
return consumerDistributor.asFlux() //
.filter(job::isFilterMatch);
logger.warn("Registration of producer failed {}", t.getMessage());
}
+ // Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
- final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
- + PRODUCER_ID;
+ final String producerUrl =
+ applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
}
private Object jobParametersAsJsonObject(String filter, int maxTimeMiliseconds, int maxSize) {
- Job.Parameters param = new Job.Parameters(filter,
- new Job.Parameters.BufferTimeout(maxSize, maxTimeMiliseconds));
+ Job.Parameters param = new Job.Parameters(filter, new Job.BufferTimeout(maxSize, maxTimeMiliseconds));
String str = gson.toJson(param);
return jsonObject(str);
}
assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(1);
// Create a job
- this.ecsSimulatorController.addJob(consumerJobInfo(".*", 10, 1000), JOB_ID1, restClient());
- this.ecsSimulatorController.addJob(consumerJobInfo(".*Message_1.*", 0, 0), JOB_ID2, restClient());
+ this.ecsSimulatorController.addJob(consumerJobInfo(null, 10, 1000), JOB_ID1, restClient());
+ this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", 0, 0), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
final KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions());
logger.debug("{} POST uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
Mono<String> bodyProducer = body != null ? Mono.just(body) : Mono.empty();
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .body(bodyProducer, String.class);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .body(bodyProducer, String.class);
+ return retrieve(traceTag, request);
}
public Mono<String> post(String uri, @Nullable String body) {
return postForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<String> postWithAuthHeader(String uri, String body, String username, String password) {
Object traceTag = createTraceTag();
logger.debug("{} POST (auth) uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} POST body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.post() //
- .uri(uri) //
- .headers(headers -> headers.setBasicAuth(username, password)) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request) //
- .flatMap(this::toBody);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .post() //
+ .uri(uri) //
+ .headers(headers -> headers.setBasicAuth(username, password)) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request) //
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> putForEntity(String uri, String body) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: {}", traceTag, body);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri) //
- .contentType(MediaType.APPLICATION_JSON) //
- .bodyValue(body);
- return retrieve(traceTag, request);
- });
+
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri) //
+ .contentType(MediaType.APPLICATION_JSON) //
+ .bodyValue(body);
+ return retrieve(traceTag, request);
}
public Mono<ResponseEntity<String>> putForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} PUT uri = '{}{}''", traceTag, baseUrl, uri);
logger.trace("{} PUT body: <empty>", traceTag);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.put() //
- .uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient() //
+ .put() //
+ .uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> put(String uri, String body) {
return putForEntity(uri, body) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> getForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} GET uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.get().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().get().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> get(String uri) {
return getForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
public Mono<ResponseEntity<String>> deleteForEntity(String uri) {
Object traceTag = createTraceTag();
logger.debug("{} DELETE uri = '{}{}''", traceTag, baseUrl, uri);
- return getWebClient() //
- .flatMap(client -> {
- RequestHeadersSpec<?> request = client.delete().uri(uri);
- return retrieve(traceTag, request);
- });
+ RequestHeadersSpec<?> request = getWebClient().delete().uri(uri);
+ return retrieve(traceTag, request);
}
public Mono<String> delete(String uri) {
return deleteForEntity(uri) //
- .flatMap(this::toBody);
+ .map(this::toBody);
}
private Mono<ResponseEntity<String>> retrieve(Object traceTag, RequestHeadersSpec<?> request) {
}
}
- private Mono<String> toBody(ResponseEntity<String> entity) {
+ private String toBody(ResponseEntity<String> entity) {
if (entity.getBody() == null) {
- return Mono.just("");
+ return "";
} else {
- return Mono.just(entity.getBody());
+ return entity.getBody();
}
}
.build();
}
- private Mono<WebClient> getWebClient() {
+ private WebClient getWebClient() {
if (this.webClient == null) {
this.webClient = buildWebClient(baseUrl);
}
- return Mono.just(buildWebClient(baseUrl));
+ return this.webClient;
}
-
}
return validatePutEiJob(eiJobId, eiJobObject) //
.flatMap(this::startEiJob) //
.doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
- .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
.onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.INTERNAL_SERVER_ERROR)));
}
return this.producerCallbacks.startInfoSubscriptionJob(newEiJob, infoProducers) //
.doOnNext(noOfAcceptingProducers -> this.logger.debug(
"Started EI job {}, number of activated producers: {}", newEiJob.getId(), noOfAcceptingProducers)) //
- .flatMap(noOfAcceptingProducers -> Mono.just(newEiJob));
+ .map(noOfAcceptingProducers -> newEiJob);
}
private Mono<InfoJob> validatePutEiJob(String eiJobId, A1eEiJobInfo eiJobInfo) {
return validatePutInfoJob(jobId, informationJobObject, performTypeCheck) //
.flatMap(this::startInfoSubscriptionJob) //
.doOnNext(this.infoJobs::put) //
- .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
+ .map(newEiJob -> new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)) //
.onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
}
return this.producerCallbacks.startInfoSubscriptionJob(newInfoJob, infoProducers) //
.doOnNext(noOfAcceptingProducers -> this.logger.debug("Started job {}, number of activated producers: {}",
newInfoJob.getId(), noOfAcceptingProducers)) //
- .flatMap(noOfAcceptingProducers -> Mono.just(newInfoJob));
+ .map(noOfAcceptingProducers -> newInfoJob);
}
private Mono<InfoJob> validatePutInfoJob(String jobId, ConsumerJobInfo jobInfo, boolean performTypeCheck) {
return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) //
.flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) //
.collectList() //
- .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
+ .map(okResponses -> Integer.valueOf(okResponses.size())); //
}
/**
private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
SubscriptionInfo subscriptionInfo) {
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
- return Mono.just(1) //
- .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
+ return notifyFunc.apply(subscriptionInfo) //
.retryWhen(retrySpec) //
.onErrorResume(throwable -> {
logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
})//
.doOnNext(response -> handleRespondingProducer(response, producer))
.flatMap(response -> checkProducerJobs(producer)) //
- .flatMap(responses -> Mono.just(producer));
+ .map(responses -> producer);
}
private Mono<?> checkProducerJobs(InfoProducer producer) {
// Test that subscriptions are removed for a unresponsive consumer
// PUT a subscription with a junk callback
- final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
+ final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "/JUNK", "owner");
String body = gson.toJson(info);
restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);