NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / repository / InfoTypeSubscriptions.java
index 2d6da4f..533199f 100644 (file)
@@ -47,6 +47,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.http.HttpStatus;
 import org.springframework.util.FileSystemUtils;
 
 import reactor.core.publisher.Flux;
@@ -118,7 +119,7 @@ public class InfoTypeSubscriptions {
     public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
         SubscriptionInfo p = allSubscriptions.get(id);
         if (p == null) {
-            throw new ServiceException("Could not find Information subscription: " + id);
+            throw new ServiceException("Could not find Information subscription: " + id, HttpStatus.NOT_FOUND);
         }
         return p;
     }
@@ -196,7 +197,8 @@ public class InfoTypeSubscriptions {
 
                 private Mono<String> error() {
                     return Mono.error(new ServiceException(
-                        "No notifyTypeRegistered handler found for interface version " + apiVersion));
+                        "No notifyTypeRegistered handler found for interface version " + apiVersion,
+                        HttpStatus.INTERNAL_SERVER_ERROR));
                 }
             };
         }
@@ -220,8 +222,7 @@ public class InfoTypeSubscriptions {
     private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
         SubscriptionInfo subscriptionInfo) {
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
-        return Mono.just(1) //
-            .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
+        return notifyFunc.apply(subscriptionInfo) //
             .retryWhen(retrySpec) //
             .onErrorResume(throwable -> {
                 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),