NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / ProducerRegstrationTask.java
index b9a50b3..837ca32 100644 (file)
@@ -22,11 +22,12 @@ package org.oran.dmaapadapter.tasks;
 
 import com.google.gson.JsonParser;
 
+import lombok.Getter;
+
 import org.oran.dmaapadapter.clients.AsyncRestClient;
 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
 import org.oran.dmaapadapter.configuration.ApplicationConfig;
 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
-import org.oran.dmaapadapter.exceptions.ServiceException;
 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
 import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
 import org.oran.dmaapadapter.repository.InfoType;
@@ -56,6 +57,7 @@ public class ProducerRegstrationTask {
     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
 
     private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
+    @Getter
     private boolean isRegisteredInEcs = false;
     private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
 
@@ -68,47 +70,58 @@ public class ProducerRegstrationTask {
 
     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
     public void supervisionTask() {
-        logger.debug("Checking producers starting");
-        createTask().subscribe(null, null, () -> logger.debug("Producer registration completed"));
+        checkRegistration() //
+                .filter(isRegisterred -> !isRegisterred) //
+                .flatMap(isRegisterred -> registerTypesAndProducer()) //
+                .subscribe( //
+                        null, //
+                        this::handleRegistrationFailure, //
+                        this::handleRegistrationCompleted);
     }
 
-    public Mono<Object> createTask() {
-        return checkProducerRegistration() //
-                .doOnError(t -> isRegisteredInEcs = false) //
-                .onErrorResume(t -> registerTypesAndProducer());
+    private void handleRegistrationCompleted() {
+        logger.debug("Registering types and producer succeeded");
+        isRegisteredInEcs = true;
     }
 
-    public boolean isRegisteredInEcs() {
-        return this.isRegisteredInEcs;
+    private void handleRegistrationFailure(Throwable t) {
+        logger.warn("Registration failed {}", t.getMessage());
+        isRegisteredInEcs = false;
     }
 
-    private Mono<Object> checkProducerRegistration() {
+    private Mono<Boolean> checkRegistration() {
         final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
         return restClient.get(url) //
-                .flatMap(this::checkRegistrationInfo) //
-        ;
+                .flatMap(this::isRegisterredInfoCorrect) //
+                .onErrorResume(t -> Mono.just(Boolean.FALSE));
+    }
+
+    private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
+        ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
+        if (isEqual(producerRegistrationInfo(), registerredInfo)) {
+            logger.trace("Already registered");
+            return Mono.just(Boolean.TRUE);
+        } else {
+            return Mono.just(Boolean.FALSE);
+        }
     }
 
     private String registerTypeUrl(InfoType type) {
-        String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
-        return url;
+        return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
     }
 
     private Mono<String> registerTypesAndProducer() {
+        final int CONCURRENCY = 20;
         final String producerUrl =
                 applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
 
         return Flux.fromIterable(this.types.getAll()) //
                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
-                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) //
+                .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo())),
+                        CONCURRENCY) //
                 .collectList() //
-                .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) //
-                .onErrorResume(t -> {
-                    logger.warn("Registration failed {}", t.getMessage());
-                    isRegisteredInEcs = false;
-                    return Mono.empty();
-                }) //
-                .doOnNext(x -> logger.debug("Registering types and producer completed"));
+                .doOnNext(type -> logger.info("Registering producer")) //
+                .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo())));
     }
 
     private Object typeSpecifcInfoObject() {
@@ -138,17 +151,6 @@ public class ProducerRegstrationTask {
         }
     }
 
-    private Mono<String> checkRegistrationInfo(String resp) {
-        ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class);
-        if (isEqual(producerRegistrationInfo(), info)) {
-            logger.debug("Already registered");
-            this.isRegisteredInEcs = true;
-            return Mono.empty();
-        } else {
-            return Mono.error(new ServiceException("Producer registration will be started"));
-        }
-    }
-
     private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) {
         return a.jobCallbackUrl.equals(b.jobCallbackUrl) //
                 && a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) //
@@ -160,7 +162,7 @@ public class ProducerRegstrationTask {
         return ProducerRegistrationInfo.builder() //
                 .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
                 .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
-                .supportedTypeIds(types.typeIds()) //
+                .supportedTypeIds(this.types.typeIds()) //
                 .build();
     }