+ notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription));
+ }
+
+ private synchronized void notifyAllSubscribers(Function<? super SubscriptionInfo, Mono<String>> notifyFunc) {
+ final int CONCURRENCY = 5;
+ Flux.fromIterable(allSubscriptions.values()) //
+ .flatMap(notifyFunc::apply, CONCURRENCY) //
+ .onErrorResume(throwable -> {
+ logger.warn("Post failed for consumer callback {}", throwable.getMessage());
+ return Flux.empty();
+ }) //
+ .subscribe();