import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;
/**
- * Registers the types and this producer in ECS. This is done when needed.
+ * Registers the types and this producer in Innformation Coordinator Service.
+ * This is done when needed.
*/
@Component
@EnableScheduling
private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
@Getter
- private boolean isRegisteredInEcs = false;
+ private boolean isRegisteredInIcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
}
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
- public void supervisionTask() {
- checkRegistration() //
- .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
- .flatMap(isRegisterred -> registerTypesAndProducer()) //
- .subscribe( //
- null, //
- this::handleRegistrationFailure, //
- this::handleRegistrationCompleted);
+ public void runSupervisionTask() {
+ supervisionTask().subscribe( //
+ null, //
+ this::handleRegistrationFailure, //
+ this::handleRegistrationCompleted);
+ }
+
+ public Mono<String> supervisionTask() {
+ return checkRegistration() //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
+ .flatMap(isRegisterred -> registerTypesAndProducer());
}
private void handleRegistrationCompleted() {
- isRegisteredInEcs = true;
+ isRegisteredInIcs = true;
}
private void handleRegistrationFailure(Throwable t) {
// Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
- final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
.flatMap(this::isRegisterredInfoCorrect) //
.onErrorResume(t -> Mono.just(Boolean.FALSE));
private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered in ECS");
+ logger.trace("Already registered in ICS");
return Mono.just(Boolean.TRUE);
} else {
return Mono.just(Boolean.FALSE);
}
private String registerTypeUrl(InfoType type) {
- return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
+ return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
InputStream in = getClass().getResourceAsStream(filePath);
logger.debug("Reading application schema file from: {} with: {}", filePath, in);
if (in == null) {
- throw new ServiceException("Could not readfile: " + filePath);
+ throw new ServiceException("Could not readfile: " + filePath, HttpStatus.INTERNAL_SERVER_ERROR);
}
return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
}