+ /**
+ * Invoking one consumer. If the call fails after retries, the subscription is
+ * removed.
+ *
+ * @param notifyFunc
+ * @param subscriptionInfo
+ * @return
+ */
+ private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
+ SubscriptionInfo subscriptionInfo) {
+ Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
+ return Mono.just(1) //
+ .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
+ .retryWhen(retrySpec) //
+ .onErrorResume(throwable -> {
+ logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
+ subscriptionInfo.id);
+ this.remove(subscriptionInfo);
+ return Mono.empty();
+ }); //
+ }
+
+ private void clearDatabase() {
+ try {
+ FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
+ Files.createDirectories(Paths.get(getDatabaseDirectory()));
+ } catch (IOException e) {
+ logger.warn("Could not delete database : {}", e.getMessage());
+ }
+ }
+
+ private void storeInFile(SubscriptionInfo subscription) {
+ try {
+ try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
+ String json = gson.toJson(subscription);
+ out.print(json);
+ }
+ } catch (Exception e) {
+ logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
+ }
+ }
+
+ public synchronized void restoreFromDatabase() throws IOException {
+ Files.createDirectories(Paths.get(getDatabaseDirectory()));
+ File dbDir = new File(getDatabaseDirectory());
+
+ for (File file : dbDir.listFiles()) {
+ String json = Files.readString(file.toPath());
+ SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
+ this.allSubscriptions.put(subscription.getId(), subscription);
+ }
+ }
+
+ private File getFile(SubscriptionInfo subscription) {
+ return getPath(subscription).toFile();
+ }
+
+ private Path getPath(SubscriptionInfo subscription) {
+ return getPath(subscription.getId());
+ }
+
+ private Path getPath(String subscriptionId) {
+ return Path.of(getDatabaseDirectory(), subscriptionId);
+ }
+
+ private String getDatabaseDirectory() {
+ return config.getVardataDirectory() + "/database/infotypesubscriptions";
+ }
+