import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
storeInFile(subscription);
logger.debug("Added type status subscription {}", subscription.id);
}
storeInFile(subscription);
logger.debug("Added type status subscription {}", subscription.id);
}
public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
SubscriptionInfo p = allSubscriptions.get(id);
if (p == null) {
public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
SubscriptionInfo p = allSubscriptions.get(id);
if (p == null) {
private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
SubscriptionInfo subscriptionInfo) {
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
SubscriptionInfo subscriptionInfo) {
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
.retryWhen(retrySpec) //
.onErrorResume(throwable -> {
logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
.retryWhen(retrySpec) //
.onErrorResume(throwable -> {
logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
for (File file : dbDir.listFiles()) {
String json = Files.readString(file.toPath());
SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
for (File file : dbDir.listFiles()) {
String json = Files.readString(file.toPath());
SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
+ private void doPut(SubscriptionInfo subscription) {
+ allSubscriptions.put(subscription.getId(), subscription);
+ subscriptionsByOwner.put(subscription.owner, subscription.id, subscription);
+ }
+