NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / ProducerRegstrationTask.java
index c9284b5..ec3f2b2 100644 (file)
@@ -42,6 +42,7 @@ import org.oran.dmaapadapter.repository.InfoTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
 import org.springframework.scheduling.annotation.EnableScheduling;
 import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
@@ -77,14 +78,17 @@ public class ProducerRegstrationTask {
     }
 
     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
-    public void supervisionTask() {
-        checkRegistration() //
+    public void runSupervisionTask() {
+        supervisionTask().subscribe( //
+                null, //
+                this::handleRegistrationFailure, //
+                this::handleRegistrationCompleted);
+    }
+
+    public Mono<String> supervisionTask() {
+        return checkRegistration() //
                 .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
-                .flatMap(isRegisterred -> registerTypesAndProducer()) //
-                .subscribe( //
-                        null, //
-                        this::handleRegistrationFailure, //
-                        this::handleRegistrationCompleted);
+                .flatMap(isRegisterred -> registerTypesAndProducer());
     }
 
     private void handleRegistrationCompleted() {
@@ -153,7 +157,7 @@ public class ProducerRegstrationTask {
         InputStream in = getClass().getResourceAsStream(filePath);
         logger.debug("Reading application schema file from: {} with: {}", filePath, in);
         if (in == null) {
-            throw new ServiceException("Could not readfile: " + filePath);
+            throw new ServiceException("Could not readfile: " + filePath, HttpStatus.INTERNAL_SERVER_ERROR);
         }
         return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
     }