Fixed so that one registration is always done after a component restart.
Discarding empty array from DMAAP
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-597
Change-Id: Iafac2e7a6dd11fe74a5e8c6c6214858a8ff4e6d3
final int CONCURRENCY = 5;
return infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
final int CONCURRENCY = 5;
return infiniteSubmitter.start() //
.flatMap(notUsed -> getFromMessageRouter(getDmaapUrl()), 1) //
+ .filter(body -> body.length() > 3) // DMAAP will return "[]" sometimes. That is thrown away.
.doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
.flatMap(this::handleReceivedMessage, CONCURRENCY);
}
.doOnNext(message -> logger.debug("Message Reveived from DMAAP : {}", message)) //
.flatMap(this::handleReceivedMessage, CONCURRENCY);
}
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
checkRegistration() //
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
checkRegistration() //
- .filter(isRegisterred -> !isRegisterred) //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
.flatMap(isRegisterred -> registerTypesAndProducer()) //
.subscribe( //
null, //
.flatMap(isRegisterred -> registerTypesAndProducer()) //
.subscribe( //
null, //
}
private void handleRegistrationCompleted() {
}
private void handleRegistrationCompleted() {
- logger.debug("Registering types and producer succeeded");
+ logger.debug("Registering types and producer completed");
isRegisteredInEcs = true;
}
private void handleRegistrationFailure(Throwable t) {
isRegisteredInEcs = true;
}
private void handleRegistrationFailure(Throwable t) {
- logger.warn("Registration failed {}", t.getMessage());
- isRegisteredInEcs = false;
+ logger.warn("Registration of producer failed {}", t.getMessage());
}
private Mono<Boolean> checkRegistration() {
}
private Mono<Boolean> checkRegistration() {
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
- final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
+ + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //