From b0612ab177e14ffa133aae2538aa504d5cc10e99 Mon Sep 17 00:00:00 2001 From: PatrikBuhr Date: Fri, 3 Sep 2021 08:53:10 +0200 Subject: [PATCH] NONRTRIC - Enrichment Coordinator Service, making type availability subscriptions persistent The subscriptions created by the data consumer are stored persistently to survive a component restart. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-577 Change-Id: Id7be3bc4f0e4264e1302e2f11a4f2bac82e13232 --- .../controllers/r1consumer/ConsumerCallbacks.java | 25 ++-- .../controllers/r1consumer/ConsumerController.java | 63 ++++----- .../repository/InfoTypeSubscriptions.java | 156 +++++++++++++++++++-- .../org/oransc/enrichment/ApplicationTest.java | 36 ++++- 4 files changed, 219 insertions(+), 61 deletions(-) diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java index 0c44eb65..a57eeeb3 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java @@ -36,35 +36,38 @@ import reactor.core.publisher.Mono; * Callbacks to the Consumer. Notifies consumer according to the API (which this * class adapts to) */ -@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string .. @Component -public class ConsumerCallbacks implements InfoTypeSubscriptions.Callbacks { +public class ConsumerCallbacks implements InfoTypeSubscriptions.ConsumerCallbackHandler { private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient restClient; - public ConsumerCallbacks(@Autowired ApplicationConfig config) { + public static final String API_VERSION = "version_1"; + + public ConsumerCallbacks(@Autowired ApplicationConfig config, + @Autowired InfoTypeSubscriptions infoTypeSubscriptions) { AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig()); this.restClient = restClientFactory.createRestClientNoHttpProxy(""); + infoTypeSubscriptions.registerCallbackhandler(this, API_VERSION); } @Override public Mono notifyTypeRegistered(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { - ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), - ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED, type.getId()); - String body = gson.toJson(info); - + String body = body(type, ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED); return restClient.post(subscriptionInfo.getCallbackUrl(), body); - } @Override public Mono notifyTypeRemoved(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) { - ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), - ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED, type.getId()); - String body = gson.toJson(info); + String body = body(type, ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED); return restClient.post(subscriptionInfo.getCallbackUrl(), body); } + private String body(InfoType type, ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues status) { + ConsumerTypeRegistrationInfo info = + new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), status, type.getId()); + return gson.toJson(info); + } + } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java index fd1901e3..9de57d5b 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java @@ -41,7 +41,6 @@ import java.util.Collection; import java.util.List; import org.json.JSONObject; -import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.controllers.ErrorResponse; import org.oransc.enrichment.controllers.VoidResponse; import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks; @@ -70,36 +69,29 @@ import org.springframework.web.bind.annotation.RestController; import reactor.core.publisher.Mono; @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string .. -@RestController("Consumer registry") +@RestController("Consumer API") @Tag(name = ConsumerConsts.CONSUMER_API_NAME) @RequestMapping(path = ConsumerConsts.API_ROOT, produces = MediaType.APPLICATION_JSON_VALUE) public class ConsumerController { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - - @Autowired - ApplicationConfig applicationConfig; - - @Autowired - private InfoJobs jobs; - - @Autowired - private InfoTypes infoTypes; - - @Autowired - private InfoProducers infoProducers; - - @Autowired - private ConsumerCallbacks consumerCallbacks; - - @Autowired - private ProducerCallbacks producerCallbacks; - - @Autowired - private InfoTypeSubscriptions infoTypeSubscriptions; - + private final InfoJobs infoJobs; + private final InfoTypes infoTypes; + private final InfoProducers infoProducers; + private final ProducerCallbacks producerCallbacks; + private final InfoTypeSubscriptions infoTypeSubscriptions; private static Gson gson = new GsonBuilder().create(); + public ConsumerController(@Autowired InfoJobs jobs, @Autowired InfoTypes infoTypes, + @Autowired InfoProducers infoProducers, @Autowired ProducerCallbacks producerCallbacks, + @Autowired InfoTypeSubscriptions infoTypeSubscriptions) { + this.infoProducers = infoProducers; + this.infoJobs = jobs; + this.infoTypeSubscriptions = infoTypeSubscriptions; + this.infoTypes = infoTypes; + this.producerCallbacks = producerCallbacks; + } + @GetMapping(path = "/info-types", produces = MediaType.APPLICATION_JSON_VALUE) @Operation(summary = "Information type identifiers", description = "") @ApiResponses( @@ -170,15 +162,15 @@ public class ConsumerController { try { List result = new ArrayList<>(); if (owner != null) { - for (InfoJob job : this.jobs.getJobsForOwner(owner)) { + for (InfoJob job : this.infoJobs.getJobsForOwner(owner)) { if (infoTypeId == null || job.getTypeId().equals(infoTypeId)) { result.add(job.getId()); } } } else if (infoTypeId != null) { - this.jobs.getJobsForType(infoTypeId).forEach(job -> result.add(job.getId())); + this.infoJobs.getJobsForType(infoTypeId).forEach(job -> result.add(job.getId())); } else { - this.jobs.getJobs().forEach(job -> result.add(job.getId())); + this.infoJobs.getJobs().forEach(job -> result.add(job.getId())); } return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK); } catch ( @@ -204,7 +196,7 @@ public class ConsumerController { public ResponseEntity getIndividualEiJob( // @PathVariable("infoJobId") String infoJobId) { try { - InfoJob job = this.jobs.getJob(infoJobId); + InfoJob job = this.infoJobs.getJob(infoJobId); return new ResponseEntity<>(gson.toJson(toInfoJobInfo(job)), HttpStatus.OK); } catch (Exception e) { return ErrorResponse.create(e, HttpStatus.NOT_FOUND); @@ -227,7 +219,7 @@ public class ConsumerController { public ResponseEntity getEiJobStatus( // @PathVariable("infoJobId") String jobId) { try { - InfoJob job = this.jobs.getJob(jobId); + InfoJob job = this.infoJobs.getJob(jobId); return new ResponseEntity<>(gson.toJson(toInfoJobStatus(job)), HttpStatus.OK); } catch (Exception e) { return ErrorResponse.create(e, HttpStatus.NOT_FOUND); @@ -264,8 +256,8 @@ public class ConsumerController { public ResponseEntity deleteIndividualEiJob( // @PathVariable("infoJobId") String jobId) { try { - InfoJob job = this.jobs.getJob(jobId); - this.jobs.remove(job, this.infoProducers); + InfoJob job = this.infoJobs.getJob(jobId); + this.infoJobs.remove(job, this.infoProducers); return new ResponseEntity<>(HttpStatus.NO_CONTENT); } catch (Exception e) { return ErrorResponse.create(e, HttpStatus.NOT_FOUND); @@ -304,11 +296,11 @@ public class ConsumerController { defaultValue = "false") boolean performTypeCheck, @RequestBody ConsumerJobInfo informationJobObject) { - final boolean isNewJob = this.jobs.get(jobId) == null; + final boolean isNewJob = this.infoJobs.get(jobId) == null; return validatePutInfoJob(jobId, informationJobObject, performTypeCheck) // .flatMap(this::startInfoSubscriptionJob) // - .doOnNext(newEiJob -> this.jobs.put(newEiJob)) // + .doOnNext(this.infoJobs::put) // .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK))) .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND))); } @@ -432,11 +424,10 @@ public class ConsumerController { private InfoTypeSubscriptions.SubscriptionInfo toTypeSuscriptionInfo(ConsumerTypeSubscriptionInfo s, String subscriptionId) { return InfoTypeSubscriptions.SubscriptionInfo.builder() // - .callback(this.consumerCallbacks) // + .apiVersion(ConsumerCallbacks.API_VERSION) // .owner(s.owner) // .id(subscriptionId) // .callbackUrl(s.statusResultUri).build(); - } private Mono startInfoSubscriptionJob(InfoJob newInfoJob) { @@ -452,7 +443,7 @@ public class ConsumerController { InfoType infoType = this.infoTypes.getType(jobInfo.infoTypeId); validateJsonObjectAgainstSchema(infoType.getJobDataSchema(), jobInfo.jobDefinition); } - InfoJob existingEiJob = this.jobs.get(jobId); + InfoJob existingEiJob = this.infoJobs.get(jobId); validateUri(jobInfo.statusNotificationUri); validateUri(jobInfo.jobResultUri); diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java index 7f73605d..60fe35f3 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java @@ -20,7 +20,18 @@ package org.oransc.enrichment.repository; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.time.Duration; import java.util.Collection; import java.util.HashMap; import java.util.Map; @@ -30,25 +41,32 @@ import java.util.function.Function; import lombok.Builder; import lombok.Getter; +import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.stereotype.Component; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Configuration; +import org.springframework.util.FileSystemUtils; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; +import reactor.util.retry.Retry; /** * Subscriptions of callbacks for type registrations */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -@Component +@Configuration public class InfoTypeSubscriptions { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Map allSubscriptions = new HashMap<>(); private final MultiMap subscriptionsByOwner = new MultiMap<>(); + private final Gson gson = new GsonBuilder().create(); + private final ApplicationConfig config; + private final Map callbackHandlers = new HashMap<>(); - public interface Callbacks { + public interface ConsumerCallbackHandler { Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo); Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo); @@ -63,12 +81,27 @@ public class InfoTypeSubscriptions { private String owner; - private Callbacks callback; + private String apiVersion; + } + + public InfoTypeSubscriptions(@Autowired ApplicationConfig config) { + this.config = config; + + try { + this.restoreFromDatabase(); + } catch (IOException e) { + logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory()); + } + } + + public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) { + callbackHandlers.put(apiVersion, handler); } public synchronized void put(SubscriptionInfo subscription) { allSubscriptions.put(subscription.getId(), subscription); subscriptionsByOwner.put(subscription.owner, subscription.id, subscription); + storeInFile(subscription); logger.debug("Added type status subscription {}", subscription.id); } @@ -109,11 +142,19 @@ public class InfoTypeSubscriptions { public synchronized void clear() { allSubscriptions.clear(); subscriptionsByOwner.clear(); + clearDatabase(); } public void remove(SubscriptionInfo subscription) { allSubscriptions.remove(subscription.getId()); subscriptionsByOwner.remove(subscription.owner, subscription.id); + + try { + Files.delete(getPath(subscription)); + } catch (Exception e) { + logger.debug("Could not delete subscription from database: {}", e.getMessage()); + } + logger.debug("Removed type status subscription {}", subscription.id); } @@ -129,22 +170,113 @@ public class InfoTypeSubscriptions { } public synchronized void notifyTypeRegistered(InfoType type) { - notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRegistered(type, subscription)); + notifyAllSubscribers( + subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription)); } public synchronized void notifyTypeRemoved(InfoType type) { - notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription)); + notifyAllSubscribers( + subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription)); + } + + private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) { + ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion); + if (callbackHandler != null) { + return callbackHandler; + } else { + return new ConsumerCallbackHandler() { + @Override + public Mono notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) { + return error(); + } + + @Override + public Mono notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) { + return error(); + } + + private Mono error() { + return Mono.error(new ServiceException( + "No notifyTypeRegistered handler found for interface version " + apiVersion)); + } + }; + } } private synchronized void notifyAllSubscribers(Function> notifyFunc) { - final int CONCURRENCY = 5; + final int MAX_CONCURRENCY = 5; Flux.fromIterable(allSubscriptions.values()) // - .flatMap(notifyFunc::apply, CONCURRENCY) // - .onErrorResume(throwable -> { - logger.warn("Post failed for consumer callback {}", throwable.getMessage()); - return Flux.empty(); - }) // + .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) // .subscribe(); } + /** + * Invoking one consumer. If the call fails after retries, the subscription is + * removed. + * + * @param notifyFunc + * @param subscriptionInfo + * @return + */ + private Mono notifySubscriber(Function> notifyFunc, + SubscriptionInfo subscriptionInfo) { + Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1)); + return Mono.just(1) // + .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) // + .retryWhen(retrySpec) // + .onErrorResume(throwable -> { + logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(), + subscriptionInfo.id); + this.remove(subscriptionInfo); + return Mono.empty(); + }); // + } + + private void clearDatabase() { + try { + FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); + Files.createDirectories(Paths.get(getDatabaseDirectory())); + } catch (IOException e) { + logger.warn("Could not delete database : {}", e.getMessage()); + } + } + + private void storeInFile(SubscriptionInfo subscription) { + try { + try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) { + String json = gson.toJson(subscription); + out.print(json); + } + } catch (Exception e) { + logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage()); + } + } + + public synchronized void restoreFromDatabase() throws IOException { + Files.createDirectories(Paths.get(getDatabaseDirectory())); + File dbDir = new File(getDatabaseDirectory()); + + for (File file : dbDir.listFiles()) { + String json = Files.readString(file.toPath()); + SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class); + this.allSubscriptions.put(subscription.getId(), subscription); + } + } + + private File getFile(SubscriptionInfo subscription) { + return getPath(subscription).toFile(); + } + + private Path getPath(SubscriptionInfo subscription) { + return getPath(subscription.getId()); + } + + private Path getPath(String subscriptionId) { + return Path.of(getDatabaseDirectory(), subscriptionId); + } + + private String getDatabaseDirectory() { + return config.getVardataDirectory() + "/database/infotypesubscriptions"; + } + } diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java index 12d4e472..8a2b5280 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java @@ -162,6 +162,7 @@ class ApplicationTest { this.infoJobs.clear(); this.infoTypes.clear(); this.infoProducers.clear(); + this.infoTypeSubscriptions.clear(); this.producerSimulator.getTestResults().reset(); this.consumerSimulator.getTestResults().reset(); } @@ -475,7 +476,6 @@ class ApplicationTest { putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID); verifyJobStatus(EI_JOB_ID, "ENABLED"); - } @Test @@ -908,7 +908,6 @@ class ApplicationTest { InfoTypes types = new InfoTypes(this.applicationConfig); types.restoreTypesFromDatabase(); assertThat(types.size()).isEqualTo(1); - } { // Restore the jobs, no jobs in database @@ -922,6 +921,24 @@ class ApplicationTest { assertThat(this.infoJobs.size()).isZero(); } + @Test + void testConsumerTypeSubscriptionDatabase() { + final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl(); + final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner"); + // PUT a subscription + String body = gson.toJson(info); + restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block(); + assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1); + + InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig); + assertThat(restoredSubscriptions.size()).isEqualTo(1); + + // Delete the subscription + restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block(); + restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig); + assertThat(restoredSubscriptions.size()).isZero(); + } + @Test void testConsumerTypeSubscription() throws Exception { @@ -983,6 +1000,21 @@ class ApplicationTest { } } + @Test + void testRemovingNonWorkingSubscription() throws Exception { + // Test that subscriptions are removed for a unresponsive consumer + + // PUT a subscription with a junk callback + final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner"); + String body = gson.toJson(info); + restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block(); + assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1); + + this.putInfoType(TYPE_ID); + // The callback will fail and the subscription will be removed + await().untilAsserted(() -> assertThat(this.infoTypeSubscriptions.size()).isZero()); + } + @Test void testTypeSubscriptionErrorCodes() throws Exception { -- 2.16.6