Merge "Kafka now works in kube for calls outside its namespace"
[nonrtric.git] / information-coordinator-service / src / main / java / org / oransc / ics / repository / InfoTypeSubscriptions.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.ics.repository;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25
26 import java.io.File;
27 import java.io.FileOutputStream;
28 import java.io.IOException;
29 import java.io.PrintStream;
30 import java.lang.invoke.MethodHandles;
31 import java.nio.file.Files;
32 import java.nio.file.Path;
33 import java.nio.file.Paths;
34 import java.time.Duration;
35 import java.util.Collection;
36 import java.util.HashMap;
37 import java.util.Map;
38 import java.util.Vector;
39 import java.util.function.Function;
40
41 import lombok.Builder;
42 import lombok.Getter;
43
44 import org.oransc.ics.configuration.ApplicationConfig;
45 import org.oransc.ics.exceptions.ServiceException;
46 import org.slf4j.Logger;
47 import org.slf4j.LoggerFactory;
48 import org.springframework.beans.factory.annotation.Autowired;
49 import org.springframework.context.annotation.Configuration;
50 import org.springframework.http.HttpStatus;
51 import org.springframework.util.FileSystemUtils;
52
53 import reactor.core.publisher.Flux;
54 import reactor.core.publisher.Mono;
55 import reactor.util.retry.Retry;
56
57 /**
58  * Subscriptions of callbacks for type registrations
59  */
60 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
61 @Configuration
62 public class InfoTypeSubscriptions {
63     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
64     private final Map<String, SubscriptionInfo> allSubscriptions = new HashMap<>();
65     private final MultiMap<SubscriptionInfo> subscriptionsByOwner = new MultiMap<>();
66     private final Gson gson = new GsonBuilder().create();
67     private final ApplicationConfig config;
68     private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
69
70     public interface ConsumerCallbackHandler {
71         Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
72
73         Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo);
74     }
75
76     @Builder
77     @Getter
78     public static class SubscriptionInfo {
79         private String id;
80
81         private String callbackUrl;
82
83         private String owner;
84
85         private String apiVersion;
86     }
87
88     public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
89         this.config = config;
90
91         try {
92             this.restoreFromDatabase();
93         } catch (IOException e) {
94             logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
95         }
96     }
97
98     public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
99         callbackHandlers.put(apiVersion, handler);
100     }
101
102     public synchronized void put(SubscriptionInfo subscription) {
103         doPut(subscription);
104         storeInFile(subscription);
105         logger.debug("Added type status subscription {}", subscription.id);
106     }
107
108     public synchronized Collection<SubscriptionInfo> getAllSubscriptions() {
109         return new Vector<>(allSubscriptions.values());
110     }
111
112     /**
113      * Get a subscription and throw if not fond.
114      * 
115      * @param id the ID of the subscription to get.
116      * @return SubscriptionInfo
117      * @throws ServiceException if not found
118      */
119     public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException {
120         SubscriptionInfo p = allSubscriptions.get(id);
121         if (p == null) {
122             throw new ServiceException("Could not find Information subscription: " + id, HttpStatus.NOT_FOUND);
123         }
124         return p;
125     }
126
127     /**
128      * Get a subscription or return null if not found. Equivalent to get in all java
129      * collections.
130      * 
131      * @param id the ID of the subscription to get.
132      * @return SubscriptionInfo
133      */
134     public synchronized SubscriptionInfo get(String id) {
135         return allSubscriptions.get(id);
136     }
137
138     public synchronized int size() {
139         return allSubscriptions.size();
140     }
141
142     public synchronized void clear() {
143         allSubscriptions.clear();
144         subscriptionsByOwner.clear();
145         clearDatabase();
146     }
147
148     public void remove(SubscriptionInfo subscription) {
149         allSubscriptions.remove(subscription.getId());
150         subscriptionsByOwner.remove(subscription.owner, subscription.id);
151
152         try {
153             Files.delete(getPath(subscription));
154         } catch (Exception e) {
155             logger.debug("Could not delete subscription from database: {}", e.getMessage());
156         }
157
158         logger.debug("Removed type status subscription {}", subscription.id);
159     }
160
161     /**
162      * returns all subscriptions for an owner. The colllection can contain 0..n
163      * subscriptions.
164      *
165      * @param owner
166      * @return
167      */
168     public synchronized Collection<SubscriptionInfo> getSubscriptionsForOwner(String owner) {
169         return subscriptionsByOwner.get(owner);
170     }
171
172     public synchronized void notifyTypeRegistered(InfoType type) {
173         notifyAllSubscribers(
174             subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription));
175     }
176
177     public synchronized void notifyTypeRemoved(InfoType type) {
178         notifyAllSubscribers(
179             subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription));
180     }
181
182     private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) {
183         ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion);
184         if (callbackHandler != null) {
185             return callbackHandler;
186         } else {
187             return new ConsumerCallbackHandler() {
188                 @Override
189                 public Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) {
190                     return error();
191                 }
192
193                 @Override
194                 public Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) {
195                     return error();
196                 }
197
198                 private Mono<String> error() {
199                     return Mono.error(new ServiceException(
200                         "No notifyTypeRegistered handler found for interface version " + apiVersion,
201                         HttpStatus.INTERNAL_SERVER_ERROR));
202                 }
203             };
204         }
205     }
206
207     private synchronized void notifyAllSubscribers(Function<? super SubscriptionInfo, Mono<String>> notifyFunc) {
208         final int MAX_CONCURRENCY = 5;
209         Flux.fromIterable(allSubscriptions.values()) //
210             .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) //
211             .subscribe();
212     }
213
214     /**
215      * Invoking one consumer. If the call fails after retries, the subscription is
216      * removed.
217      * 
218      * @param notifyFunc
219      * @param subscriptionInfo
220      * @return
221      */
222     private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
223         SubscriptionInfo subscriptionInfo) {
224         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
225         return notifyFunc.apply(subscriptionInfo) //
226             .retryWhen(retrySpec) //
227             .onErrorResume(throwable -> {
228                 logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
229                     subscriptionInfo.id);
230                 this.remove(subscriptionInfo);
231                 return Mono.empty();
232             }); //
233     }
234
235     private void clearDatabase() {
236         try {
237             FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
238             Files.createDirectories(Paths.get(getDatabaseDirectory()));
239         } catch (IOException e) {
240             logger.warn("Could not delete database : {}", e.getMessage());
241         }
242     }
243
244     private void storeInFile(SubscriptionInfo subscription) {
245         try {
246             try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
247                 String json = gson.toJson(subscription);
248                 out.print(json);
249             }
250         } catch (Exception e) {
251             logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
252         }
253     }
254
255     public synchronized void restoreFromDatabase() throws IOException {
256         Files.createDirectories(Paths.get(getDatabaseDirectory()));
257         File dbDir = new File(getDatabaseDirectory());
258
259         for (File file : dbDir.listFiles()) {
260             String json = Files.readString(file.toPath());
261             SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
262             doPut(subscription);
263         }
264     }
265
266     private void doPut(SubscriptionInfo subscription) {
267         allSubscriptions.put(subscription.getId(), subscription);
268         subscriptionsByOwner.put(subscription.owner, subscription.id, subscription);
269     }
270
271     private File getFile(SubscriptionInfo subscription) {
272         return getPath(subscription).toFile();
273     }
274
275     private Path getPath(SubscriptionInfo subscription) {
276         return getPath(subscription.getId());
277     }
278
279     private Path getPath(String subscriptionId) {
280         return Path.of(getDatabaseDirectory(), subscriptionId);
281     }
282
283     private String getDatabaseDirectory() {
284         return config.getVardataDirectory() + "/database/infotypesubscriptions";
285     }
286
287 }