X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FProducerRegstrationTask.java;h=ec3f2b29cb3536f8daa0ce7dfe808510f0fbda50;hb=342864a6e1822ac77355e170307969c47555728d;hp=8b5b6cfcedfc0d5fe4496e312bb28b61625040c6;hpb=ce1d9f2d3e1d2713289dc4d2b5c246f99ec65160;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 8b5b6cfc..ec3f2b29 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -42,6 +42,7 @@ import org.oran.dmaapadapter.repository.InfoTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; @@ -50,7 +51,8 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Registers the types and this producer in ECS. This is done when needed. + * Registers the types and this producer in Innformation Coordinator Service. + * This is done when needed. */ @Component @EnableScheduling @@ -65,7 +67,7 @@ public class ProducerRegstrationTask { private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; @Getter - private boolean isRegisteredInEcs = false; + private boolean isRegisteredInIcs = false; private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) { @@ -76,18 +78,21 @@ public class ProducerRegstrationTask { } @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) - public void supervisionTask() { - checkRegistration() // - .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) // - .flatMap(isRegisterred -> registerTypesAndProducer()) // - .subscribe( // - null, // - this::handleRegistrationFailure, // - this::handleRegistrationCompleted); + public void runSupervisionTask() { + supervisionTask().subscribe( // + null, // + this::handleRegistrationFailure, // + this::handleRegistrationCompleted); + } + + public Mono supervisionTask() { + return checkRegistration() // + .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) // + .flatMap(isRegisterred -> registerTypesAndProducer()); } private void handleRegistrationCompleted() { - isRegisteredInEcs = true; + isRegisteredInIcs = true; } private void handleRegistrationFailure(Throwable t) { @@ -96,7 +101,7 @@ public class ProducerRegstrationTask { // Returns TRUE if registration is correct private Mono checkRegistration() { - final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return restClient.get(url) // .flatMap(this::isRegisterredInfoCorrect) // .onErrorResume(t -> Mono.just(Boolean.FALSE)); @@ -105,7 +110,7 @@ public class ProducerRegstrationTask { private Mono isRegisterredInfoCorrect(String registerredInfoStr) { ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); if (isEqual(producerRegistrationInfo(), registerredInfo)) { - logger.trace("Already registered in ECS"); + logger.trace("Already registered in ICS"); return Mono.just(Boolean.TRUE); } else { return Mono.just(Boolean.FALSE); @@ -113,13 +118,13 @@ public class ProducerRegstrationTask { } private String registerTypeUrl(InfoType type) { - return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); + return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId(); } private Mono registerTypesAndProducer() { final int CONCURRENCY = 20; final String producerUrl = - applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; + applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // @@ -144,37 +149,26 @@ public class ProducerRegstrationTask { } private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException { - - if (type.isKafkaTopicDefined()) { - String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json"); - return jsonObject(schemaStrKafka); - } else { - // An object with no properties - String schemaStr = "{" // - + "\"type\": \"object\"," // - + "\"properties\": {}," // - + "\"additionalProperties\": false" // - + "}"; // - - return jsonObject(schemaStr); - } + String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json"; + return jsonObject(readSchemaFile(schemaFile)); } private String readSchemaFile(String filePath) throws IOException, ServiceException { InputStream in = getClass().getResourceAsStream(filePath); logger.debug("Reading application schema file from: {} with: {}", filePath, in); if (in == null) { - throw new ServiceException("Could not readfile: " + filePath); + throw new ServiceException("Could not readfile: " + filePath, HttpStatus.INTERNAL_SERVER_ERROR); } return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8)); } + @SuppressWarnings("java:S2139") // Log exception private Object jsonObject(String json) { try { return JsonParser.parseString(json).getAsJsonObject(); } catch (Exception e) { - logger.error("Bug, error in JSON: {}", json); - throw new NullPointerException(e.toString()); + logger.error("Bug, error in JSON: {} {}", json, e.getMessage()); + throw new NullPointerException(e.getMessage()); } } @@ -185,7 +179,6 @@ public class ProducerRegstrationTask { } private ProducerRegistrationInfo producerRegistrationInfo() { - return ProducerRegistrationInfo.builder() // .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) // .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //