2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.enrichment.repository;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.lang.invoke.MethodHandles;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.nio.file.Paths;
34 import java.time.Duration;
35 import java.util.Collection;
36 import java.util.HashMap;
38 import java.util.Vector;
39 import java.util.function.Function;
41 import lombok.Builder;
44 import org.oransc.enrichment.configuration.ApplicationConfig;
45 import org.oransc.enrichment.exceptions.ServiceException;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.context.annotation.Configuration;
50 import org.springframework.http.HttpStatus;
51 import org.springframework.util.FileSystemUtils;
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55 import reactor.util.retry.Retry;
58 * Subscriptions of callbacks for type registrations
60 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
62 public class InfoTypeSubscriptions {
63 private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
64 private final Map<String, SubscriptionInfo> allSubscriptions = new HashMap<>();
65 private final MultiMap<SubscriptionInfo> subscriptionsByOwner = new MultiMap<>();
66 private final Gson gson = new GsonBuilder().create();
67 private final ApplicationConfig config;
68 private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
70 public interface ConsumerCallbackHandler {
71 Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
73 Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo);
78 public static class SubscriptionInfo {
81 private String callbackUrl;
85 private String apiVersion;
88 public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
92 this.restoreFromDatabase();
93 } catch (IOException e) {
94 logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
98 public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
99 callbackHandlers.put(apiVersion, handler);
102 public synchronized void put(SubscriptionInfo subscription) {
104 storeInFile(subscription);
105 logger.debug("Added type status subscription {}", subscription.id);
108 public synchronized Collection<SubscriptionInfo> getAllSubscriptions() {
109 return new Vector<>(allSubscriptions.values());
113 * Get a subscription and throw if not fond.
115 * @param id the ID of the subscription to get.
116 * @return SubscriptionInfo
117 * @throws ServiceException if not found
119 public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
120 SubscriptionInfo p = allSubscriptions.get(id);
122 throw new ServiceException("Could not find Information subscription: " + id, HttpStatus.NOT_FOUND);
128 * Get a subscription or return null if not found. Equivalent to get in all java
131 * @param id the ID of the subscription to get.
132 * @return SubscriptionInfo
134 public synchronized SubscriptionInfo get(String id) {
135 return allSubscriptions.get(id);
138 public synchronized int size() {
139 return allSubscriptions.size();
142 public synchronized void clear() {
143 allSubscriptions.clear();
144 subscriptionsByOwner.clear();
148 public void remove(SubscriptionInfo subscription) {
149 allSubscriptions.remove(subscription.getId());
150 subscriptionsByOwner.remove(subscription.owner, subscription.id);
153 Files.delete(getPath(subscription));
154 } catch (Exception e) {
155 logger.debug("Could not delete subscription from database: {}", e.getMessage());
158 logger.debug("Removed type status subscription {}", subscription.id);
162 * returns all subscriptions for an owner. The colllection can contain 0..n
168 public synchronized Collection<SubscriptionInfo> getSubscriptionsForOwner(String owner) {
169 return subscriptionsByOwner.get(owner);
172 public synchronized void notifyTypeRegistered(InfoType type) {
173 notifyAllSubscribers(
174 subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription));
177 public synchronized void notifyTypeRemoved(InfoType type) {
178 notifyAllSubscribers(
179 subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription));
182 private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) {
183 ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion);
184 if (callbackHandler != null) {
185 return callbackHandler;
187 return new ConsumerCallbackHandler() {
189 public Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) {
194 public Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) {
198 private Mono<String> error() {
199 return Mono.error(new ServiceException(
200 "No notifyTypeRegistered handler found for interface version " + apiVersion,
201 HttpStatus.INTERNAL_SERVER_ERROR));
207 private synchronized void notifyAllSubscribers(Function<? super SubscriptionInfo, Mono<String>> notifyFunc) {
208 final int MAX_CONCURRENCY = 5;
209 Flux.fromIterable(allSubscriptions.values()) //
210 .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) //
215 * Invoking one consumer. If the call fails after retries, the subscription is
219 * @param subscriptionInfo
222 private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
223 SubscriptionInfo subscriptionInfo) {
224 Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
225 return Mono.just(1) //
226 .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
227 .retryWhen(retrySpec) //
228 .onErrorResume(throwable -> {
229 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
230 subscriptionInfo.id);
231 this.remove(subscriptionInfo);
236 private void clearDatabase() {
238 FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
239 Files.createDirectories(Paths.get(getDatabaseDirectory()));
240 } catch (IOException e) {
241 logger.warn("Could not delete database : {}", e.getMessage());
245 private void storeInFile(SubscriptionInfo subscription) {
247 try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
248 String json = gson.toJson(subscription);
251 } catch (Exception e) {
252 logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
256 public synchronized void restoreFromDatabase() throws IOException {
257 Files.createDirectories(Paths.get(getDatabaseDirectory()));
258 File dbDir = new File(getDatabaseDirectory());
260 for (File file : dbDir.listFiles()) {
261 String json = Files.readString(file.toPath());
262 SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
267 private void doPut(SubscriptionInfo subscription) {
268 allSubscriptions.put(subscription.getId(), subscription);
269 subscriptionsByOwner.put(subscription.owner, subscription.id, subscription);
272 private File getFile(SubscriptionInfo subscription) {
273 return getPath(subscription).toFile();
276 private Path getPath(SubscriptionInfo subscription) {
277 return getPath(subscription.getId());
280 private Path getPath(String subscriptionId) {
281 return Path.of(getDatabaseDirectory(), subscriptionId);
284 private String getDatabaseDirectory() {
285 return config.getVardataDirectory() + "/database/infotypesubscriptions";