NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / ProducerRegstrationTask.java
index 4a68ab0..e8b236c 100644 (file)
 
 package org.oran.dmaapadapter.tasks;
 
+import com.google.common.io.CharStreams;
 import com.google.gson.JsonParser;
 
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
 import lombok.Getter;
 
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
 import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -111,12 +118,12 @@ public class ProducerRegstrationTask {
 
     private Mono<String> registerTypesAndProducer() {
         final int CONCURRENCY = 20;
-        final String producerUrl =
-                applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+        final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
+                + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
-                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
                         CONCURRENCY) //
                 .collectList() //
                 .doOnNext(type -> logger.info("Registering producer")) //
@@ -127,18 +134,39 @@ public class ProducerRegstrationTask {
         return jsonObject("{}");
     }
 
-    private ProducerInfoTypeInfo typeRegistrationInfo() {
-        return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+    private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+        try {
+            return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+        } catch (Exception e) {
+            logger.error("Fatal error {}", e.getMessage());
+            return null;
+        }
+    }
+
+    private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+
+        if (type.isKafkaTopicDefined()) {
+            String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
+            return jsonObject(schemaStrKafka);
+        } else {
+            // An object with no properties
+            String schemaStr = "{" //
+                    + "\"type\": \"object\"," //
+                    + "\"properties\": {}," //
+                    + "\"additionalProperties\": false" //
+                    + "}"; //
+
+            return jsonObject(schemaStr);
+        }
     }
 
-    private Object jsonSchemaObject() {
-        // An object with no properties
-        String schemaStr = "{" //
-                + "\"type\": \"object\"," //
-                + "\"properties\": {}," //
-                + "\"additionalProperties\": false" //
-                + "}"; //
-        return jsonObject(schemaStr);
+    private String readSchemaFile(String filePath) throws IOException, ServiceException {
+        InputStream in = getClass().getResourceAsStream(filePath);
+        logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+        if (in == null) {
+            throw new ServiceException("Could not readfile: " + filePath);
+        }
+        return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
     }
 
     private Object jsonObject(String json) {