NONRTRIC - Enrichment Coordinator Service, making type availability subscriptions... 50/6650/1
authorPatrikBuhr <patrik.buhr@est.tech>
Fri, 3 Sep 2021 06:53:10 +0000 (08:53 +0200)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 3 Sep 2021 13:15:23 +0000 (15:15 +0200)
The subscriptions created by the data consumer are stored persistently to survive a component restart.

Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-577
Change-Id: Id7be3bc4f0e4264e1302e2f11a4f2bac82e13232

enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/r1consumer/ConsumerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/InfoTypeSubscriptions.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java

index 0c44eb6..a57eeeb 100644 (file)
@@ -36,35 +36,38 @@ import reactor.core.publisher.Mono;
  * Callbacks to the Consumer. Notifies consumer according to the API (which this
  * class adapts to)
  */
-@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
 @Component
-public class ConsumerCallbacks implements InfoTypeSubscriptions.Callbacks {
+public class ConsumerCallbacks implements InfoTypeSubscriptions.ConsumerCallbackHandler {
 
     private static Gson gson = new GsonBuilder().create();
 
     private final AsyncRestClient restClient;
 
-    public ConsumerCallbacks(@Autowired ApplicationConfig config) {
+    public static final String API_VERSION = "version_1";
+
+    public ConsumerCallbacks(@Autowired ApplicationConfig config,
+        @Autowired InfoTypeSubscriptions infoTypeSubscriptions) {
         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
+        infoTypeSubscriptions.registerCallbackhandler(this, API_VERSION);
     }
 
     @Override
     public Mono<String> notifyTypeRegistered(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) {
-        ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(),
-            ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED, type.getId());
-        String body = gson.toJson(info);
-
+        String body = body(type, ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.REGISTERED);
         return restClient.post(subscriptionInfo.getCallbackUrl(), body);
-
     }
 
     @Override
     public Mono<String> notifyTypeRemoved(InfoType type, InfoTypeSubscriptions.SubscriptionInfo subscriptionInfo) {
-        ConsumerTypeRegistrationInfo info = new ConsumerTypeRegistrationInfo(type.getJobDataSchema(),
-            ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED, type.getId());
-        String body = gson.toJson(info);
+        String body = body(type, ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues.DEREGISTERED);
         return restClient.post(subscriptionInfo.getCallbackUrl(), body);
     }
 
+    private String body(InfoType type, ConsumerTypeRegistrationInfo.ConsumerTypeStatusValues status) {
+        ConsumerTypeRegistrationInfo info =
+            new ConsumerTypeRegistrationInfo(type.getJobDataSchema(), status, type.getId());
+        return gson.toJson(info);
+    }
+
 }
index fd1901e..9de57d5 100644 (file)
@@ -41,7 +41,6 @@ import java.util.Collection;
 import java.util.List;
 
 import org.json.JSONObject;
-import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.controllers.ErrorResponse;
 import org.oransc.enrichment.controllers.VoidResponse;
 import org.oransc.enrichment.controllers.r1producer.ProducerCallbacks;
@@ -70,36 +69,29 @@ import org.springframework.web.bind.annotation.RestController;
 import reactor.core.publisher.Mono;
 
 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
-@RestController("Consumer registry")
+@RestController("Consumer API")
 @Tag(name = ConsumerConsts.CONSUMER_API_NAME)
 @RequestMapping(path = ConsumerConsts.API_ROOT, produces = MediaType.APPLICATION_JSON_VALUE)
 public class ConsumerController {
 
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
-
-    @Autowired
-    ApplicationConfig applicationConfig;
-
-    @Autowired
-    private InfoJobs jobs;
-
-    @Autowired
-    private InfoTypes infoTypes;
-
-    @Autowired
-    private InfoProducers infoProducers;
-
-    @Autowired
-    private ConsumerCallbacks consumerCallbacks;
-
-    @Autowired
-    private ProducerCallbacks producerCallbacks;
-
-    @Autowired
-    private InfoTypeSubscriptions infoTypeSubscriptions;
-
+    private final InfoJobs infoJobs;
+    private final InfoTypes infoTypes;
+    private final InfoProducers infoProducers;
+    private final ProducerCallbacks producerCallbacks;
+    private final InfoTypeSubscriptions infoTypeSubscriptions;
     private static Gson gson = new GsonBuilder().create();
 
+    public ConsumerController(@Autowired InfoJobs jobs, @Autowired InfoTypes infoTypes,
+        @Autowired InfoProducers infoProducers, @Autowired ProducerCallbacks producerCallbacks,
+        @Autowired InfoTypeSubscriptions infoTypeSubscriptions) {
+        this.infoProducers = infoProducers;
+        this.infoJobs = jobs;
+        this.infoTypeSubscriptions = infoTypeSubscriptions;
+        this.infoTypes = infoTypes;
+        this.producerCallbacks = producerCallbacks;
+    }
+
     @GetMapping(path = "/info-types", produces = MediaType.APPLICATION_JSON_VALUE)
     @Operation(summary = "Information type identifiers", description = "")
     @ApiResponses(
@@ -170,15 +162,15 @@ public class ConsumerController {
         try {
             List<String> result = new ArrayList<>();
             if (owner != null) {
-                for (InfoJob job : this.jobs.getJobsForOwner(owner)) {
+                for (InfoJob job : this.infoJobs.getJobsForOwner(owner)) {
                     if (infoTypeId == null || job.getTypeId().equals(infoTypeId)) {
                         result.add(job.getId());
                     }
                 }
             } else if (infoTypeId != null) {
-                this.jobs.getJobsForType(infoTypeId).forEach(job -> result.add(job.getId()));
+                this.infoJobs.getJobsForType(infoTypeId).forEach(job -> result.add(job.getId()));
             } else {
-                this.jobs.getJobs().forEach(job -> result.add(job.getId()));
+                this.infoJobs.getJobs().forEach(job -> result.add(job.getId()));
             }
             return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
         } catch (
@@ -204,7 +196,7 @@ public class ConsumerController {
     public ResponseEntity<Object> getIndividualEiJob( //
         @PathVariable("infoJobId") String infoJobId) {
         try {
-            InfoJob job = this.jobs.getJob(infoJobId);
+            InfoJob job = this.infoJobs.getJob(infoJobId);
             return new ResponseEntity<>(gson.toJson(toInfoJobInfo(job)), HttpStatus.OK);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
@@ -227,7 +219,7 @@ public class ConsumerController {
     public ResponseEntity<Object> getEiJobStatus( //
         @PathVariable("infoJobId") String jobId) {
         try {
-            InfoJob job = this.jobs.getJob(jobId);
+            InfoJob job = this.infoJobs.getJob(jobId);
             return new ResponseEntity<>(gson.toJson(toInfoJobStatus(job)), HttpStatus.OK);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
@@ -264,8 +256,8 @@ public class ConsumerController {
     public ResponseEntity<Object> deleteIndividualEiJob( //
         @PathVariable("infoJobId") String jobId) {
         try {
-            InfoJob job = this.jobs.getJob(jobId);
-            this.jobs.remove(job, this.infoProducers);
+            InfoJob job = this.infoJobs.getJob(jobId);
+            this.infoJobs.remove(job, this.infoProducers);
             return new ResponseEntity<>(HttpStatus.NO_CONTENT);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
@@ -304,11 +296,11 @@ public class ConsumerController {
             defaultValue = "false") boolean performTypeCheck,
         @RequestBody ConsumerJobInfo informationJobObject) {
 
-        final boolean isNewJob = this.jobs.get(jobId) == null;
+        final boolean isNewJob = this.infoJobs.get(jobId) == null;
 
         return validatePutInfoJob(jobId, informationJobObject, performTypeCheck) //
             .flatMap(this::startInfoSubscriptionJob) //
-            .doOnNext(newEiJob -> this.jobs.put(newEiJob)) //
+            .doOnNext(this.infoJobs::put) //
             .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
             .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
     }
@@ -432,11 +424,10 @@ public class ConsumerController {
     private InfoTypeSubscriptions.SubscriptionInfo toTypeSuscriptionInfo(ConsumerTypeSubscriptionInfo s,
         String subscriptionId) {
         return InfoTypeSubscriptions.SubscriptionInfo.builder() //
-            .callback(this.consumerCallbacks) //
+            .apiVersion(ConsumerCallbacks.API_VERSION) //
             .owner(s.owner) //
             .id(subscriptionId) //
             .callbackUrl(s.statusResultUri).build();
-
     }
 
     private Mono<InfoJob> startInfoSubscriptionJob(InfoJob newInfoJob) {
@@ -452,7 +443,7 @@ public class ConsumerController {
                 InfoType infoType = this.infoTypes.getType(jobInfo.infoTypeId);
                 validateJsonObjectAgainstSchema(infoType.getJobDataSchema(), jobInfo.jobDefinition);
             }
-            InfoJob existingEiJob = this.jobs.get(jobId);
+            InfoJob existingEiJob = this.infoJobs.get(jobId);
             validateUri(jobInfo.statusNotificationUri);
             validateUri(jobInfo.jobResultUri);
 
index 7f73605..60fe35f 100644 (file)
 
 package org.oransc.enrichment.repository;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,25 +41,32 @@ import java.util.function.Function;
 import lombok.Builder;
 import lombok.Getter;
 
+import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.stereotype.Component;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.util.FileSystemUtils;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
 
 /**
  * Subscriptions of callbacks for type registrations
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-@Component
+@Configuration
 public class InfoTypeSubscriptions {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final Map<String, SubscriptionInfo> allSubscriptions = new HashMap<>();
     private final MultiMap<SubscriptionInfo> subscriptionsByOwner = new MultiMap<>();
+    private final Gson gson = new GsonBuilder().create();
+    private final ApplicationConfig config;
+    private final Map<String, ConsumerCallbackHandler> callbackHandlers = new HashMap<>();
 
-    public interface Callbacks {
+    public interface ConsumerCallbackHandler {
         Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo);
 
         Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo);
@@ -63,12 +81,27 @@ public class InfoTypeSubscriptions {
 
         private String owner;
 
-        private Callbacks callback;
+        private String apiVersion;
+    }
+
+    public InfoTypeSubscriptions(@Autowired ApplicationConfig config) {
+        this.config = config;
+
+        try {
+            this.restoreFromDatabase();
+        } catch (IOException e) {
+            logger.error("Could not restore info type subscriptions from database {}", this.getDatabaseDirectory());
+        }
+    }
+
+    public void registerCallbackhandler(ConsumerCallbackHandler handler, String apiVersion) {
+        callbackHandlers.put(apiVersion, handler);
     }
 
     public synchronized void put(SubscriptionInfo subscription) {
         allSubscriptions.put(subscription.getId(), subscription);
         subscriptionsByOwner.put(subscription.owner, subscription.id, subscription);
+        storeInFile(subscription);
         logger.debug("Added type status subscription {}", subscription.id);
     }
 
@@ -109,11 +142,19 @@ public class InfoTypeSubscriptions {
     public synchronized void clear() {
         allSubscriptions.clear();
         subscriptionsByOwner.clear();
+        clearDatabase();
     }
 
     public void remove(SubscriptionInfo subscription) {
         allSubscriptions.remove(subscription.getId());
         subscriptionsByOwner.remove(subscription.owner, subscription.id);
+
+        try {
+            Files.delete(getPath(subscription));
+        } catch (Exception e) {
+            logger.debug("Could not delete subscription from database: {}", e.getMessage());
+        }
+
         logger.debug("Removed type status subscription {}", subscription.id);
     }
 
@@ -129,22 +170,113 @@ public class InfoTypeSubscriptions {
     }
 
     public synchronized void notifyTypeRegistered(InfoType type) {
-        notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRegistered(type, subscription));
+        notifyAllSubscribers(
+            subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRegistered(type, subscription));
     }
 
     public synchronized void notifyTypeRemoved(InfoType type) {
-        notifyAllSubscribers(subscription -> subscription.callback.notifyTypeRemoved(type, subscription));
+        notifyAllSubscribers(
+            subscription -> getCallbacksHandler(subscription.apiVersion).notifyTypeRemoved(type, subscription));
+    }
+
+    private ConsumerCallbackHandler getCallbacksHandler(String apiVersion) {
+        ConsumerCallbackHandler callbackHandler = this.callbackHandlers.get(apiVersion);
+        if (callbackHandler != null) {
+            return callbackHandler;
+        } else {
+            return new ConsumerCallbackHandler() {
+                @Override
+                public Mono<String> notifyTypeRegistered(InfoType type, SubscriptionInfo subscriptionInfo) {
+                    return error();
+                }
+
+                @Override
+                public Mono<String> notifyTypeRemoved(InfoType type, SubscriptionInfo subscriptionInfo) {
+                    return error();
+                }
+
+                private Mono<String> error() {
+                    return Mono.error(new ServiceException(
+                        "No notifyTypeRegistered handler found for interface version " + apiVersion));
+                }
+            };
+        }
     }
 
     private synchronized void notifyAllSubscribers(Function<? super SubscriptionInfo, Mono<String>> notifyFunc) {
-        final int CONCURRENCY = 5;
+        final int MAX_CONCURRENCY = 5;
         Flux.fromIterable(allSubscriptions.values()) //
-            .flatMap(notifyFunc::apply, CONCURRENCY) //
-            .onErrorResume(throwable -> {
-                logger.warn("Post failed for consumer callback {}", throwable.getMessage());
-                return Flux.empty();
-            }) //
+            .flatMap(subscription -> notifySubscriber(notifyFunc, subscription), MAX_CONCURRENCY) //
             .subscribe();
     }
 
+    /**
+     * Invoking one consumer. If the call fails after retries, the subscription is
+     * removed.
+     * 
+     * @param notifyFunc
+     * @param subscriptionInfo
+     * @return
+     */
+    private Mono<String> notifySubscriber(Function<? super SubscriptionInfo, Mono<String>> notifyFunc,
+        SubscriptionInfo subscriptionInfo) {
+        Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
+        return Mono.just(1) //
+            .flatMap(notUsed -> notifyFunc.apply(subscriptionInfo)) //
+            .retryWhen(retrySpec) //
+            .onErrorResume(throwable -> {
+                logger.warn("Consumer callback failed {}, removing subscription {}", throwable.getMessage(),
+                    subscriptionInfo.id);
+                this.remove(subscriptionInfo);
+                return Mono.empty();
+            }); //
+    }
+
+    private void clearDatabase() {
+        try {
+            FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
+            Files.createDirectories(Paths.get(getDatabaseDirectory()));
+        } catch (IOException e) {
+            logger.warn("Could not delete database : {}", e.getMessage());
+        }
+    }
+
+    private void storeInFile(SubscriptionInfo subscription) {
+        try {
+            try (PrintStream out = new PrintStream(new FileOutputStream(getFile(subscription)))) {
+                String json = gson.toJson(subscription);
+                out.print(json);
+            }
+        } catch (Exception e) {
+            logger.warn("Could not save subscription: {} {}", subscription.getId(), e.getMessage());
+        }
+    }
+
+    public synchronized void restoreFromDatabase() throws IOException {
+        Files.createDirectories(Paths.get(getDatabaseDirectory()));
+        File dbDir = new File(getDatabaseDirectory());
+
+        for (File file : dbDir.listFiles()) {
+            String json = Files.readString(file.toPath());
+            SubscriptionInfo subscription = gson.fromJson(json, SubscriptionInfo.class);
+            this.allSubscriptions.put(subscription.getId(), subscription);
+        }
+    }
+
+    private File getFile(SubscriptionInfo subscription) {
+        return getPath(subscription).toFile();
+    }
+
+    private Path getPath(SubscriptionInfo subscription) {
+        return getPath(subscription.getId());
+    }
+
+    private Path getPath(String subscriptionId) {
+        return Path.of(getDatabaseDirectory(), subscriptionId);
+    }
+
+    private String getDatabaseDirectory() {
+        return config.getVardataDirectory() + "/database/infotypesubscriptions";
+    }
+
 }
index 12d4e47..8a2b528 100644 (file)
@@ -162,6 +162,7 @@ class ApplicationTest {
         this.infoJobs.clear();
         this.infoTypes.clear();
         this.infoProducers.clear();
+        this.infoTypeSubscriptions.clear();
         this.producerSimulator.getTestResults().reset();
         this.consumerSimulator.getTestResults().reset();
     }
@@ -475,7 +476,6 @@ class ApplicationTest {
         putInfoProducerWithOneType(PRODUCER_ID, TYPE_ID);
 
         verifyJobStatus(EI_JOB_ID, "ENABLED");
-
     }
 
     @Test
@@ -908,7 +908,6 @@ class ApplicationTest {
             InfoTypes types = new InfoTypes(this.applicationConfig);
             types.restoreTypesFromDatabase();
             assertThat(types.size()).isEqualTo(1);
-
         }
         {
             // Restore the jobs, no jobs in database
@@ -922,6 +921,24 @@ class ApplicationTest {
         assertThat(this.infoJobs.size()).isZero();
     }
 
+    @Test
+    void testConsumerTypeSubscriptionDatabase() {
+        final String callbackUrl = baseUrl() + ConsumerSimulatorController.getTypeStatusCallbackUrl();
+        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(callbackUrl, "owner");
+        // PUT a subscription
+        String body = gson.toJson(info);
+        restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+        assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+        InfoTypeSubscriptions restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+        assertThat(restoredSubscriptions.size()).isEqualTo(1);
+
+        // Delete the subscription
+        restClient().deleteForEntity(typeSubscriptionUrl() + "/subscriptionId").block();
+        restoredSubscriptions = new InfoTypeSubscriptions(this.applicationConfig);
+        assertThat(restoredSubscriptions.size()).isZero();
+    }
+
     @Test
     void testConsumerTypeSubscription() throws Exception {
 
@@ -983,6 +1000,21 @@ class ApplicationTest {
         }
     }
 
+    @Test
+    void testRemovingNonWorkingSubscription() throws Exception {
+        // Test that subscriptions are removed for a unresponsive consumer
+
+        // PUT a subscription with a junk callback
+        final ConsumerTypeSubscriptionInfo info = new ConsumerTypeSubscriptionInfo(baseUrl() + "JUNK", "owner");
+        String body = gson.toJson(info);
+        restClient().putForEntity(typeSubscriptionUrl() + "/subscriptionId", body).block();
+        assertThat(this.infoTypeSubscriptions.size()).isEqualTo(1);
+
+        this.putInfoType(TYPE_ID);
+        // The callback will fail and the subscription will be removed
+        await().untilAsserted(() -> assertThat(this.infoTypeSubscriptions.size()).isZero());
+    }
+
     @Test
     void testTypeSubscriptionErrorCodes() throws Exception {