X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Frepository%2FInfoTypeSubscriptions.java;h=7f73605d750d5d2eeff2318f82a4e2688c73146f;hb=c9b469c6f038725d47b9622c5d11495c2194f581;hp=f54a8498c3e8c29efac6951518df14bb5e242dc5;hpb=366bc97828bf62e39a41318c1407a2c7c8cb5b74;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index f54a8498..7f73605d 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Vector; +import java.util.function.Function; import lombok.Builder; import lombok.Getter; @@ -34,6 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Subscriptions of callbacks for type registrations */ @@ -45,9 +49,9 @@ public class InfoTypeSubscriptions { private final MultiMap subscriptionsByOwner = new MultiMap<>(); public interface Callbacks { - void notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); - void notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); } @Builder @@ -103,35 +107,44 @@ public class InfoTypeSubscriptions { } public synchronized void clear() { - this.allSubscriptions.clear(); - this.subscriptionsByOwner.clear(); + allSubscriptions.clear(); + subscriptionsByOwner.clear(); } public void remove(SubscriptionInfo subscription) { allSubscriptions.remove(subscription.getId()); - this.subscriptionsByOwner.remove(subscription.owner, subscription.id); + subscriptionsByOwner.remove(subscription.owner, subscription.id); logger.debug("Removed type status subscription {}", subscription.id); } /** * returns all subscriptions for an owner. The colllection can contain 0..n * subscriptions. - * + * * @param owner * @return */ public synchronized Collection getSubscriptionsForOwner(String owner) { - return this.subscriptionsByOwner.get(owner); + return subscriptionsByOwner.get(owner); } public synchronized void notifyTypeRegistered(InfoType type) { - this.allSubscriptions - .forEach((id, subscription) -> subscription.callback.notifyTypeRegistered(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRegistered(type, subscription)); } public synchronized void notifyTypeRemoved(InfoType type) { - this.allSubscriptions - .forEach((id, subscription) -> subscription.callback.notifyTypeRemoved(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription)); + } + + private synchronized void notifyAllSubscribers(Function> notifyFunc) { + final int CONCURRENCY = 5; + Flux.fromIterable(allSubscriptions.values()) // + .flatMap(notifyFunc::apply, CONCURRENCY) // + .onErrorResume(throwable -> { + logger.warn("Post failed for consumer callback {}", throwable.getMessage()); + return Flux.empty(); + }) // + .subscribe(); } }