package org.oran.dmaapadapter.tasks;
+import com.google.common.io.CharStreams;
import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
import lombok.Getter;
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
+import org.oran.dmaapadapter.exceptions.ServiceException;
import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
import org.oran.dmaapadapter.repository.InfoType;
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
checkRegistration() //
- .filter(isRegisterred -> !isRegisterred) //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
.flatMap(isRegisterred -> registerTypesAndProducer()) //
.subscribe( //
null, //
}
private void handleRegistrationCompleted() {
- logger.debug("Registering types and producer succeeded");
isRegisteredInEcs = true;
}
private void handleRegistrationFailure(Throwable t) {
- logger.warn("Registration failed {}", t.getMessage());
- isRegisteredInEcs = false;
+ logger.warn("Registration of producer failed {}", t.getMessage());
}
+ // Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered");
+ logger.trace("Already registered in ECS");
return Mono.just(Boolean.TRUE);
} else {
return Mono.just(Boolean.FALSE);
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
- .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+ .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
CONCURRENCY) //
.collectList() //
.doOnNext(type -> logger.info("Registering producer")) //
return jsonObject("{}");
}
- private ProducerInfoTypeInfo typeRegistrationInfo() {
- return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+ private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+ try {
+ return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+ } catch (Exception e) {
+ logger.error("Fatal error {}", e.getMessage());
+ return null;
+ }
+ }
+
+ private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+ String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+ return jsonObject(readSchemaFile(schemaFile));
}
- private Object jsonSchemaObject() {
- // An object with no properties
- String schemaStr = "{" //
- + "\"type\": \"object\"," //
- + "\"properties\": {}," //
- + "\"additionalProperties\": false" //
- + "}"; //
- return jsonObject(schemaStr);
+ private String readSchemaFile(String filePath) throws IOException, ServiceException {
+ InputStream in = getClass().getResourceAsStream(filePath);
+ logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+ if (in == null) {
+ throw new ServiceException("Could not readfile: " + filePath);
+ }
+ return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
}
+ @SuppressWarnings("java:S2139") // Log exception
private Object jsonObject(String json) {
try {
return JsonParser.parseString(json).getAsJsonObject();
} catch (Exception e) {
- logger.error("Bug, error in JSON: {}", json);
- throw new NullPointerException(e.toString());
+ logger.error("Bug, error in JSON: {} {}", json, e.getMessage());
+ throw new NullPointerException(e.getMessage());
}
}
}
private ProducerRegistrationInfo producerRegistrationInfo() {
-
return ProducerRegistrationInfo.builder() //
.jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
.producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //