2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.enrichment.repository;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.lang.invoke.MethodHandles;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.nio.file.Paths;
34 import java.time.Duration;
35 import java.util.Collection;
36 import java.util.HashMap;
38 import java.util.Vector;
39 import java.util.function.Function;
41 import lombok.Builder;
44 import org.oransc.enrichment.configuration.ApplicationConfig;
45 import org.oransc.enrichment.exceptions.ServiceException;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.context.annotation.Configuration;
50 import org.springframework.util.FileSystemUtils;
52 import reactor.core.publisher.Flux;
53 import reactor.core.publisher.Mono;
54 import reactor.util.retry.Retry;
57 * Subscriptions of callbacks for type registrations
59 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
61 public class InfoTypeSubscriptions {
62 private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
63 private final Map<String, SubscriptionInfo> allSubscriptions = new HashMap<>();
64 private final MultiMap<SubscriptionInfo> subscriptionsByOwner = new MultiMap<>();
65 private final Gson gson = new GsonBuilder().create();
66 private final ApplicationConfig config;
67 private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
69 public interface ConsumerCallbackHandler {
70 Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
72 Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo);
77 public static class SubscriptionInfo {
80 private String callbackUrl;
84 private String apiVersion;
87 public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
91 this.restoreFromDatabase();
92 } catch (IOException e) {
93 logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
97 public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
98 callbackHandlers.put(apiVersion, handler);
101 public synchronized void put(SubscriptionInfo subscription) {
103 storeInFile(subscription);
104 logger.debug("Added type status subscription {}", subscription.id);
107 public synchronized Collection<SubscriptionInfo> getAllSubscriptions() {
108 return new Vector<>(allSubscriptions.values());
112 * Get a subscription and throw if not fond.
114 * @param id the ID of the subscription to get.
115 * @return SubscriptionInfo
116 * @throws ServiceException if not found
118 public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
119 SubscriptionInfo p = allSubscriptions.get(id);
121 throw new ServiceException("Could not find Information subscription: " + id);
127 * Get a subscription or return null if not found. Equivalent to get in all java
130 * @param id the ID of the subscription to get.
131 * @return SubscriptionInfo
133 public synchronized SubscriptionInfo get(String id) {
134 return allSubscriptions.get(id);
137 public synchronized int size() {
138 return allSubscriptions.size();
141 public synchronized void clear() {
142 allSubscriptions.clear();
143 subscriptionsByOwner.clear();
147 public void remove(SubscriptionInfo subscription) {
148 allSubscriptions.remove(subscription.getId());
149 subscriptionsByOwner.remove(subscription.owner, subscription.id);
152 Files.delete(getPath(subscription));
153 } catch (Exception e) {
154 logger.debug("Could not delete subscription from database: {}", e.getMessage());
157 logger.debug("Removed type status subscription {}", subscription.id);
161 * returns all subscriptions for an owner. The colllection can contain 0..n
167 public synchronized Collection<SubscriptionInfo> getSubscriptionsForOwner(String owner) {
168 return subscriptionsByOwner.get(owner);
171 public synchronized void notifyTypeRegistered(InfoType type) {
172 notifyAllSubscribers(
173 subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription));
176 public synchronized void notifyTypeRemoved(InfoType type) {
177 notifyAllSubscribers(
178 subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription));
181 private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) {
182 ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion);
183 if (callbackHandler != null) {
184 return callbackHandler;
186 return new ConsumerCallbackHandler() {
188 public Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) {
193 public Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) {
197 private Mono<String> error() {
198 return Mono.error(new ServiceException(
199 "No notifyTypeRegistered handler found for interface version " + apiVersion));
205 private synchronized void notifyAllSubscribers(Function<? super SubscriptionInfo, Mono<String>> notifyFunc) {
206 final int MAX_CONCURRENCY = 5;
207 Flux.fromIterable(allSubscriptions.values()) //
208 .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) //
213 * Invoking one consumer. If the call fails after retries, the subscription is
217 * @param subscriptionInfo
220 private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
221 SubscriptionInfo subscriptionInfo) {
222 Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
223 return Mono.just(1) //
224 .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
225 .retryWhen(retrySpec) //
226 .onErrorResume(throwable -> {
227 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
228 subscriptionInfo.id);
229 this.remove(subscriptionInfo);
234 private void clearDatabase() {
236 FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
237 Files.createDirectories(Paths.get(getDatabaseDirectory()));
238 } catch (IOException e) {
239 logger.warn("Could not delete database : {}", e.getMessage());
243 private void storeInFile(SubscriptionInfo subscription) {
245 try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
246 String json = gson.toJson(subscription);
249 } catch (Exception e) {
250 logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
254 public synchronized void restoreFromDatabase() throws IOException {
255 Files.createDirectories(Paths.get(getDatabaseDirectory()));
256 File dbDir = new File(getDatabaseDirectory());
258 for (File file : dbDir.listFiles()) {
259 String json = Files.readString(file.toPath());
260 SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
265 private void doPut(SubscriptionInfo subscription) {
266 allSubscriptions.put(subscription.getId(), subscription);
267 subscriptionsByOwner.put(subscription.owner, subscription.id, subscription);
270 private File getFile(SubscriptionInfo subscription) {
271 return getPath(subscription).toFile();
274 private Path getPath(SubscriptionInfo subscription) {
275 return getPath(subscription.getId());
278 private Path getPath(String subscriptionId) {
279 return Path.of(getDatabaseDirectory(), subscriptionId);
282 private String getDatabaseDirectory() {
283 return config.getVardataDirectory() + "/database/infotypesubscriptions";