X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Frepository%2FInfoTypeSubscriptions.java;h=7f73605d750d5d2eeff2318f82a4e2688c73146f;hb=c9b469c6f038725d47b9622c5d11495c2194f581;hp=bdf0e8e4ca884b6576b80459cd841732d5b1e615;hpb=383f47b1ae6e767d0bd8334f4f45e4cf83bd7c32;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index bdf0e8e4..7f73605d 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Vector; +import java.util.function.Function; import lombok.Builder; import lombok.Getter; @@ -34,6 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Subscriptions of callbacks for type registrations */ @@ -45,9 +49,9 @@ public class InfoTypeSubscriptions { private final MultiMap subscriptionsByOwner = new MultiMap<>(); public interface Callbacks { - void notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); - void notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); } @Builder @@ -116,7 +120,7 @@ public class InfoTypeSubscriptions { /** * returns all subscriptions for an owner. The colllection can contain 0..n * subscriptions. - * + * * @param owner * @return */ @@ -125,11 +129,22 @@ public class InfoTypeSubscriptions { } public synchronized void notifyTypeRegistered(InfoType type) { - allSubscriptions.forEach((id, subscription) -> subscription.callback.notifyTypeRegistered(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRegistered(type, subscription)); } public synchronized void notifyTypeRemoved(InfoType type) { - allSubscriptions.forEach((id, subscription) -> subscription.callback.notifyTypeRemoved(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription)); + } + + private synchronized void notifyAllSubscribers(Function> notifyFunc) { + final int CONCURRENCY = 5; + Flux.fromIterable(allSubscriptions.values()) // + .flatMap(notifyFunc::apply, CONCURRENCY) // + .onErrorResume(throwable -> { + logger.warn("Post failed for consumer callback {}", throwable.getMessage()); + return Flux.empty(); + }) // + .subscribe(); } }