NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / ProducerRegstrationTask.java
index 837ca32..ec3f2b2 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import com.google.common.io.CharStreams;
 import com.google.gson.JsonParser;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
 import lombok.Getter;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
 import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -35,6 +42,7 @@ import org.oran.dmaapadapter.repository.InfoTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
@@ -43,7 +51,8 @@ import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Registers the types and this producer in ECS. This is done when needed.
+ * Registers the types and this producer in Innformation Coordinator Service.
+ * This is done when needed.
  */
 @Component
 @EnableScheduling
@@ -58,7 +67,7 @@ public class ProducerRegstrationTask {
 
     private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
     @Getter
-    private boolean isRegisteredInEcs = false;
+    private boolean isRegisteredInIcs = false;
     private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
 
     public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
@@ -69,28 +78,30 @@ public class ProducerRegstrationTask {
     }
 
     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
-    public void supervisionTask() {
-        checkRegistration() //
-                .filter(isRegisterred -> !isRegisterred) //
-                .flatMap(isRegisterred -> registerTypesAndProducer()) //
-                .subscribe( //
-                        null, //
-                        this::handleRegistrationFailure, //
-                        this::handleRegistrationCompleted);
+    public void runSupervisionTask() {
+        supervisionTask().subscribe( //
+                null, //
+                this::handleRegistrationFailure, //
+                this::handleRegistrationCompleted);
+    }
+
+    public Mono<String> supervisionTask() {
+        return checkRegistration() //
+                .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
+                .flatMap(isRegisterred -> registerTypesAndProducer());
     }
 
     private void handleRegistrationCompleted() {
-        logger.debug("Registering types and producer succeeded");
-        isRegisteredInEcs = true;
+        isRegisteredInIcs = true;
     }
 
     private void handleRegistrationFailure(Throwable t) {
-        logger.warn("Registration failed {}", t.getMessage());
-        isRegisteredInEcs = false;
+        logger.warn("Registration of producer failed {}", t.getMessage());
     }
 
+    // Returns TRUE if registration is correct
     private Mono<Boolean> checkRegistration() {
-        final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+        final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
         return restClient.get(url) //
                 .flatMap(this::isRegisterredInfoCorrect) //
                 .onErrorResume(t -> Mono.just(Boolean.FALSE));
@@ -99,7 +110,7 @@ public class ProducerRegstrationTask {
     private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
         ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
         if (isEqual(producerRegistrationInfo(), registerredInfo)) {
-            logger.trace("Already registered");
+            logger.trace("Already registered in ICS");
             return Mono.just(Boolean.TRUE);
         } else {
             return Mono.just(Boolean.FALSE);
@@ -107,17 +118,17 @@ public class ProducerRegstrationTask {
     }
 
     private String registerTypeUrl(InfoType type) {
-        return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
+        return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
     }
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
         final String producerUrl =
-                applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+                applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
-                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
                         CONCURRENCY) //
                 .collectList() //
                 .doOnNext(type -> logger.info("Registering producer")) //
@@ -128,26 +139,36 @@ public class ProducerRegstrationTask {
         return jsonObject("{}");
     }
 
-    private ProducerInfoTypeInfo typeRegistrationInfo() {
-        return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+    private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+        try {
+            return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+        } catch (Exception e) {
+            logger.error("Fatal error {}", e.getMessage());
+            return null;
+        }
+    }
+
+    private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+        String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+        return jsonObject(readSchemaFile(schemaFile));
     }
 
-    private Object jsonSchemaObject() {
-        // An object with no properties
-        String schemaStr = "{" //
-                + "\"type\": \"object\"," //
-                + "\"properties\": {}," //
-                + "\"additionalProperties\": false" //
-                + "}"; //
-        return jsonObject(schemaStr);
+    private String readSchemaFile(String filePath) throws IOException, ServiceException {
+        InputStream in = getClass().getResourceAsStream(filePath);
+        logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+        if (in == null) {
+            throw new ServiceException("Could not readfile: " + filePath, HttpStatus.INTERNAL_SERVER_ERROR);
+        }
+        return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
     }
 
+    @SuppressWarnings("java:S2139") // Log exception
     private Object jsonObject(String json) {
         try {
             return JsonParser.parseString(json).getAsJsonObject();
         } catch (Exception e) {
-            logger.error("Bug, error in JSON: {}", json);
-            throw new NullPointerException(e.toString());
+            logger.error("Bug, error in JSON: {} {}", json, e.getMessage());
+            throw new NullPointerException(e.getMessage());
         }
     }
 
@@ -158,7 +179,6 @@ public class ProducerRegstrationTask {
     }
 
     private ProducerRegistrationInfo producerRegistrationInfo() {
-
         return ProducerRegistrationInfo.builder() //
                 .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
                 .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //