import com.google.gson.JsonParser;
+import lombok.Getter;
+
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
-import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
import org.oran.dmaapadapter.repository.InfoType;
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
+ @Getter
private boolean isRegisteredInEcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
- logger.debug("Checking producers starting");
- createTask().subscribe(null, null, () -> logger.debug("Producer registration completed"));
+ checkRegistration() //
+ .filter(isRegisterred -> !isRegisterred) //
+ .flatMap(isRegisterred -> registerTypesAndProducer()) //
+ .subscribe( //
+ null, //
+ this::handleRegistrationFailure, //
+ this::handleRegistrationCompleted);
}
- public Mono<Object> createTask() {
- return checkProducerRegistration() //
- .doOnError(t -> isRegisteredInEcs = false) //
- .onErrorResume(t -> registerTypesAndProducer());
+ private void handleRegistrationCompleted() {
+ logger.debug("Registering types and producer succeeded");
+ isRegisteredInEcs = true;
}
- public boolean isRegisteredInEcs() {
- return this.isRegisteredInEcs;
+ private void handleRegistrationFailure(Throwable t) {
+ logger.warn("Registration failed {}", t.getMessage());
+ isRegisteredInEcs = false;
}
- private Mono<Object> checkProducerRegistration() {
+ private Mono<Boolean> checkRegistration() {
final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
- .flatMap(this::checkRegistrationInfo) //
- ;
+ .flatMap(this::isRegisterredInfoCorrect) //
+ .onErrorResume(t -> Mono.just(Boolean.FALSE));
+ }
+
+ private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
+ ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
+ if (isEqual(producerRegistrationInfo(), registerredInfo)) {
+ logger.trace("Already registered");
+ return Mono.just(Boolean.TRUE);
+ } else {
+ return Mono.just(Boolean.FALSE);
+ }
}
private String registerTypeUrl(InfoType type) {
- String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
- return url;
+ return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
private Mono<String> registerTypesAndProducer() {
+ final int CONCURRENCY = 20;
final String producerUrl =
applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
- .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) //
+ .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+ CONCURRENCY) //
.collectList() //
- .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) //
- .onErrorResume(t -> {
- logger.warn("Registration failed {}", t.getMessage());
- isRegisteredInEcs = false;
- return Mono.empty();
- }) //
- .doOnNext(x -> logger.debug("Registering types and producer completed"));
+ .doOnNext(type -> logger.info("Registering producer")) //
+ .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo())));
}
private Object typeSpecifcInfoObject() {
}
}
- private Mono<String> checkRegistrationInfo(String resp) {
- ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class);
- if (isEqual(producerRegistrationInfo(), info)) {
- logger.debug("Already registered");
- this.isRegisteredInEcs = true;
- return Mono.empty();
- } else {
- return Mono.error(new ServiceException("Producer registration will be started"));
- }
- }
-
private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) {
return a.jobCallbackUrl.equals(b.jobCallbackUrl) //
&& a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) //
return ProducerRegistrationInfo.builder() //
.jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
.producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
- .supportedTypeIds(types.typeIds()) //
+ .supportedTypeIds(this.types.typeIds()) //
.build();
}