private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
private final ApplicationConfig applicationConfig;
- private final AsyncRestClient restClient;
+ private final AsyncRestClient dmaapRestClient;
+ private final AsyncRestClient consumerRestClient;
private final InfoType type;
private final Jobs jobs;
private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
this.applicationConfig = applicationConfig;
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
- this.restClient = restclientFactory.createRestClientNoHttpProxy("");
+ this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
+ this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+ : restclientFactory.createRestClientNoHttpProxy("");
this.type = type;
this.jobs = jobs;
}
public void start() {
- infiniteSubmitter.stop();
-
- createTask().subscribe(//
- value -> logger.debug("DmaapMessageConsumer next: {}", value), //
- throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("DmaapMessageConsumer stopped") //
- );
- }
-
- protected Flux<String> createTask() {
- final int CONCURRENCY = 5;
- return infiniteSubmitter.start() //
+ infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
- .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
- .flatMap(this::handleReceivedMessage, CONCURRENCY);
+ .flatMap(this::handleReceivedMessage, 5) //
+ .subscribe(//
+ value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+ throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
+ () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
+ );
}
private String getDmaapUrl() {
+
return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
}
- private Mono<String> handleErrorResponse(Throwable t) {
- logger.debug("error from DMAAP {}", t.getMessage());
+ private Mono<String> handleDmaapErrorResponse(Throwable t) {
+ logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
.flatMap(notUsed -> Mono.empty());
}
+ private Mono<String> handleConsumerErrorResponse(Throwable t) {
+ logger.warn("error from CONSUMER {}", t.getMessage());
+ return Mono.empty();
+ }
+
protected Mono<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
- return restClient.get(topicUrl) //
- .onErrorResume(this::handleErrorResponse);
+ return dmaapRestClient.get(topicUrl) //
+ .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
+ .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+ .onErrorResume(this::handleDmaapErrorResponse); //
}
protected Flux<String> handleReceivedMessage(String body) {
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
- .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
- .onErrorResume(this::handleErrorResponse);
+ .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+ .onErrorResume(this::handleConsumerErrorResponse);
}
}