2d6da4f561414a8ecb7ba503732c1363d4c67e6c
[nonrtric.git] / enrichment-coordinator-service / src / main / java / org / oransc / enrichment / repository / InfoTypeSubscriptions.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.enrichment.repository;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25
26 import java.io.File;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.lang.invoke.MethodHandles;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.nio.file.Paths;
34 import java.time.Duration;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.Map;
38 import java.util.Vector;
39 import java.util.function.Function;
40
41 import lombok.Builder;
42 import lombok.Getter;
43
44 import org.oransc.enrichment.configuration.ApplicationConfig;
45 import org.oransc.enrichment.exceptions.ServiceException;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.context.annotation.Configuration;
50 import org.springframework.util.FileSystemUtils;
51
52 import reactor.core.publisher.Flux;
53 import reactor.core.publisher.Mono;
54 import reactor.util.retry.Retry;
55
56 /**
57  * Subscriptions of callbacks for type registrations
58  */
59 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
60 @Configuration
61 public class InfoTypeSubscriptions {
62     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
63     private final Map<String, SubscriptionInfo> allSubscriptions = new HashMap<>();
64     private final MultiMap<SubscriptionInfo> subscriptionsByOwner = new MultiMap<>();
65     private final Gson gson = new GsonBuilder().create();
66     private final ApplicationConfig config;
67     private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
68
69     public interface ConsumerCallbackHandler {
70         Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
71
72         Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo);
73     }
74
75     @Builder
76     @Getter
77     public static class SubscriptionInfo {
78         private String id;
79
80         private String callbackUrl;
81
82         private String owner;
83
84         private String apiVersion;
85     }
86
87     public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
88         this.config = config;
89
90         try {
91             this.restoreFromDatabase();
92         } catch (IOException e) {
93             logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
94         }
95     }
96
97     public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
98         callbackHandlers.put(apiVersion, handler);
99     }
100
101     public synchronized void put(SubscriptionInfo subscription) {
102         doPut(subscription);
103         storeInFile(subscription);
104         logger.debug("Added type status subscription {}", subscription.id);
105     }
106
107     public synchronized Collection<SubscriptionInfo> getAllSubscriptions() {
108         return new Vector<>(allSubscriptions.values());
109     }
110
111     /**
112      * Get a subscription and throw if not fond.
113      * 
114      * @param id the ID of the subscription to get.
115      * @return SubscriptionInfo
116      * @throws ServiceException if not found
117      */
118     public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
119         SubscriptionInfo p = allSubscriptions.get(id);
120         if (p == null) {
121             throw new ServiceException("Could not find Information subscription: " + id);
122         }
123         return p;
124     }
125
126     /**
127      * Get a subscription or return null if not found. Equivalent to get in all java
128      * collections.
129      * 
130      * @param id the ID of the subscription to get.
131      * @return SubscriptionInfo
132      */
133     public synchronized SubscriptionInfo get(String id) {
134         return allSubscriptions.get(id);
135     }
136
137     public synchronized int size() {
138         return allSubscriptions.size();
139     }
140
141     public synchronized void clear() {
142         allSubscriptions.clear();
143         subscriptionsByOwner.clear();
144         clearDatabase();
145     }
146
147     public void remove(SubscriptionInfo subscription) {
148         allSubscriptions.remove(subscription.getId());
149         subscriptionsByOwner.remove(subscription.owner, subscription.id);
150
151         try {
152             Files.delete(getPath(subscription));
153         } catch (Exception e) {
154             logger.debug("Could not delete subscription from database: {}", e.getMessage());
155         }
156
157         logger.debug("Removed type status subscription {}", subscription.id);
158     }
159
160     /**
161      * returns all subscriptions for an owner. The colllection can contain 0..n
162      * subscriptions.
163      *
164      * @param owner
165      * @return
166      */
167     public synchronized Collection<SubscriptionInfo> getSubscriptionsForOwner(String owner) {
168         return subscriptionsByOwner.get(owner);
169     }
170
171     public synchronized void notifyTypeRegistered(InfoType type) {
172         notifyAllSubscribers(
173             subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription));
174     }
175
176     public synchronized void notifyTypeRemoved(InfoType type) {
177         notifyAllSubscribers(
178             subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription));
179     }
180
181     private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) {
182         ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion);
183         if (callbackHandler != null) {
184             return callbackHandler;
185         } else {
186             return new ConsumerCallbackHandler() {
187                 @Override
188                 public Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) {
189                     return error();
190                 }
191
192                 @Override
193                 public Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) {
194                     return error();
195                 }
196
197                 private Mono<String> error() {
198                     return Mono.error(new ServiceException(
199                         "No notifyTypeRegistered handler found for interface version " + apiVersion));
200                 }
201             };
202         }
203     }
204
205     private synchronized void notifyAllSubscribers(Function<? super SubscriptionInfo, Mono<String>> notifyFunc) {
206         final int MAX_CONCURRENCY = 5;
207         Flux.fromIterable(allSubscriptions.values()) //
208             .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) //
209             .subscribe();
210     }
211
212     /**
213      * Invoking one consumer. If the call fails after retries, the subscription is
214      * removed.
215      * 
216      * @param notifyFunc
217      * @param subscriptionInfo
218      * @return
219      */
220     private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
221         SubscriptionInfo subscriptionInfo) {
222         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
223         return Mono.just(1) //
224             .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
225             .retryWhen(retrySpec) //
226             .onErrorResume(throwable -> {
227                 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
228                     subscriptionInfo.id);
229                 this.remove(subscriptionInfo);
230                 return Mono.empty();
231             }); //
232     }
233
234     private void clearDatabase() {
235         try {
236             FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
237             Files.createDirectories(Paths.get(getDatabaseDirectory()));
238         } catch (IOException e) {
239             logger.warn("Could not delete database : {}", e.getMessage());
240         }
241     }
242
243     private void storeInFile(SubscriptionInfo subscription) {
244         try {
245             try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
246                 String json = gson.toJson(subscription);
247                 out.print(json);
248             }
249         } catch (Exception e) {
250             logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
251         }
252     }
253
254     public synchronized void restoreFromDatabase() throws IOException {
255         Files.createDirectories(Paths.get(getDatabaseDirectory()));
256         File dbDir = new File(getDatabaseDirectory());
257
258         for (File file : dbDir.listFiles()) {
259             String json = Files.readString(file.toPath());
260             SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
261             doPut(subscription);
262         }
263     }
264
265     private void doPut(SubscriptionInfo subscription) {
266         allSubscriptions.put(subscription.getId(), subscription);
267         subscriptionsByOwner.put(subscription.owner, subscription.id, subscription);
268     }
269
270     private File getFile(SubscriptionInfo subscription) {
271         return getPath(subscription).toFile();
272     }
273
274     private Path getPath(SubscriptionInfo subscription) {
275         return getPath(subscription.getId());
276     }
277
278     private Path getPath(String subscriptionId) {
279         return Path.of(getDatabaseDirectory(), subscriptionId);
280     }
281
282     private String getDatabaseDirectory() {
283         return config.getVardataDirectory() + "/database/infotypesubscriptions";
284     }
285
286 }