From 90463293a218527b02eb9886664dbb40a669b127 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 27 Aug 2021 12:54:11 +0200 Subject: [PATCH] ECS, support for notification of available information types Improvement. Made the callback reactive and limited the paralellity to 5, which makes it scale to a huge number of subscribers. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-570 Change-Id: I810b5c7514a1899fde42a02d2965b09903032b49 --- .../controllers/r1consumer/ConsumerCallbacks.java | 21 +++++------------- .../repository/InfoTypeSubscriptions.java | 25 +++++++++++++++++----- 2 files changed, 25 insertions(+), 21 deletions(-) diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java index 97a829c4..0c44eb65 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java @@ -23,17 +23,14 @@ package org.oransc.enrichment.controllers.r1consumer; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.lang.invoke.MethodHandles; - import org.oransc.enrichment.clients.AsyncRestClient; import org.oransc.enrichment.clients.AsyncRestClientFactory; import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.repository.InfoType; import org.oransc.enrichment.repository.InfoTypeSubscriptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; /** * Callbacks to the Consumer. Notifies consumer according to the API (which this @@ -43,7 +40,6 @@ import org.springframework.stereotype.Component; @Component public class ConsumerCallbacks implements InfoTypeSubscriptions.Callbacks { - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient restClient; @@ -54,28 +50,21 @@ public class ConsumerCallbacks implements InfoTypeSubscriptions.Callbacks { } @Override - public void notifyTypeRegistered(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { + public Mono notifyTypeRegistered(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED, type.getId()); String body = gson.toJson(info); - post(subscriptionInfo.getCallbackUrl(), body); + return restClient.post(subscriptionInfo.getCallbackUrl(), body); } @Override - public void notifyTypeRemoved(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { + public Mono notifyTypeRemoved(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED, type.getId()); String body = gson.toJson(info); - post(subscriptionInfo.getCallbackUrl(), body); - - } - - private void post(String url, String body) { - restClient.post(url, body) // - .subscribe(response -> logger.debug("Post OK {}", url), // - throwable -> logger.warn("Post failed for consumer callback {} {}", url, body), null); + return restClient.post(subscriptionInfo.getCallbackUrl(), body); } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index bdf0e8e4..7f73605d 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Vector; +import java.util.function.Function; import lombok.Builder; import lombok.Getter; @@ -34,6 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Subscriptions of callbacks for type registrations */ @@ -45,9 +49,9 @@ public class InfoTypeSubscriptions { private final MultiMap subscriptionsByOwner = new MultiMap<>(); public interface Callbacks { - void notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); - void notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); } @Builder @@ -116,7 +120,7 @@ public class InfoTypeSubscriptions { /** * returns all subscriptions for an owner. The colllection can contain 0..n * subscriptions. - * + * * @param owner * @return */ @@ -125,11 +129,22 @@ public class InfoTypeSubscriptions { } public synchronized void notifyTypeRegistered(InfoType type) { - allSubscriptions.forEach((id, subscription) -> subscription.callback.notifyTypeRegistered(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRegistered(type, subscription)); } public synchronized void notifyTypeRemoved(InfoType type) { - allSubscriptions.forEach((id, subscription) -> subscription.callback.notifyTypeRemoved(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription)); + } + + private synchronized void notifyAllSubscribers(Function> notifyFunc) { + final int CONCURRENCY = 5; + Flux.fromIterable(allSubscriptions.values()) // + .flatMap(notifyFunc::apply, CONCURRENCY) // + .onErrorResume(throwable -> { + logger.warn("Post failed for consumer callback {}", throwable.getMessage()); + return Flux.empty(); + }) // + .subscribe(); } } -- 2.16.6