X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=enrichment-coordinator-service%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fenrichment%2Frepository%2FInfoTypeSubscriptions.java;h=65978e15647039fec06eadfba68c9d8136f8941c;hb=5138a5a5b01c20dc8cc90eb8aec53cb53ff25292;hp=7f73605d750d5d2eeff2318f82a4e2688c73146f;hpb=90463293a218527b02eb9886664dbb40a669b127;p=nonrtric.git diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index 7f73605d..65978e15 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -20,7 +20,18 @@ package org.oransc.enrichment.repository; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -30,25 +41,33 @@ import java.util.function.Function; import lombok.Builder; import lombok.Getter; +import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.http.HttpStatus; +import org.springframework.util.FileSystemUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** * Subscriptions of callbacks for type registrations */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -@Component +@Configuration public class InfoTypeSubscriptions { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Map allSubscriptions = new HashMap<>(); private final MultiMap subscriptionsByOwner = new MultiMap<>(); + private final Gson gson = new GsonBuilder().create(); + private final ApplicationConfig config; + private final Map callbackHandlers = new HashMap<>(); - public interface Callbacks { + public interface ConsumerCallbackHandler { Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); @@ -63,12 +82,26 @@ public class InfoTypeSubscriptions { private String owner; - private Callbacks callback; + private String apiVersion; + } + + public InfoTypeSubscriptions(@Autowired ApplicationConfig config) { + this.config = config; + + try { + this.restoreFromDatabase(); + } catch (IOException e) { + logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory()); + } + } + + public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) { + callbackHandlers.put(apiVersion, handler); } public synchronized void put(SubscriptionInfo subscription) { - allSubscriptions.put(subscription.getId(), subscription); - subscriptionsByOwner.put(subscription.owner, subscription.id, subscription); + doPut(subscription); + storeInFile(subscription); logger.debug("Added type status subscription {}", subscription.id); } @@ -86,7 +119,7 @@ public class InfoTypeSubscriptions { public synchronized SubscriptionInfo getSubscription(String id) throws ServiceException { SubscriptionInfo p = allSubscriptions.get(id); if (p == null) { - throw new ServiceException("Could not find Information subscription: " + id); + throw new ServiceException("Could not find Information subscription: " + id, HttpStatus.NOT_FOUND); } return p; } @@ -109,11 +142,19 @@ public class InfoTypeSubscriptions { public synchronized void clear() { allSubscriptions.clear(); subscriptionsByOwner.clear(); + clearDatabase(); } public void remove(SubscriptionInfo subscription) { allSubscriptions.remove(subscription.getId()); subscriptionsByOwner.remove(subscription.owner, subscription.id); + + try { + Files.delete(getPath(subscription)); + } catch (Exception e) { + logger.debug("Could not delete subscription from database: {}", e.getMessage()); + } + logger.debug("Removed type status subscription {}", subscription.id); } @@ -129,22 +170,119 @@ public class InfoTypeSubscriptions { } public synchronized void notifyTypeRegistered(InfoType type) { - notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRegistered(type, subscription)); + notifyAllSubscribers( + subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription)); } public synchronized void notifyTypeRemoved(InfoType type) { - notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription)); + notifyAllSubscribers( + subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription)); + } + + private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) { + ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion); + if (callbackHandler != null) { + return callbackHandler; + } else { + return new ConsumerCallbackHandler() { + @Override + public Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) { + return error(); + } + + @Override + public Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) { + return error(); + } + + private Mono error() { + return Mono.error(new ServiceException( + "No notifyTypeRegistered handler found for interface version " + apiVersion, + HttpStatus.INTERNAL_SERVER_ERROR)); + } + }; + } } private synchronized void notifyAllSubscribers(Function> notifyFunc) { - final int CONCURRENCY = 5; + final int MAX_CONCURRENCY = 5; Flux.fromIterable(allSubscriptions.values()) // - .flatMap(notifyFunc::apply, CONCURRENCY) // - .onErrorResume(throwable -> { - logger.warn("Post failed for consumer callback {}", throwable.getMessage()); - return Flux.empty(); - }) // + .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) // .subscribe(); } + /** + * Invoking one consumer. If the call fails after retries, the subscription is + * removed. + * + * @param notifyFunc + * @param subscriptionInfo + * @return + */ + private Mono notifySubscriber(Function> notifyFunc, + SubscriptionInfo subscriptionInfo) { + Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1)); + return Mono.just(1) // + .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) // + .retryWhen(retrySpec) // + .onErrorResume(throwable -> { + logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(), + subscriptionInfo.id); + this.remove(subscriptionInfo); + return Mono.empty(); + }); // + } + + private void clearDatabase() { + try { + FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); + Files.createDirectories(Paths.get(getDatabaseDirectory())); + } catch (IOException e) { + logger.warn("Could not delete database : {}", e.getMessage()); + } + } + + private void storeInFile(SubscriptionInfo subscription) { + try { + try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) { + String json = gson.toJson(subscription); + out.print(json); + } + } catch (Exception e) { + logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage()); + } + } + + public synchronized void restoreFromDatabase() throws IOException { + Files.createDirectories(Paths.get(getDatabaseDirectory())); + File dbDir = new File(getDatabaseDirectory()); + + for (File file : dbDir.listFiles()) { + String json = Files.readString(file.toPath()); + SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class); + doPut(subscription); + } + } + + private void doPut(SubscriptionInfo subscription) { + allSubscriptions.put(subscription.getId(), subscription); + subscriptionsByOwner.put(subscription.owner, subscription.id, subscription); + } + + private File getFile(SubscriptionInfo subscription) { + return getPath(subscription).toFile(); + } + + private Path getPath(SubscriptionInfo subscription) { + return getPath(subscription.getId()); + } + + private Path getPath(String subscriptionId) { + return Path.of(getDatabaseDirectory(), subscriptionId); + } + + private String getDatabaseDirectory() { + return config.getVardataDirectory() + "/database/infotypesubscriptions"; + } + }