From: Henrik Andersson Date: Mon, 30 Aug 2021 11:06:10 +0000 (+0000) Subject: Merge "Added function test of ecs type subscriptions" X-Git-Tag: 1.2.0~109 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c9b469c6f038725d47b9622c5d11495c2194f581;hp=3cc0b58e9a786f619af2af318ef2028179ed2dab;p=nonrtric.git Merge "Added function test of ecs type subscriptions" --- diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java index 97a829c4..0c44eb65 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java @@ -23,17 +23,14 @@ package org.oransc.enrichment.controllers.r1consumer; import com.google.gson.Gson; import com.google.gson.GsonBuilder; -import java.lang.invoke.MethodHandles; - import org.oransc.enrichment.clients.AsyncRestClient; import org.oransc.enrichment.clients.AsyncRestClientFactory; import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.repository.InfoType; import org.oransc.enrichment.repository.InfoTypeSubscriptions; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; +import reactor.core.publisher.Mono; /** * Callbacks to the Consumer. Notifies consumer according to the API (which this @@ -43,7 +40,6 @@ import org.springframework.stereotype.Component; @Component public class ConsumerCallbacks implements InfoTypeSubscriptions.Callbacks { - private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient restClient; @@ -54,28 +50,21 @@ public class ConsumerCallbacks implements InfoTypeSubscriptions.Callbacks { } @Override - public void notifyTypeRegistered(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { + public Mono notifyTypeRegistered(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED, type.getId()); String body = gson.toJson(info); - post(subscriptionInfo.getCallbackUrl(), body); + return restClient.post(subscriptionInfo.getCallbackUrl(), body); } @Override - public void notifyTypeRemoved(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { + public Mono notifyTypeRemoved(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED, type.getId()); String body = gson.toJson(info); - post(subscriptionInfo.getCallbackUrl(), body); - - } - - private void post(String url, String body) { - restClient.post(url, body) // - .subscribe(response -> logger.debug("Post OK {}", url), // - throwable -> logger.warn("Post failed for consumer callback {} {}", url, body), null); + return restClient.post(subscriptionInfo.getCallbackUrl(), body); } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index f54a8498..7f73605d 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -25,6 +25,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Vector; +import java.util.function.Function; import lombok.Builder; import lombok.Getter; @@ -34,6 +35,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Subscriptions of callbacks for type registrations */ @@ -45,9 +49,9 @@ public class InfoTypeSubscriptions { private final MultiMap subscriptionsByOwner = new MultiMap<>(); public interface Callbacks { - void notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); - void notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); + Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); } @Builder @@ -103,35 +107,44 @@ public class InfoTypeSubscriptions { } public synchronized void clear() { - this.allSubscriptions.clear(); - this.subscriptionsByOwner.clear(); + allSubscriptions.clear(); + subscriptionsByOwner.clear(); } public void remove(SubscriptionInfo subscription) { allSubscriptions.remove(subscription.getId()); - this.subscriptionsByOwner.remove(subscription.owner, subscription.id); + subscriptionsByOwner.remove(subscription.owner, subscription.id); logger.debug("Removed type status subscription {}", subscription.id); } /** * returns all subscriptions for an owner. The colllection can contain 0..n * subscriptions. - * + * * @param owner * @return */ public synchronized Collection getSubscriptionsForOwner(String owner) { - return this.subscriptionsByOwner.get(owner); + return subscriptionsByOwner.get(owner); } public synchronized void notifyTypeRegistered(InfoType type) { - this.allSubscriptions - .forEach((id, subscription) -> subscription.callback.notifyTypeRegistered(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRegistered(type, subscription)); } public synchronized void notifyTypeRemoved(InfoType type) { - this.allSubscriptions - .forEach((id, subscription) -> subscription.callback.notifyTypeRemoved(type, subscription)); + notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription)); + } + + private synchronized void notifyAllSubscribers(Function> notifyFunc) { + final int CONCURRENCY = 5; + Flux.fromIterable(allSubscriptions.values()) // + .flatMap(notifyFunc::apply, CONCURRENCY) // + .onErrorResume(throwable -> { + logger.warn("Post failed for consumer callback {}", throwable.getMessage()); + return Flux.empty(); + }) // + .subscribe(); } } diff --git a/onap/oran b/onap/oran index b9a70169..633c8e32 160000 --- a/onap/oran +++ b/onap/oran @@ -1 +1 @@ -Subproject commit b9a7016920746a6537bf4c0165595e758716ea03 +Subproject commit 633c8e32702db13c28349152c76efea8e97fb252