X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=dmaap-adaptor-java%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fdmaapadapter%2Ftasks%2FProducerRegstrationTask.java;h=306cc6b4565e06c539969b76226841c1ec92fcf5;hb=f07ca52477eb610cf49c13b2d34f7e2c2eab3f74;hp=837ca323d95b0ee44e7ccb0406cbdf0cea32ec9e;hpb=cc74808f1f5835a0b42c77ced5a585648d6d434c;p=nonrtric.git diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 837ca323..306cc6b4 100644 --- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -20,14 +20,21 @@ package org.oran.dmaapadapter.tasks; +import com.google.common.io.CharStreams; import com.google.gson.JsonParser; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; + import lombok.Getter; import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.controllers.ProducerCallbacksController; +import org.oran.dmaapadapter.exceptions.ServiceException; import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo; import org.oran.dmaapadapter.r1.ProducerRegistrationInfo; import org.oran.dmaapadapter.repository.InfoType; @@ -71,7 +78,7 @@ public class ProducerRegstrationTask { @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS) public void supervisionTask() { checkRegistration() // - .filter(isRegisterred -> !isRegisterred) // + .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) // .flatMap(isRegisterred -> registerTypesAndProducer()) // .subscribe( // null, // @@ -80,15 +87,14 @@ public class ProducerRegstrationTask { } private void handleRegistrationCompleted() { - logger.debug("Registering types and producer succeeded"); isRegisteredInEcs = true; } private void handleRegistrationFailure(Throwable t) { - logger.warn("Registration failed {}", t.getMessage()); - isRegisteredInEcs = false; + logger.warn("Registration of producer failed {}", t.getMessage()); } + // Returns TRUE if registration is correct private Mono checkRegistration() { final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID; return restClient.get(url) // @@ -99,7 +105,7 @@ public class ProducerRegstrationTask { private Mono isRegisterredInfoCorrect(String registerredInfoStr) { ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class); if (isEqual(producerRegistrationInfo(), registerredInfo)) { - logger.trace("Already registered"); + logger.trace("Already registered in ECS"); return Mono.just(Boolean.TRUE); } else { return Mono.just(Boolean.FALSE); @@ -117,7 +123,7 @@ public class ProducerRegstrationTask { return Flux.fromIterable(this.types.getAll()) // .doOnNext(type -> logger.info("Registering type {}", type.getId())) // - .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())), + .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))), CONCURRENCY) // .collectList() // .doOnNext(type -> logger.info("Registering producer")) // @@ -128,26 +134,36 @@ public class ProducerRegstrationTask { return jsonObject("{}"); } - private ProducerInfoTypeInfo typeRegistrationInfo() { - return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject()); + private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) { + try { + return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject()); + } catch (Exception e) { + logger.error("Fatal error {}", e.getMessage()); + return null; + } + } + + private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException { + String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json"; + return jsonObject(readSchemaFile(schemaFile)); } - private Object jsonSchemaObject() { - // An object with no properties - String schemaStr = "{" // - + "\"type\": \"object\"," // - + "\"properties\": {}," // - + "\"additionalProperties\": false" // - + "}"; // - return jsonObject(schemaStr); + private String readSchemaFile(String filePath) throws IOException, ServiceException { + InputStream in = getClass().getResourceAsStream(filePath); + logger.debug("Reading application schema file from: {} with: {}", filePath, in); + if (in == null) { + throw new ServiceException("Could not readfile: " + filePath); + } + return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8)); } + @SuppressWarnings("java:S2139") // Log exception private Object jsonObject(String json) { try { return JsonParser.parseString(json).getAsJsonObject(); } catch (Exception e) { - logger.error("Bug, error in JSON: {}", json); - throw new NullPointerException(e.toString()); + logger.error("Bug, error in JSON: {} {}", json, e.getMessage()); + throw new NullPointerException(e.getMessage()); } } @@ -158,7 +174,6 @@ public class ProducerRegstrationTask { } private ProducerRegistrationInfo producerRegistrationInfo() { - return ProducerRegistrationInfo.builder() // .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) // .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //