final int CONCURRENCY = 5;
return infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
+ .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
.flatMap(this::handleReceivedMessage, CONCURRENCY);
}
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
checkRegistration() //
- .filter(isRegisterred -> !isRegisterred) //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
.flatMap(isRegisterred -> registerTypesAndProducer()) //
.subscribe( //
null, //
}
private void handleRegistrationCompleted() {
- logger.debug("Registering types and producer succeeded");
+ logger.debug("Registering types and producer completed");
isRegisteredInEcs = true;
}
private void handleRegistrationFailure(Throwable t) {
- logger.warn("Registration failed {}", t.getMessage());
- isRegisteredInEcs = false;
+ logger.warn("Registration of producer failed {}", t.getMessage());
}
private Mono<Boolean> checkRegistration() {
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
- final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
+ + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //