}
public void start() {
- infiniteSubmitter.stop();
-
- createTask().subscribe(//
- value -> logger.debug("DmaapMessageConsumer next: {}", value), //
- throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
- () -> logger.warn("DmaapMessageConsumer stopped") //
- );
- }
-
- protected Flux<String> createTask() {
- final int CONCURRENCY = 5;
- return infiniteSubmitter.start() //
+ infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
- .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
- .doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
- .flatMap(this::handleReceivedMessage, CONCURRENCY);
+ .flatMap(this::handleReceivedMessage, 5) //
+ .subscribe(//
+ value -> logger.debug("DmaapMessageConsumer next: {} {}", value, type.getId()), //
+ throwable -> logger.error("DmaapMessageConsumer error: {}", throwable.getMessage()), //
+ () -> logger.warn("DmaapMessageConsumer stopped {}", type.getId()) //
+ );
}
private String getDmaapUrl() {
+
return this.applicationConfig.getDmaapBaseUrl() + type.getDmaapTopicUrl();
}
- private Mono<String> handleErrorResponse(Throwable t) {
- logger.debug("error from DMAAP {}", t.getMessage());
+ private Mono<String> handleDmaapErrorResponse(Throwable t) {
+ logger.debug("error from DMAAP {} {}", t.getMessage(), type.getDmaapTopicUrl());
return Mono.delay(TIME_BETWEEN_DMAAP_RETRIES) //
.flatMap(notUsed -> Mono.empty());
}
+ private Mono<String> handleConsumerErrorResponse(Throwable t) {
+ logger.warn("error from CONSUMER {}", t.getMessage());
+ return Mono.empty();
+ }
+
protected Mono<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
return restClient.get(topicUrl) //
- .onErrorResume(this::handleErrorResponse);
+ .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
+ .doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
+ .onErrorResume(this::handleDmaapErrorResponse); //
}
protected Flux<String> handleReceivedMessage(String body) {
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
.flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
- .onErrorResume(this::handleErrorResponse);
+ .onErrorResume(this::handleConsumerErrorResponse);
}
}