X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FProducerRegstrationTask.java;h=8b5b6cfcedfc0d5fe4496e312bb28b61625040c6;hb=46a0fd717e5f49ebae6cb2c4fbcf54f0e329dc86;hp=b9a50b39ebd167e793aacd036674cd59d5a24ea6;hpb=f0e49a07dad877f94f635dda4ab477b9636536c8;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index b9a50b39..8b5b6cfc 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -20,8 +20,16 @@ package org.oran.dmaapadapter.tasks; +import com.google.common.io.CharStreams; import com.google.gson.JsonParser; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + +import lombok.Getter; + import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; @@ -56,6 +64,7 @@ public class ProducerRegstrationTask { private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; + @Getter private boolean isRegisteredInEcs = false; private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; @@ -68,65 +77,96 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void supervisionTask() { - logger.debug("Checking producers starting"); - createTask().subscribe(null, null, () -> logger.debug("Producer registration completed")); + checkRegistration() // + .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) // + .flatMap(isRegisterred -> registerTypesAndProducer()) // + .subscribe( // + null, // + this::handleRegistrationFailure, // + this::handleRegistrationCompleted); } - public Mono createTask() { - return checkProducerRegistration() // - .doOnError(t -> isRegisteredInEcs = false) // - .onErrorResume(t -> registerTypesAndProducer()); + private void handleRegistrationCompleted() { + isRegisteredInEcs = true; } - public boolean isRegisteredInEcs() { - return this.isRegisteredInEcs; + private void handleRegistrationFailure(Throwable t) { + logger.warn("Registration of producer failed {}", t.getMessage()); } - private Mono checkProducerRegistration() { + // Returns TRUE if registration is correct + private Mono checkRegistration() { final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return restClient.get(url) // - .flatMap(this::checkRegistrationInfo) // - ; + .flatMap(this::isRegisterredInfoCorrect) // + .onErrorResume(t -> Mono.just(Boolean.FALSE)); + } + + private Mono isRegisterredInfoCorrect(String registerredInfoStr) { + ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); + if (isEqual(producerRegistrationInfo(), registerredInfo)) { + logger.trace("Already registered in ECS"); + return Mono.just(Boolean.TRUE); + } else { + return Mono.just(Boolean.FALSE); + } } private String registerTypeUrl(InfoType type) { - String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); - return url; + return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); } private Mono registerTypesAndProducer() { + final int CONCURRENCY = 20; final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // - .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) // + .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))), + CONCURRENCY) // .collectList() // - .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) // - .onErrorResume(t -> { - logger.warn("Registration failed {}", t.getMessage()); - isRegisteredInEcs = false; - return Mono.empty(); - }) // - .doOnNext(x -> logger.debug("Registering types and producer completed")); + .doOnNext(type -> logger.info("Registering producer")) // + .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))); } private Object typeSpecifcInfoObject() { return jsonObject("{}"); } - private ProducerInfoTypeInfo typeRegistrationInfo() { - return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject()); + private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) { + try { + return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject()); + } catch (Exception e) { + logger.error("Fatal error {}", e.getMessage()); + return null; + } } - private Object jsonSchemaObject() { - // An object with no properties - String schemaStr = "{" // - + "\"type\": \"object\"," // - + "\"properties\": {}," // - + "\"additionalProperties\": false" // - + "}"; // - return jsonObject(schemaStr); + private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException { + + if (type.isKafkaTopicDefined()) { + String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json"); + return jsonObject(schemaStrKafka); + } else { + // An object with no properties + String schemaStr = "{" // + + "\"type\": \"object\"," // + + "\"properties\": {}," // + + "\"additionalProperties\": false" // + + "}"; // + + return jsonObject(schemaStr); + } + } + + private String readSchemaFile(String filePath) throws IOException, ServiceException { + InputStream in = getClass().getResourceAsStream(filePath); + logger.debug("Reading application schema file from: {} with: {}", filePath, in); + if (in == null) { + throw new ServiceException("Could not readfile: " + filePath); + } + return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8)); } private Object jsonObject(String json) { @@ -138,17 +178,6 @@ public class ProducerRegstrationTask { } } - private Mono checkRegistrationInfo(String resp) { - ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class); - if (isEqual(producerRegistrationInfo(), info)) { - logger.debug("Already registered"); - this.isRegisteredInEcs = true; - return Mono.empty(); - } else { - return Mono.error(new ServiceException("Producer registration will be started")); - } - } - private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) { return a.jobCallbackUrl.equals(b.jobCallbackUrl) // && a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) // @@ -160,7 +189,7 @@ public class ProducerRegstrationTask { return ProducerRegistrationInfo.builder() // .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) // .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) // - .supportedTypeIds(types.typeIds()) // + .supportedTypeIds(this.types.typeIds()) // .build(); }