private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
private final ApplicationConfig applicationConfig;
private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
private final ApplicationConfig applicationConfig;
public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
this.applicationConfig = applicationConfig;
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
this.applicationConfig = applicationConfig;
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
- this.restClient = restclientFactory.createRestClientNoHttpProxy("");
+ this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
+ this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+ : restclientFactory.createRestClientNoHttpProxy("");
protected Mono<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
protected Mono<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))