NONRTRIC - Enrichment Coordinator Service, making type availability subscriptions...
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / repository / InfoTypeSubscriptions.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.enrichment.repository;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25
26 import java.io.File;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.lang.invoke.MethodHandles;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.nio.file.Paths;
34 import java.time.Duration;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.Map;
38 import java.util.Vector;
39 import java.util.function.Function;
40
41 import lombok.Builder;
42 import lombok.Getter;
43
44 import org.oransc.enrichment.configuration.ApplicationConfig;
45 import org.oransc.enrichment.exceptions.ServiceException;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.context.annotation.Configuration;
50 import org.springframework.util.FileSystemUtils;
51
52 import reactor.core.publisher.Flux;
53 import reactor.core.publisher.Mono;
54 import reactor.util.retry.Retry;
55
56 /**
57  * Subscriptions of callbacks for type registrations
58  */
59 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
60 @Configuration
61 public class InfoTypeSubscriptions {
62     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
63     private final Map<String, SubscriptionInfo> allSubscriptions = new HashMap<>();
64     private final MultiMap<SubscriptionInfo> subscriptionsByOwner = new MultiMap<>();
65     private final Gson gson = new GsonBuilder().create();
66     private final ApplicationConfig config;
67     private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
68
69     public interface ConsumerCallbackHandler {
70         Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
71
72         Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo);
73     }
74
75     @Builder
76     @Getter
77     public static class SubscriptionInfo {
78         private String id;
79
80         private String callbackUrl;
81
82         private String owner;
83
84         private String apiVersion;
85     }
86
87     public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
88         this.config = config;
89
90         try {
91             this.restoreFromDatabase();
92         } catch (IOException e) {
93             logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
94         }
95     }
96
97     public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
98         callbackHandlers.put(apiVersion, handler);
99     }
100
101     public synchronized void put(SubscriptionInfo subscription) {
102         allSubscriptions.put(subscription.getId(), subscription);
103         subscriptionsByOwner.put(subscription.owner, subscription.id, subscription);
104         storeInFile(subscription);
105         logger.debug("Added type status subscription {}", subscription.id);
106     }
107
108     public synchronized Collection<SubscriptionInfo> getAllSubscriptions() {
109         return new Vector<>(allSubscriptions.values());
110     }
111
112     /**
113      * Get a subscription and throw if not fond.
114      * 
115      * @param id the ID of the subscription to get.
116      * @return SubscriptionInfo
117      * @throws ServiceException if not found
118      */
119     public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
120         SubscriptionInfo p = allSubscriptions.get(id);
121         if (p == null) {
122             throw new ServiceException("Could not find Information subscription: " + id);
123         }
124         return p;
125     }
126
127     /**
128      * Get a subscription or return null if not found. Equivalent to get in all java
129      * collections.
130      * 
131      * @param id the ID of the subscription to get.
132      * @return SubscriptionInfo
133      */
134     public synchronized SubscriptionInfo get(String id) {
135         return allSubscriptions.get(id);
136     }
137
138     public synchronized int size() {
139         return allSubscriptions.size();
140     }
141
142     public synchronized void clear() {
143         allSubscriptions.clear();
144         subscriptionsByOwner.clear();
145         clearDatabase();
146     }
147
148     public void remove(SubscriptionInfo subscription) {
149         allSubscriptions.remove(subscription.getId());
150         subscriptionsByOwner.remove(subscription.owner, subscription.id);
151
152         try {
153             Files.delete(getPath(subscription));
154         } catch (Exception e) {
155             logger.debug("Could not delete subscription from database: {}", e.getMessage());
156         }
157
158         logger.debug("Removed type status subscription {}", subscription.id);
159     }
160
161     /**
162      * returns all subscriptions for an owner. The colllection can contain 0..n
163      * subscriptions.
164      *
165      * @param owner
166      * @return
167      */
168     public synchronized Collection<SubscriptionInfo> getSubscriptionsForOwner(String owner) {
169         return subscriptionsByOwner.get(owner);
170     }
171
172     public synchronized void notifyTypeRegistered(InfoType type) {
173         notifyAllSubscribers(
174             subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription));
175     }
176
177     public synchronized void notifyTypeRemoved(InfoType type) {
178         notifyAllSubscribers(
179             subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription));
180     }
181
182     private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) {
183         ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion);
184         if (callbackHandler != null) {
185             return callbackHandler;
186         } else {
187             return new ConsumerCallbackHandler() {
188                 @Override
189                 public Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) {
190                     return error();
191                 }
192
193                 @Override
194                 public Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) {
195                     return error();
196                 }
197
198                 private Mono<String> error() {
199                     return Mono.error(new ServiceException(
200                         "No notifyTypeRegistered handler found for interface version " + apiVersion));
201                 }
202             };
203         }
204     }
205
206     private synchronized void notifyAllSubscribers(Function<? super SubscriptionInfo, Mono<String>> notifyFunc) {
207         final int MAX_CONCURRENCY = 5;
208         Flux.fromIterable(allSubscriptions.values()) //
209             .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) //
210             .subscribe();
211     }
212
213     /**
214      * Invoking one consumer. If the call fails after retries, the subscription is
215      * removed.
216      * 
217      * @param notifyFunc
218      * @param subscriptionInfo
219      * @return
220      */
221     private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
222         SubscriptionInfo subscriptionInfo) {
223         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
224         return Mono.just(1) //
225             .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
226             .retryWhen(retrySpec) //
227             .onErrorResume(throwable -> {
228                 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
229                     subscriptionInfo.id);
230                 this.remove(subscriptionInfo);
231                 return Mono.empty();
232             }); //
233     }
234
235     private void clearDatabase() {
236         try {
237             FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
238             Files.createDirectories(Paths.get(getDatabaseDirectory()));
239         } catch (IOException e) {
240             logger.warn("Could not delete database : {}", e.getMessage());
241         }
242     }
243
244     private void storeInFile(SubscriptionInfo subscription) {
245         try {
246             try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
247                 String json = gson.toJson(subscription);
248                 out.print(json);
249             }
250         } catch (Exception e) {
251             logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
252         }
253     }
254
255     public synchronized void restoreFromDatabase() throws IOException {
256         Files.createDirectories(Paths.get(getDatabaseDirectory()));
257         File dbDir = new File(getDatabaseDirectory());
258
259         for (File file : dbDir.listFiles()) {
260             String json = Files.readString(file.toPath());
261             SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
262             this.allSubscriptions.put(subscription.getId(), subscription);
263         }
264     }
265
266     private File getFile(SubscriptionInfo subscription) {
267         return getPath(subscription).toFile();
268     }
269
270     private Path getPath(SubscriptionInfo subscription) {
271         return getPath(subscription.getId());
272     }
273
274     private Path getPath(String subscriptionId) {
275         return Path.of(getDatabaseDirectory(), subscriptionId);
276     }
277
278     private String getDatabaseDirectory() {
279         return config.getVardataDirectory() + "/database/infotypesubscriptions";
280     }
281
282 }