private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
SubscriptionInfo subscriptionInfo) {
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
SubscriptionInfo subscriptionInfo) {
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
.retryWhen(retrySpec) //
.onErrorResume(throwable -> {
logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
.retryWhen(retrySpec) //
.onErrorResume(throwable -> {
logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),