package org.oran.dmaapadapter.tasks;
+import com.google.common.io.CharStreams;
import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
import lombok.Getter;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
import org.oran.dmaapadapter.repository.InfoType;
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
- final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String producerUrl = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/"
+ + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
- .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+ .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
CONCURRENCY) //
.collectList() //
.doOnNext(type -> logger.info("Registering producer")) //
return jsonObject("{}");
}
- private ProducerInfoTypeInfo typeRegistrationInfo() {
- return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+ private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+ try {
+ return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+ } catch (Exception e) {
+ logger.error("Fatal error {}", e.getMessage());
+ return null;
+ }
+ }
+
+ private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+
+ if (type.isKafkaTopicDefined()) {
+ String schemaStrKafka = readSchemaFile("/typeSchemaKafka.json");
+ return jsonObject(schemaStrKafka);
+ } else {
+ // An object with no properties
+ String schemaStr = "{" //
+ + "\"type\": \"object\"," //
+ + "\"properties\": {}," //
+ + "\"additionalProperties\": false" //
+ + "}"; //
+
+ return jsonObject(schemaStr);
+ }
}
- private Object jsonSchemaObject() {
- // An object with no properties
- String schemaStr = "{" //
- + "\"type\": \"object\"," //
- + "\"properties\": {}," //
- + "\"additionalProperties\": false" //
- + "}"; //
- return jsonObject(schemaStr);
+ private String readSchemaFile(String filePath) throws IOException, ServiceException {
+ InputStream in = getClass().getResourceAsStream(filePath);
+ logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+ if (in == null) {
+ throw new ServiceException("Could not readfile: " + filePath);
+ }
+ return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
}
private Object jsonObject(String json) {