package org.oran.dmaapadapter.tasks;
+import com.google.common.io.CharStreams;
import com.google.gson.JsonParser;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+import lombok.Getter;
+
import org.oran.dmaapadapter.clients.AsyncRestClient;
import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import reactor.core.publisher.Mono;
/**
- * Registers the types and this producer in ECS. This is done when needed.
+ * Registers the types and this producer in Innformation Coordinator Service.
+ * This is done when needed.
*/
@Component
@EnableScheduling
private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
- private boolean isRegisteredInEcs = false;
+ @Getter
+ private boolean isRegisteredInIcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
- logger.debug("Checking producers starting");
- createTask().subscribe(null, null, () -> logger.debug("Producer registration completed"));
+ checkRegistration() //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
+ .flatMap(isRegisterred -> registerTypesAndProducer()) //
+ .subscribe( //
+ null, //
+ this::handleRegistrationFailure, //
+ this::handleRegistrationCompleted);
}
- public Mono<Object> createTask() {
- return checkProducerRegistration() //
- .doOnError(t -> isRegisteredInEcs = false) //
- .onErrorResume(t -> registerTypesAndProducer());
+ private void handleRegistrationCompleted() {
+ isRegisteredInIcs = true;
}
- public boolean isRegisteredInEcs() {
- return this.isRegisteredInEcs;
+ private void handleRegistrationFailure(Throwable t) {
+ logger.warn("Registration of producer failed {}", t.getMessage());
}
- private Mono<Object> checkProducerRegistration() {
- final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ // Returns TRUE if registration is correct
+ private Mono<Boolean> checkRegistration() {
+ final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
- .flatMap(this::checkRegistrationInfo) //
- ;
+ .flatMap(this::isRegisterredInfoCorrect) //
+ .onErrorResume(t -> Mono.just(Boolean.FALSE));
+ }
+
+ private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
+ ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
+ if (isEqual(producerRegistrationInfo(), registerredInfo)) {
+ logger.trace("Already registered in ICS");
+ return Mono.just(Boolean.TRUE);
+ } else {
+ return Mono.just(Boolean.FALSE);
+ }
}
private String registerTypeUrl(InfoType type) {
- String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
- return url;
+ return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
private Mono<String> registerTypesAndProducer() {
+ final int CONCURRENCY = 20;
final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
- .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) //
+ .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
+ CONCURRENCY) //
.collectList() //
- .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) //
- .onErrorResume(t -> {
- logger.warn("Registration failed {}", t.getMessage());
- isRegisteredInEcs = false;
- return Mono.empty();
- }) //
- .doOnNext(x -> logger.debug("Registering types and producer completed"));
+ .doOnNext(type -> logger.info("Registering producer")) //
+ .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo())));
}
private Object typeSpecifcInfoObject() {
return jsonObject("{}");
}
- private ProducerInfoTypeInfo typeRegistrationInfo() {
- return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
+ private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
+ try {
+ return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
+ } catch (Exception e) {
+ logger.error("Fatal error {}", e.getMessage());
+ return null;
+ }
}
- private Object jsonSchemaObject() {
- // An object with no properties
- String schemaStr = "{" //
- + "\"type\": \"object\"," //
- + "\"properties\": {}," //
- + "\"additionalProperties\": false" //
- + "}"; //
- return jsonObject(schemaStr);
+ private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
+ String schemaFile = type.isKafkaTopicDefined() ? "/typeSchemaKafka.json" : "/typeSchemaDmaap.json";
+ return jsonObject(readSchemaFile(schemaFile));
}
+ private String readSchemaFile(String filePath) throws IOException, ServiceException {
+ InputStream in = getClass().getResourceAsStream(filePath);
+ logger.debug("Reading application schema file from: {} with: {}", filePath, in);
+ if (in == null) {
+ throw new ServiceException("Could not readfile: " + filePath);
+ }
+ return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
+ }
+
+ @SuppressWarnings("java:S2139") // Log exception
private Object jsonObject(String json) {
try {
return JsonParser.parseString(json).getAsJsonObject();
} catch (Exception e) {
- logger.error("Bug, error in JSON: {}", json);
- throw new NullPointerException(e.toString());
- }
- }
-
- private Mono<String> checkRegistrationInfo(String resp) {
- ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class);
- if (isEqual(producerRegistrationInfo(), info)) {
- logger.debug("Already registered");
- this.isRegisteredInEcs = true;
- return Mono.empty();
- } else {
- return Mono.error(new ServiceException("Producer registration will be started"));
+ logger.error("Bug, error in JSON: {} {}", json, e.getMessage());
+ throw new NullPointerException(e.getMessage());
}
}
}
private ProducerRegistrationInfo producerRegistrationInfo() {
-
return ProducerRegistrationInfo.builder() //
.jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
.producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
- .supportedTypeIds(types.typeIds()) //
+ .supportedTypeIds(this.types.typeIds()) //
.build();
}