private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(10);
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
private final ApplicationConfig applicationConfig;
- private final AsyncRestClient restClient;
+ private final AsyncRestClient dmaapRestClient;
+ private final AsyncRestClient consumerRestClient;
private final InfoType type;
private final Jobs jobs;
private final InfiniteFlux infiniteSubmitter = new InfiniteFlux();
public DmaapMessageConsumer(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
this.applicationConfig = applicationConfig;
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
- this.restClient = restclientFactory.createRestClientNoHttpProxy("");
+ this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
+ this.consumerRestClient = type.isUseHttpProxy() ? restclientFactory.createRestClientUseHttpProxy("")
+ : restclientFactory.createRestClientNoHttpProxy("");
this.type = type;
this.jobs = jobs;
}
protected Mono<String> getFromMessageRouter(String topicUrl) {
logger.trace("getFromMessageRouter {}", topicUrl);
- return restClient.get(topicUrl) //
+ return dmaapRestClient.get(topicUrl) //
.filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.doOnNext(message -> logger.debug("Message from DMAAP topic: {} : {}", topicUrl, message)) //
.onErrorResume(this::handleDmaapErrorResponse); //
// Distibute the body to all jobs for this type
return Flux.fromIterable(this.jobs.getJobsForType(this.type)) //
.doOnNext(job -> logger.debug("Sending to consumer {}", job.getCallbackUrl()))
- .flatMap(job -> restClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
+ .flatMap(job -> consumerRestClient.post(job.getCallbackUrl(), body), CONCURRENCY) //
.onErrorResume(this::handleConsumerErrorResponse);
}