Refactor datamodel 93/5393/3
authorPatrikBuhr <patrik.buhr@est.tech>
Thu, 7 Jan 2021 08:48:21 +0000 (09:48 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Fri, 8 Jan 2021 08:33:37 +0000 (09:33 +0100)
Refactor so that types can exist without producer.
A consumer can then create a job before any producer is available.
EiTypes are made persistent.

Change-Id: Id2cd0be5d2b6fe81ae6367d57fa3cc7c4c8c44c6
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-317

13 files changed:
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/configuration/ApplicationConfig.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiType.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/MultiMap.java
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java

index c5d2bec..785ddfc 100644 (file)
@@ -26,6 +26,7 @@ import java.lang.invoke.MethodHandles;
 
 import org.apache.catalina.connector.Connector;
 import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
 import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducers;
 import org.oransc.enrichment.repository.EiTypes;
@@ -46,6 +47,11 @@ class BeanFactory {
     private final ApplicationConfig applicationConfig = new ApplicationConfig();
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
+    private ProducerCallbacks producerCallbacks;
+    private EiTypes eiTypes;
+    private EiJobs eiJobs;
+    private EiProducers eiProducers;
+
     @Bean
     public ObjectMapper mapper() {
         return new ObjectMapper();
@@ -62,23 +68,36 @@ class BeanFactory {
 
     @Bean
     public EiJobs eiJobs() {
-        EiJobs jobs = new EiJobs(getApplicationConfig());
-        try {
-            jobs.restoreJobsFromDatabase();
-        } catch (Exception e) {
-            logger.error("Could not restore jobs from database: {}", e.getMessage());
+        if (eiJobs == null) {
+            eiJobs = new EiJobs(getApplicationConfig(), producerCallbacks());
+            try {
+                eiJobs.restoreJobsFromDatabase();
+            } catch (Exception e) {
+                logger.error("Could not restore jobs from database: {}", e.getMessage());
+            }
         }
-        return jobs;
+        return eiJobs;
     }
 
     @Bean
     public EiTypes eiTypes() {
-        return new EiTypes();
+        if (this.eiTypes == null) {
+            eiTypes = new EiTypes(getApplicationConfig());
+            try {
+                eiTypes.restoreTypesFromDatabase();
+            } catch (Exception e) {
+                logger.error("Could not restore EI types from database: {}", e.getMessage());
+            }
+        }
+        return eiTypes;
     }
 
     @Bean
-    public EiProducers eiProducers() {
-        return new EiProducers();
+    public ProducerCallbacks producerCallbacks() {
+        if (this.producerCallbacks == null) {
+            producerCallbacks = new ProducerCallbacks(getApplicationConfig());
+        }
+        return this.producerCallbacks;
     }
 
     @Bean
index fce9e22..db4201b 100644 (file)
@@ -62,21 +62,26 @@ public class ApplicationConfig {
     @Value("${app.webclient.http.proxy-port:0}")
     private int httpProxyPort = 0;
 
+    private WebClientConfig webClientConfig = null;
+
     public WebClientConfig getWebClientConfig() {
-        HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
-            .httpProxyHost(this.httpProxyHost) //
-            .httpProxyPort(this.httpProxyPort) //
-            .build();
-        return ImmutableWebClientConfig.builder() //
-            .keyStoreType(this.sslKeyStoreType) //
-            .keyStorePassword(this.sslKeyStorePassword) //
-            .keyStore(this.sslKeyStore) //
-            .keyPassword(this.sslKeyPassword) //
-            .isTrustStoreUsed(this.sslTrustStoreUsed) //
-            .trustStore(this.sslTrustStore) //
-            .trustStorePassword(this.sslTrustStorePassword) //
-            .httpProxyConfig(httpProxyConfig) //
-            .build();
+        if (this.webClientConfig == null) {
+            HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
+                .httpProxyHost(this.httpProxyHost) //
+                .httpProxyPort(this.httpProxyPort) //
+                .build();
+            this.webClientConfig = ImmutableWebClientConfig.builder() //
+                .keyStoreType(this.sslKeyStoreType) //
+                .keyStorePassword(this.sslKeyStorePassword) //
+                .keyStore(this.sslKeyStore) //
+                .keyPassword(this.sslKeyPassword) //
+                .isTrustStoreUsed(this.sslTrustStoreUsed) //
+                .trustStore(this.sslTrustStore) //
+                .trustStorePassword(this.sslTrustStorePassword) //
+                .httpProxyConfig(httpProxyConfig) //
+                .build();
+        }
+        return this.webClientConfig;
     }
 
 }
index c222cfa..eb85d37 100644 (file)
@@ -31,8 +31,8 @@ import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
+import org.oransc.enrichment.repository.EiProducers;
 import org.oransc.enrichment.repository.EiType;
-import org.oransc.enrichment.repository.EiTypes;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -49,21 +49,21 @@ public class ConsumerCallbacks {
     private static Gson gson = new GsonBuilder().create();
 
     private final AsyncRestClient restClient;
-    private final EiTypes eiTypes;
     private final EiJobs eiJobs;
+    private final EiProducers eiProducers;
 
     @Autowired
-    public ConsumerCallbacks(ApplicationConfig config, EiTypes eiTypes, EiJobs eiJobs) {
+    public ConsumerCallbacks(ApplicationConfig config, EiJobs eiJobs, EiProducers eiProducers) {
         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
         this.restClient = restClientFactory.createRestClientUseHttpProxy("");
-        this.eiTypes = eiTypes;
         this.eiJobs = eiJobs;
+        this.eiProducers = eiProducers;
     }
 
     public void notifyConsumersProducerDeleted(EiProducer eiProducer) {
         for (EiType type : eiProducer.getEiTypes()) {
-            if (this.eiTypes.get(type.getId()) == null) {
-                // The type is removed
+            if (this.eiProducers.getProducersForType(type).isEmpty()) {
+                // No producers left for the type
                 for (EiJob job : this.eiJobs.getJobsForType(type)) {
                     if (job.isLastStatusReportedEnabled()) {
                         noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED));
index f45ff73..6e9438d 100644 (file)
@@ -33,7 +33,6 @@ import io.swagger.annotations.ApiResponses;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
-import java.util.Vector;
 
 import org.everit.json.schema.Schema;
 import org.everit.json.schema.loader.SchemaLoader;
@@ -46,6 +45,7 @@ import org.oransc.enrichment.exceptions.ServiceException;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
+import org.oransc.enrichment.repository.EiProducers;
 import org.oransc.enrichment.repository.EiType;
 import org.oransc.enrichment.repository.EiTypes;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -77,6 +77,9 @@ public class ConsumerController {
     @Autowired
     private EiTypes eiTypes;
 
+    @Autowired
+    private EiProducers eiProducers;
+
     @Autowired
     ProducerCallbacks producerCallbacks;
 
@@ -206,11 +209,7 @@ public class ConsumerController {
     }
 
     private Collection<EiProducer> getProducers(EiJob eiJob) {
-        try {
-            return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
-        } catch (Exception e) {
-            return new Vector<>();
-        }
+        return this.eiProducers.getProducersForType(eiJob.getTypeId());
     }
 
     private ConsumerEiJobStatus toEiJobStatus(EiJob job) {
@@ -235,8 +234,7 @@ public class ConsumerController {
         @PathVariable("eiJobId") String eiJobId) {
         try {
             EiJob job = this.eiJobs.getJob(eiJobId);
-            this.eiJobs.remove(job);
-            this.producerCallbacks.notifyProducersJobDeleted(job);
+            this.eiJobs.remove(job, this.eiProducers);
             return new ResponseEntity<>(HttpStatus.NO_CONTENT);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
@@ -263,14 +261,14 @@ public class ConsumerController {
         final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
 
         return validatePutEiJob(eiJobId, eiJobObject) //
-            .flatMap(this::notifyProducersNewJob) //
+            .flatMap(this::startEiJob) //
             .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
             .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
             .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
     }
 
-    private Mono<EiJob> notifyProducersNewJob(EiJob newEiJob) {
-        return this.producerCallbacks.notifyProducersJobStarted(newEiJob) //
+    private Mono<EiJob> startEiJob(EiJob newEiJob) {
+        return this.producerCallbacks.startEiJob(newEiJob, eiProducers) //
             .flatMap(noOfAcceptingProducers -> {
                 if (noOfAcceptingProducers.intValue() > 0) {
                     return Mono.just(newEiJob);
index 00d9c14..45b4475 100644 (file)
@@ -26,7 +26,6 @@ import com.google.gson.GsonBuilder;
 import java.lang.invoke.MethodHandles;
 import java.time.Duration;
 import java.util.Collection;
-import java.util.Vector;
 
 import org.oransc.enrichment.clients.AsyncRestClient;
 import org.oransc.enrichment.clients.AsyncRestClientFactory;
@@ -34,11 +33,9 @@ import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
 import org.oransc.enrichment.repository.EiProducer;
-import org.oransc.enrichment.repository.EiTypes;
+import org.oransc.enrichment.repository.EiProducers;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
@@ -47,7 +44,6 @@ import reactor.util.retry.Retry;
 /**
  * Callbacks to the EiProducer
  */
-@Component
 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
 public class ProducerCallbacks {
 
@@ -55,17 +51,14 @@ public class ProducerCallbacks {
     private static Gson gson = new GsonBuilder().create();
 
     private final AsyncRestClient restClient;
-    private final EiTypes eiTypes;
 
-    @Autowired
-    public ProducerCallbacks(ApplicationConfig config, EiTypes eiTypes) {
+    public ProducerCallbacks(ApplicationConfig config) {
         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
-        this.eiTypes = eiTypes;
     }
 
-    public void notifyProducersJobDeleted(EiJob eiJob) {
-        for (EiProducer producer : getProducers(eiJob)) {
+    public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
+        for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
             String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
             restClient.delete(url) //
                 .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
@@ -81,10 +74,10 @@ public class ProducerCallbacks {
      * @param eiJob an EI job
      * @return the number of producers that returned OK
      */
-    public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+    public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
         Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
-        return Flux.fromIterable(getProducers(eiJob)) //
-            .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
+        return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
+            .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) //
             .collectList() //
             .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
     }
@@ -95,13 +88,13 @@ public class ProducerCallbacks {
      * @param producer
      * @param eiJobs
      */
-    public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+    public void restartEiJobs(EiProducer producer, EiJobs eiJobs) {
         final int maxNoOfParalellRequests = 10;
         Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
 
         Flux.fromIterable(producer.getEiTypes()) //
             .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
-            .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
+            .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
             .onErrorResume(t -> {
                 logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
                 return Flux.empty();
@@ -109,7 +102,7 @@ public class ProducerCallbacks {
             .subscribe();
     }
 
-    private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+    private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
         ProducerJobInfo request = new ProducerJobInfo(eiJob);
         String body = gson.toJson(request);
 
@@ -122,12 +115,8 @@ public class ProducerCallbacks {
             });
     }
 
-    private Collection<EiProducer> getProducers(EiJob eiJob) {
-        try {
-            return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
-        } catch (Exception e) {
-            return new Vector<>();
-        }
+    private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {
+        return eiProducers.getProducersForType(eiJob.getTypeId());
     }
 
 }
index e517b3a..e773117 100644 (file)
@@ -35,7 +35,6 @@ import java.util.List;
 
 import org.oransc.enrichment.controllers.ErrorResponse;
 import org.oransc.enrichment.controllers.VoidResponse;
-import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
 import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo.ProducerEiTypeRegistrationInfo;
 import org.oransc.enrichment.repository.EiJob;
 import org.oransc.enrichment.repository.EiJobs;
@@ -43,6 +42,8 @@ import org.oransc.enrichment.repository.EiProducer;
 import org.oransc.enrichment.repository.EiProducers;
 import org.oransc.enrichment.repository.EiType;
 import org.oransc.enrichment.repository.EiTypes;
+import org.oransc.enrichment.repository.ImmutableEiProducerRegistrationInfo;
+import org.oransc.enrichment.repository.ImmutableEiTypeRegistrationInfo;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -74,12 +75,6 @@ public class ProducerController {
     @Autowired
     private EiProducers eiProducers;
 
-    @Autowired
-    ProducerCallbacks producerCallbacks;
-
-    @Autowired
-    ConsumerCallbacks consumerCallbacks;
-
     @GetMapping(path = ProducerConsts.API_ROOT + "/eitypes", produces = MediaType.APPLICATION_JSON_VALUE)
     @ApiOperation(value = "EI type identifiers", notes = "")
     @ApiResponses(
@@ -233,33 +228,13 @@ public class ProducerController {
         @RequestBody ProducerRegistrationInfo registrationInfo) {
         try {
             EiProducer previousDefinition = this.eiProducers.get(eiProducerId);
-            if (previousDefinition != null) {
-                for (EiType type : previousDefinition.getEiTypes()) {
-                    type.removeProducer(previousDefinition);
-                }
-            }
-
-            EiProducer producer = registerProducer(eiProducerId, registrationInfo);
-            if (previousDefinition != null) {
-                purgeTypes(previousDefinition.getEiTypes());
-                this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
-            }
-            this.consumerCallbacks.notifyConsumersProducerAdded(producer);
-
+            this.eiProducers.registerProducer(toEiProducerRegistrationInfo(eiProducerId, registrationInfo));
             return new ResponseEntity<>(previousDefinition == null ? HttpStatus.CREATED : HttpStatus.OK);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
         }
     }
 
-    private void purgeTypes(Collection<EiType> types) {
-        for (EiType type : types) {
-            if (type.getProducerIds().isEmpty()) {
-                this.eiTypes.remove(type);
-            }
-        }
-    }
-
     @DeleteMapping(
         path = ProducerConsts.API_ROOT + "/eiproducers/{eiProducerId}",
         produces = MediaType.APPLICATION_JSON_VALUE)
@@ -272,45 +247,14 @@ public class ProducerController {
     public ResponseEntity<Object> deleteEiProducer(@PathVariable("eiProducerId") String eiProducerId) {
         try {
             final EiProducer producer = this.eiProducers.getProducer(eiProducerId);
-            this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs);
-            this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
+            this.eiProducers.deregisterProducer(producer, this.eiTypes);
             return new ResponseEntity<>(HttpStatus.NO_CONTENT);
         } catch (Exception e) {
             return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
         }
     }
 
-    private EiType registerType(ProducerEiTypeRegistrationInfo typeInfo) {
-        EiType type = this.eiTypes.get(typeInfo.eiTypeId);
-        if (type == null) {
-            type = new EiType(typeInfo.eiTypeId, typeInfo.jobDataSchema);
-            this.eiTypes.put(type);
-            this.consumerCallbacks.notifyConsumersTypeAdded(type);
-        }
-        return type;
-    }
-
-    EiProducer createProducer(Collection<EiType> types, String producerId, ProducerRegistrationInfo registrationInfo) {
-        return new EiProducer(producerId, types, registrationInfo.jobCallbackUrl,
-            registrationInfo.producerSupervisionCallbackUrl);
-    }
-
-    private EiProducer registerProducer(String producerId, ProducerRegistrationInfo registrationInfo) {
-        ArrayList<EiType> typesForProducer = new ArrayList<>();
-        EiProducer producer = createProducer(typesForProducer, producerId, registrationInfo);
-        for (ProducerEiTypeRegistrationInfo typeInfo : registrationInfo.types) {
-            EiType type = registerType(typeInfo);
-            typesForProducer.add(type);
-            type.addProducer(producer); //
-        }
-        this.eiProducers.put(producer);
-
-        producerCallbacks.restartJobs(producer, this.eiJobs);
-
-        return producer;
-    }
-
-    ProducerRegistrationInfo toEiProducerRegistrationInfo(EiProducer p) {
+    private ProducerRegistrationInfo toEiProducerRegistrationInfo(EiProducer p) {
         Collection<ProducerEiTypeRegistrationInfo> types = new ArrayList<>();
         for (EiType type : p.getEiTypes()) {
             types.add(toEiTypeRegistrationInfo(type));
@@ -323,6 +267,26 @@ public class ProducerController {
     }
 
     private ProducerEiTypeInfo toEiTypeInfo(EiType t) {
-        return new ProducerEiTypeInfo(t.getJobDataSchema(), t.getProducerIds());
+        Collection<String> producerIds = this.eiProducers.getProducerIdsForType(t.getId());
+        return new ProducerEiTypeInfo(t.getJobDataSchema(), producerIds);
+    }
+
+    private EiProducers.EiProducerRegistrationInfo toEiProducerRegistrationInfo(String eiProducerId,
+        ProducerRegistrationInfo info) {
+        Collection<EiProducers.EiTypeRegistrationInfo> supportedTypes = new ArrayList<>();
+        for (ProducerEiTypeRegistrationInfo typeInfo : info.types) {
+            EiProducers.EiTypeRegistrationInfo i = ImmutableEiTypeRegistrationInfo.builder() //
+                .id(typeInfo.eiTypeId) //
+                .jobDataSchema(typeInfo.jobDataSchema) //
+                .build();
+            supportedTypes.add(i);
+        }
+        return ImmutableEiProducerRegistrationInfo.builder() //
+            .id(eiProducerId) //
+            .jobCallbackUrl(info.jobCallbackUrl) //
+            .producerSupervisionCallbackUrl(info.producerSupervisionCallbackUrl) //
+            .supportedTypes(supportedTypes) //
+            .build();
     }
+
 }
index bff5be2..f5224f2 100644 (file)
@@ -39,6 +39,7 @@ import java.util.ServiceLoader;
 import java.util.Vector;
 
 import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -57,11 +58,14 @@ public class EiJobs {
     private final ApplicationConfig config;
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
 
-    public EiJobs(ApplicationConfig config) {
+    private final ProducerCallbacks producerCallbacks;
+
+    public EiJobs(ApplicationConfig config, ProducerCallbacks producerCallbacks) {
         this.config = config;
         GsonBuilder gsonBuilder = new GsonBuilder();
         ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
         this.gson = gsonBuilder.create();
+        this.producerCallbacks = producerCallbacks;
     }
 
     public synchronized void restoreJobsFromDatabase() throws IOException {
@@ -71,13 +75,13 @@ public class EiJobs {
         for (File file : dbDir.listFiles()) {
             String json = Files.readString(file.toPath());
             EiJob job = gson.fromJson(json, EiJob.class);
-            this.put(job, false);
+            this.doPut(job);
         }
-
     }
 
     public synchronized void put(EiJob job) {
-        this.put(job, true);
+        this.doPut(job);
+        storeJobInFile(job);
     }
 
     public synchronized Collection<EiJob> getJobs() {
@@ -108,15 +112,15 @@ public class EiJobs {
         return allEiJobs.get(id);
     }
 
-    public synchronized EiJob remove(String id) {
+    public synchronized EiJob remove(String id, EiProducers eiProducers) {
         EiJob job = allEiJobs.get(id);
         if (job != null) {
-            remove(job);
+            remove(job, eiProducers);
         }
         return job;
     }
 
-    public synchronized void remove(EiJob job) {
+    public synchronized void remove(EiJob job, EiProducers eiProducers) {
         this.allEiJobs.remove(job.getId());
         jobsByType.remove(job.getTypeId(), job.getId());
         jobsByOwner.remove(job.getOwner(), job.getId());
@@ -126,7 +130,7 @@ public class EiJobs {
         } catch (IOException e) {
             logger.warn("Could not remove file: {}", e.getMessage());
         }
-
+        this.producerCallbacks.stopEiJob(job, eiProducers);
     }
 
     public synchronized int size() {
@@ -137,6 +141,10 @@ public class EiJobs {
         this.allEiJobs.clear();
         this.jobsByType.clear();
         jobsByOwner.clear();
+        clearDatabase();
+    }
+
+    private void clearDatabase() {
         try {
             FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
             Files.createDirectories(Paths.get(getDatabaseDirectory()));
@@ -145,13 +153,10 @@ public class EiJobs {
         }
     }
 
-    private void put(EiJob job, boolean storePersistently) {
+    private void doPut(EiJob job) {
         allEiJobs.put(job.getId(), job);
         jobsByType.put(job.getTypeId(), job.getId(), job);
         jobsByOwner.put(job.getOwner(), job.getId(), job);
-        if (storePersistently) {
-            storeJobInFile(job);
-        }
     }
 
     private void storeJobInFile(EiJob job) {
@@ -173,7 +178,7 @@ public class EiJobs {
     }
 
     private String getDatabaseDirectory() {
-        return config.getVardataDirectory() + "/database";
+        return config.getVardataDirectory() + "/eijobs";
     }
 
 }
index 801e7fc..fcc9156 100644 (file)
 package org.oransc.enrichment.repository;
 
 import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Vector;
 
+import org.immutables.value.Value.Immutable;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
 
 /**
  * Dynamic representation of all EiProducers.
  */
 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
 public class EiProducers {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final Map<String, EiProducer> allEiProducers = new HashMap<>();
+    private final MultiMap<EiProducer> producersByType = new MultiMap<>();
 
-    public synchronized void put(EiProducer producer) {
+    @Autowired
+    private ProducerCallbacks producerCallbacks;
+
+    @Autowired
+    private ConsumerCallbacks consumerCallbacks;
+
+    @Autowired
+    private EiTypes eiTypes;
+
+    @Autowired
+    private EiJobs eiJobs;
+
+    @Immutable
+    public interface EiTypeRegistrationInfo {
+        String id();
+
+        Object jobDataSchema();
+    }
+
+    @Immutable
+    public interface EiProducerRegistrationInfo {
+        String id();
+
+        Collection<EiTypeRegistrationInfo> supportedTypes();
+
+        String jobCallbackUrl();
+
+        String producerSupervisionCallbackUrl();
+
+    }
+
+    public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) {
+        final String producerId = producerInfo.id();
+        EiProducer previousDefinition = this.get(producerId);
+        if (previousDefinition != null) {
+            for (EiType type : previousDefinition.getEiTypes()) {
+                producersByType.remove(type.getId(), producerId);
+            }
+            allEiProducers.remove(producerId);
+        }
+
+        EiProducer producer = createProducer(producerInfo);
         allEiProducers.put(producer.getId(), producer);
+        for (EiType type : producer.getEiTypes()) {
+            producersByType.put(type.getId(), producer.getId(), producer);
+        }
+
+        if (previousDefinition != null) {
+            purgeTypes(previousDefinition.getEiTypes());
+            this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
+        }
+
+        producerCallbacks.restartEiJobs(producer, this.eiJobs);
+        consumerCallbacks.notifyConsumersProducerAdded(producer);
+        return producer;
+    }
+
+    private void purgeTypes(Collection<EiType> types) {
+        for (EiType type : types) {
+            if (getProducersForType(type.getId()).isEmpty()) {
+                this.eiTypes.remove(type);
+            }
+        }
+    }
+
+    private EiType getType(EiTypeRegistrationInfo typeInfo) {
+        EiType type = this.eiTypes.get(typeInfo.id());
+        if (type == null) {
+            type = new EiType(typeInfo.id(), typeInfo.jobDataSchema());
+            this.eiTypes.put(type);
+            this.consumerCallbacks.notifyConsumersTypeAdded(type);
+        }
+        return type;
+    }
+
+    private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
+        ArrayList<EiType> types = new ArrayList<>();
+
+        EiProducer producer = new EiProducer(producerInfo.id(), types, producerInfo.jobCallbackUrl(),
+            producerInfo.producerSupervisionCallbackUrl());
+
+        for (EiTypeRegistrationInfo typeInfo : producerInfo.supportedTypes()) {
+            EiType type = getType(typeInfo);
+            types.add(type);
+        }
+        return producer;
     }
 
     public synchronized Collection<EiProducer> getAllProducers() {
@@ -58,33 +150,42 @@ public class EiProducers {
         return allEiProducers.get(id);
     }
 
-    public synchronized void remove(String id) {
-        this.allEiProducers.remove(id);
-    }
-
     public synchronized int size() {
         return allEiProducers.size();
     }
 
     public synchronized void clear() {
         this.allEiProducers.clear();
+        this.producersByType.clear();
     }
 
-    public void deregisterProducer(EiProducer producer, EiTypes eiTypes, EiJobs eiJobs) {
-        this.remove(producer);
+    public void deregisterProducer(EiProducer producer, EiTypes eiTypes) {
+        allEiProducers.remove(producer.getId());
         for (EiType type : producer.getEiTypes()) {
-            boolean removed = type.removeProducer(producer) != null;
-            if (!removed) {
+            if (producersByType.remove(type.getId(), producer.getId()) == null) {
                 this.logger.error("Bug, no producer found");
             }
-            if (type.getProducerIds().isEmpty()) {
+            if (this.producersByType.get(type.getId()).isEmpty()) {
                 eiTypes.remove(type);
             }
         }
+        this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
     }
 
-    private synchronized void remove(EiProducer producer) {
-        this.allEiProducers.remove(producer.getId());
+    public synchronized Collection<EiProducer> getProducersForType(EiType type) {
+        return this.producersByType.get(type.getId());
+    }
+
+    public synchronized Collection<EiProducer> getProducersForType(String typeId) {
+        return this.producersByType.get(typeId);
+    }
+
+    public synchronized Collection<String> getProducerIdsForType(String typeId) {
+        Collection<String> producerIds = new ArrayList<>();
+        for (EiProducer p : this.getProducersForType(typeId)) {
+            producerIds.add(p.getId());
+        }
+        return producerIds;
     }
 
 }
index a354198..5d9057a 100644 (file)
 
 package org.oransc.enrichment.repository;
 
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
 import lombok.Getter;
 
 public class EiType {
@@ -34,26 +29,9 @@ public class EiType {
     @Getter
     private final Object jobDataSchema;
 
-    private final Map<String, EiProducer> producers = new HashMap<>();
-
     public EiType(String id, Object jobDataSchema) {
         this.id = id;
         this.jobDataSchema = jobDataSchema;
     }
 
-    public synchronized Collection<EiProducer> getProducers() {
-        return Collections.unmodifiableCollection(producers.values());
-    }
-
-    public synchronized Collection<String> getProducerIds() {
-        return Collections.unmodifiableCollection(producers.keySet());
-    }
-
-    public synchronized void addProducer(EiProducer producer) {
-        this.producers.put(producer.getId(), producer);
-    }
-
-    public synchronized EiProducer removeProducer(EiProducer producer) {
-        return this.producers.remove(producer.getId());
-    }
 }
index d0bf53a..265a8e4 100644 (file)
 
 package org.oransc.enrichment.repository;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
 import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.ServiceLoader;
 import java.util.Vector;
 
+import org.oransc.enrichment.configuration.ApplicationConfig;
 import org.oransc.enrichment.exceptions.ServiceException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
 
 /**
  * Dynamic representation of all EI types in the system.
@@ -37,9 +51,30 @@ import org.slf4j.LoggerFactory;
 public class EiTypes {
     private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
     private final Map<String, EiType> allEiTypes = new HashMap<>();
+    private final ApplicationConfig config;
+    private final Gson gson;
+
+    public EiTypes(ApplicationConfig config) {
+        this.config = config;
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+        this.gson = gsonBuilder.create();
+    }
+
+    public synchronized void restoreTypesFromDatabase() throws IOException {
+        Files.createDirectories(Paths.get(getDatabaseDirectory()));
+        File dbDir = new File(getDatabaseDirectory());
+
+        for (File file : dbDir.listFiles()) {
+            String json = Files.readString(file.toPath());
+            EiType type = gson.fromJson(json, EiType.class);
+            allEiTypes.put(type.getId(), type);
+        }
+    }
 
     public synchronized void put(EiType type) {
         allEiTypes.put(type.getId(), type);
+        storeInFile(type);
     }
 
     public synchronized Collection<EiType> getAllEiTypes() {
@@ -58,12 +93,13 @@ public class EiTypes {
         return allEiTypes.get(id);
     }
 
-    public synchronized void remove(String id) {
-        allEiTypes.remove(id);
-    }
-
     public synchronized void remove(EiType type) {
-        this.remove(type.getId());
+        allEiTypes.remove(type.getId());
+        try {
+            Files.delete(getPath(type));
+        } catch (IOException e) {
+            logger.warn("Could not remove file: {} {}", type.getId(), e.getMessage());
+        }
     }
 
     public synchronized int size() {
@@ -72,5 +108,41 @@ public class EiTypes {
 
     public synchronized void clear() {
         this.allEiTypes.clear();
+        clearDatabase();
+    }
+
+    private void clearDatabase() {
+        try {
+            FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
+            Files.createDirectories(Paths.get(getDatabaseDirectory()));
+        } catch (IOException e) {
+            logger.warn("Could not delete database : {}", e.getMessage());
+        }
+    }
+
+    private void storeInFile(EiType type) {
+        try {
+            try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) {
+                out.print(gson.toJson(type));
+            }
+        } catch (Exception e) {
+            logger.warn("Could not save job: {} {}", type.getId(), e.getMessage());
+        }
+    }
+
+    private File getFile(EiType type) {
+        return getPath(type).toFile();
+    }
+
+    private Path getPath(EiType type) {
+        return getPath(type.getId());
+    }
+
+    private Path getPath(String typeId) {
+        return Path.of(getDatabaseDirectory(), typeId);
+    }
+
+    private String getDatabaseDirectory() {
+        return config.getVardataDirectory() + "/eitypes";
     }
 }
index c2b8270..25e559c 100644 (file)
@@ -38,14 +38,16 @@ public class MultiMap<T> {
         this.map.computeIfAbsent(key, k -> new HashMap<>()).put(id, value);
     }
 
-    public void remove(String key, String id) {
+    public T remove(String key, String id) {
         Map<String, T> innerMap = this.map.get(key);
         if (innerMap != null) {
-            innerMap.remove(id);
+            T removedElement = innerMap.remove(id);
             if (innerMap.isEmpty()) {
                 this.map.remove(key);
             }
+            return removedElement;
         }
+        return null;
     }
 
     public Collection<T> get(String key) {
index b4c21d4..d8ee970 100644 (file)
@@ -89,7 +89,7 @@ public class ProducerSupervision {
         logger.warn("Unresponsive producer: {} exception: {}", producer.getId(), throwable.getMessage());
         producer.setAliveStatus(false);
         if (producer.isDead()) {
-            this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs);
+            this.eiProducers.deregisterProducer(producer, this.eiTypes);
             this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
         }
     }
index b62a965..9f3dd49 100644 (file)
@@ -55,6 +55,7 @@ import org.oransc.enrichment.controllers.consumer.ConsumerConsts;
 import org.oransc.enrichment.controllers.consumer.ConsumerEiJobInfo;
 import org.oransc.enrichment.controllers.consumer.ConsumerEiJobStatus;
 import org.oransc.enrichment.controllers.consumer.ConsumerEiTypeInfo;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
 import org.oransc.enrichment.controllers.producer.ProducerConsts;
 import org.oransc.enrichment.controllers.producer.ProducerJobInfo;
 import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo;
@@ -127,6 +128,9 @@ class ApplicationTest {
     @Autowired
     ProducerSupervision producerSupervision;
 
+    @Autowired
+    ProducerCallbacks producerCallbacks;
+
     private static Gson gson = new GsonBuilder().create();
 
     /**
@@ -382,7 +386,7 @@ class ApplicationTest {
 
         assertThat(this.eiTypes.size()).isEqualTo(1);
         EiType type = this.eiTypes.getType(EI_TYPE_ID);
-        assertThat(type.getProducerIds()).contains("eiProducerId");
+        assertThat(this.eiProducers.getProducersForType(EI_TYPE_ID).size()).isEqualTo(1);
         assertThat(this.eiProducers.size()).isEqualTo(1);
         assertThat(this.eiProducers.get("eiProducerId").getEiTypes().iterator().next().getId()).isEqualTo(EI_TYPE_ID);
 
@@ -453,14 +457,14 @@ class ApplicationTest {
 
         assertThat(this.eiProducers.size()).isEqualTo(2);
         EiType type = this.eiTypes.getType(EI_TYPE_ID);
-        assertThat(type.getProducerIds()).contains("eiProducerId");
-        assertThat(type.getProducerIds()).contains("eiProducerId2");
+        assertThat(this.eiProducers.getProducerIdsForType(type.getId())).contains("eiProducerId");
+        assertThat(this.eiProducers.getProducerIdsForType(type.getId())).contains("eiProducerId2");
         putEiJob(EI_TYPE_ID, "jobId");
         assertThat(this.eiJobs.size()).isEqualTo(1);
 
         deleteEiProducer("eiProducerId");
         assertThat(this.eiProducers.size()).isEqualTo(1);
-        assertThat(this.eiTypes.getType(EI_TYPE_ID).getProducerIds()).doesNotContain("eiProducerId");
+        assertThat(this.eiProducers.getProducerIdsForType(EI_TYPE_ID)).doesNotContain("eiProducerId");
         verifyJobStatus("jobId", "ENABLED");
 
         deleteEiProducer("eiProducerId2");
@@ -555,8 +559,8 @@ class ApplicationTest {
 
         // After 3 failed checks, the producer and the type shall be deregisterred
         this.producerSupervision.createTask().blockLast();
-        assertThat(this.eiProducers.size()).isEqualTo(0);
-        assertThat(this.eiTypes.size()).isEqualTo(0);
+        assertThat(this.eiProducers.size()).isEqualTo(0); // The producer is removed
+        assertThat(this.eiTypes.size()).isEqualTo(0); // The type is removed
         verifyJobStatus(EI_JOB_ID, "DISABLED");
 
         // Job disabled status notification shall be received
@@ -585,21 +589,49 @@ class ApplicationTest {
 
         {
             // Restore the jobs
-            EiJobs jobs = new EiJobs(this.applicationConfig);
+            EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
             jobs.restoreJobsFromDatabase();
             assertThat(jobs.size()).isEqualTo(2);
-            jobs.remove("jobId1");
-            jobs.remove("jobId2");
+            jobs.remove("jobId1", this.eiProducers);
+            jobs.remove("jobId2", this.eiProducers);
         }
         {
             // Restore the jobs, no jobs in database
-            EiJobs jobs = new EiJobs(this.applicationConfig);
+            EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
             jobs.restoreJobsFromDatabase();
             assertThat(jobs.size()).isEqualTo(0);
         }
         logger.warn("Test removing a job when the db file is gone");
-        this.eiJobs.remove("jobId1");
+        this.eiJobs.remove("jobId1", this.eiProducers);
         assertThat(this.eiJobs.size()).isEqualTo(1);
+
+        ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+        await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(3));
+    }
+
+    @Test
+    void testEiTypesDatabase() throws Exception {
+        putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+
+        assertThat(this.eiTypes.size()).isEqualTo(1);
+
+        {
+            // Restore the types
+            EiTypes types = new EiTypes(this.applicationConfig);
+            types.restoreTypesFromDatabase();
+            assertThat(types.size()).isEqualTo(1);
+
+        }
+        {
+            // Restore the jobs, no jobs in database
+            EiTypes types = new EiTypes(this.applicationConfig);
+            types.clear();
+            types.restoreTypesFromDatabase();
+            assertThat(types.size()).isEqualTo(0);
+        }
+        logger.warn("Test removing a job when the db file is gone");
+        this.eiTypes.remove(this.eiTypes.getType(EI_TYPE_ID));
+        assertThat(this.eiJobs.size()).isEqualTo(0);
     }
 
     private void deleteEiProducer(String eiProducerId) {
@@ -706,6 +738,7 @@ class ApplicationTest {
         String body = gson.toJson(producerEiRegistratioInfo(eiTypeId));
 
         restClient().putForEntity(url, body).block();
+
         return this.eiTypes.getType(eiTypeId);
     }