From: Henrik Andersson Date: Fri, 8 Jan 2021 16:25:38 +0000 (+0000) Subject: Merge "Refactor datamodel" X-Git-Tag: 2.2.0~93 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=b61264738a459de5f1b9333ee4cb486df9f3b9f4;hp=bc89f17bf07fc2aca8e8004b769c52eaafb117b8;p=nonrtric.git Merge "Refactor datamodel" --- diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java index c5d2bec7..785ddfc7 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/BeanFactory.java @@ -26,6 +26,7 @@ import java.lang.invoke.MethodHandles; import org.apache.catalina.connector.Connector; import org.oransc.enrichment.configuration.ApplicationConfig; +import org.oransc.enrichment.controllers.producer.ProducerCallbacks; import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducers; import org.oransc.enrichment.repository.EiTypes; @@ -46,6 +47,11 @@ class BeanFactory { private final ApplicationConfig applicationConfig = new ApplicationConfig(); private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); + private ProducerCallbacks producerCallbacks; + private EiTypes eiTypes; + private EiJobs eiJobs; + private EiProducers eiProducers; + @Bean public ObjectMapper mapper() { return new ObjectMapper(); @@ -62,23 +68,36 @@ class BeanFactory { @Bean public EiJobs eiJobs() { - EiJobs jobs = new EiJobs(getApplicationConfig()); - try { - jobs.restoreJobsFromDatabase(); - } catch (Exception e) { - logger.error("Could not restore jobs from database: {}", e.getMessage()); + if (eiJobs == null) { + eiJobs = new EiJobs(getApplicationConfig(), producerCallbacks()); + try { + eiJobs.restoreJobsFromDatabase(); + } catch (Exception e) { + logger.error("Could not restore jobs from database: {}", e.getMessage()); + } } - return jobs; + return eiJobs; } @Bean public EiTypes eiTypes() { - return new EiTypes(); + if (this.eiTypes == null) { + eiTypes = new EiTypes(getApplicationConfig()); + try { + eiTypes.restoreTypesFromDatabase(); + } catch (Exception e) { + logger.error("Could not restore EI types from database: {}", e.getMessage()); + } + } + return eiTypes; } @Bean - public EiProducers eiProducers() { - return new EiProducers(); + public ProducerCallbacks producerCallbacks() { + if (this.producerCallbacks == null) { + producerCallbacks = new ProducerCallbacks(getApplicationConfig()); + } + return this.producerCallbacks; } @Bean diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/configuration/ApplicationConfig.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/configuration/ApplicationConfig.java index fce9e224..db4201b9 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/configuration/ApplicationConfig.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/configuration/ApplicationConfig.java @@ -62,21 +62,26 @@ public class ApplicationConfig { @Value("${app.webclient.http.proxy-port:0}") private int httpProxyPort = 0; + private WebClientConfig webClientConfig = null; + public WebClientConfig getWebClientConfig() { - HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // - .httpProxyHost(this.httpProxyHost) // - .httpProxyPort(this.httpProxyPort) // - .build(); - return ImmutableWebClientConfig.builder() // - .keyStoreType(this.sslKeyStoreType) // - .keyStorePassword(this.sslKeyStorePassword) // - .keyStore(this.sslKeyStore) // - .keyPassword(this.sslKeyPassword) // - .isTrustStoreUsed(this.sslTrustStoreUsed) // - .trustStore(this.sslTrustStore) // - .trustStorePassword(this.sslTrustStorePassword) // - .httpProxyConfig(httpProxyConfig) // - .build(); + if (this.webClientConfig == null) { + HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() // + .httpProxyHost(this.httpProxyHost) // + .httpProxyPort(this.httpProxyPort) // + .build(); + this.webClientConfig = ImmutableWebClientConfig.builder() // + .keyStoreType(this.sslKeyStoreType) // + .keyStorePassword(this.sslKeyStorePassword) // + .keyStore(this.sslKeyStore) // + .keyPassword(this.sslKeyPassword) // + .isTrustStoreUsed(this.sslTrustStoreUsed) // + .trustStore(this.sslTrustStore) // + .trustStorePassword(this.sslTrustStorePassword) // + .httpProxyConfig(httpProxyConfig) // + .build(); + } + return this.webClientConfig; } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java index c222cfab..eb85d376 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerCallbacks.java @@ -31,8 +31,8 @@ import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.repository.EiJob; import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducer; +import org.oransc.enrichment.repository.EiProducers; import org.oransc.enrichment.repository.EiType; -import org.oransc.enrichment.repository.EiTypes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -49,21 +49,21 @@ public class ConsumerCallbacks { private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient restClient; - private final EiTypes eiTypes; private final EiJobs eiJobs; + private final EiProducers eiProducers; @Autowired - public ConsumerCallbacks(ApplicationConfig config, EiTypes eiTypes, EiJobs eiJobs) { + public ConsumerCallbacks(ApplicationConfig config, EiJobs eiJobs, EiProducers eiProducers) { AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig()); this.restClient = restClientFactory.createRestClientUseHttpProxy(""); - this.eiTypes = eiTypes; this.eiJobs = eiJobs; + this.eiProducers = eiProducers; } public void notifyConsumersProducerDeleted(EiProducer eiProducer) { for (EiType type : eiProducer.getEiTypes()) { - if (this.eiTypes.get(type.getId()) == null) { - // The type is removed + if (this.eiProducers.getProducersForType(type).isEmpty()) { + // No producers left for the type for (EiJob job : this.eiJobs.getJobsForType(type)) { if (job.isLastStatusReportedEnabled()) { noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED)); diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java index f45ff736..6e9438d3 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/consumer/ConsumerController.java @@ -33,7 +33,6 @@ import io.swagger.annotations.ApiResponses; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.Vector; import org.everit.json.schema.Schema; import org.everit.json.schema.loader.SchemaLoader; @@ -46,6 +45,7 @@ import org.oransc.enrichment.exceptions.ServiceException; import org.oransc.enrichment.repository.EiJob; import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducer; +import org.oransc.enrichment.repository.EiProducers; import org.oransc.enrichment.repository.EiType; import org.oransc.enrichment.repository.EiTypes; import org.springframework.beans.factory.annotation.Autowired; @@ -77,6 +77,9 @@ public class ConsumerController { @Autowired private EiTypes eiTypes; + @Autowired + private EiProducers eiProducers; + @Autowired ProducerCallbacks producerCallbacks; @@ -206,11 +209,7 @@ public class ConsumerController { } private Collection getProducers(EiJob eiJob) { - try { - return this.eiTypes.getType(eiJob.getTypeId()).getProducers(); - } catch (Exception e) { - return new Vector<>(); - } + return this.eiProducers.getProducersForType(eiJob.getTypeId()); } private ConsumerEiJobStatus toEiJobStatus(EiJob job) { @@ -235,8 +234,7 @@ public class ConsumerController { @PathVariable("eiJobId") String eiJobId) { try { EiJob job = this.eiJobs.getJob(eiJobId); - this.eiJobs.remove(job); - this.producerCallbacks.notifyProducersJobDeleted(job); + this.eiJobs.remove(job, this.eiProducers); return new ResponseEntity<>(HttpStatus.NO_CONTENT); } catch (Exception e) { return ErrorResponse.create(e, HttpStatus.NOT_FOUND); @@ -263,14 +261,14 @@ public class ConsumerController { final boolean isNewJob = this.eiJobs.get(eiJobId) == null; return validatePutEiJob(eiJobId, eiJobObject) // - .flatMap(this::notifyProducersNewJob) // + .flatMap(this::startEiJob) // .doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) // .flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK))) .onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND))); } - private Mono notifyProducersNewJob(EiJob newEiJob) { - return this.producerCallbacks.notifyProducersJobStarted(newEiJob) // + private Mono startEiJob(EiJob newEiJob) { + return this.producerCallbacks.startEiJob(newEiJob, eiProducers) // .flatMap(noOfAcceptingProducers -> { if (noOfAcceptingProducers.intValue() > 0) { return Mono.just(newEiJob); diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java index 00d9c149..45b44754 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerCallbacks.java @@ -26,7 +26,6 @@ import com.google.gson.GsonBuilder; import java.lang.invoke.MethodHandles; import java.time.Duration; import java.util.Collection; -import java.util.Vector; import org.oransc.enrichment.clients.AsyncRestClient; import org.oransc.enrichment.clients.AsyncRestClientFactory; @@ -34,11 +33,9 @@ import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.repository.EiJob; import org.oransc.enrichment.repository.EiJobs; import org.oransc.enrichment.repository.EiProducer; -import org.oransc.enrichment.repository.EiTypes; +import org.oransc.enrichment.repository.EiProducers; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -47,7 +44,6 @@ import reactor.util.retry.Retry; /** * Callbacks to the EiProducer */ -@Component @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string .. public class ProducerCallbacks { @@ -55,17 +51,14 @@ public class ProducerCallbacks { private static Gson gson = new GsonBuilder().create(); private final AsyncRestClient restClient; - private final EiTypes eiTypes; - @Autowired - public ProducerCallbacks(ApplicationConfig config, EiTypes eiTypes) { + public ProducerCallbacks(ApplicationConfig config) { AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig()); this.restClient = restClientFactory.createRestClientNoHttpProxy(""); - this.eiTypes = eiTypes; } - public void notifyProducersJobDeleted(EiJob eiJob) { - for (EiProducer producer : getProducers(eiJob)) { + public void stopEiJob(EiJob eiJob, EiProducers eiProducers) { + for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) { String url = producer.getJobCallbackUrl() + "/" + eiJob.getId(); restClient.delete(url) // .subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), // @@ -81,10 +74,10 @@ public class ProducerCallbacks { * @param eiJob an EI job * @return the number of producers that returned OK */ - public Mono notifyProducersJobStarted(EiJob eiJob) { + public Mono startEiJob(EiJob eiJob, EiProducers eiProducers) { Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1)); - return Flux.fromIterable(getProducers(eiJob)) // - .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) // + return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) // + .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) // .collectList() // .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); // } @@ -95,13 +88,13 @@ public class ProducerCallbacks { * @param producer * @param eiJobs */ - public void restartJobs(EiProducer producer, EiJobs eiJobs) { + public void restartEiJobs(EiProducer producer, EiJobs eiJobs) { final int maxNoOfParalellRequests = 10; Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1)); Flux.fromIterable(producer.getEiTypes()) // .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) // - .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) // + .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) // .onErrorResume(t -> { logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage()); return Flux.empty(); @@ -109,7 +102,7 @@ public class ProducerCallbacks { .subscribe(); } - private Mono notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) { + private Mono postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) { ProducerJobInfo request = new ProducerJobInfo(eiJob); String body = gson.toJson(request); @@ -122,12 +115,8 @@ public class ProducerCallbacks { }); } - private Collection getProducers(EiJob eiJob) { - try { - return this.eiTypes.getType(eiJob.getTypeId()).getProducers(); - } catch (Exception e) { - return new Vector<>(); - } + private Collection getProducersForJob(EiJob eiJob, EiProducers eiProducers) { + return eiProducers.getProducersForType(eiJob.getTypeId()); } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java index e517b3a9..e773117a 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/controllers/producer/ProducerController.java @@ -35,7 +35,6 @@ import java.util.List; import org.oransc.enrichment.controllers.ErrorResponse; import org.oransc.enrichment.controllers.VoidResponse; -import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks; import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo.ProducerEiTypeRegistrationInfo; import org.oransc.enrichment.repository.EiJob; import org.oransc.enrichment.repository.EiJobs; @@ -43,6 +42,8 @@ import org.oransc.enrichment.repository.EiProducer; import org.oransc.enrichment.repository.EiProducers; import org.oransc.enrichment.repository.EiType; import org.oransc.enrichment.repository.EiTypes; +import org.oransc.enrichment.repository.ImmutableEiProducerRegistrationInfo; +import org.oransc.enrichment.repository.ImmutableEiTypeRegistrationInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -74,12 +75,6 @@ public class ProducerController { @Autowired private EiProducers eiProducers; - @Autowired - ProducerCallbacks producerCallbacks; - - @Autowired - ConsumerCallbacks consumerCallbacks; - @GetMapping(path = ProducerConsts.API_ROOT + "/eitypes", produces = MediaType.APPLICATION_JSON_VALUE) @ApiOperation(value = "EI type identifiers", notes = "") @ApiResponses( @@ -233,33 +228,13 @@ public class ProducerController { @RequestBody ProducerRegistrationInfo registrationInfo) { try { EiProducer previousDefinition = this.eiProducers.get(eiProducerId); - if (previousDefinition != null) { - for (EiType type : previousDefinition.getEiTypes()) { - type.removeProducer(previousDefinition); - } - } - - EiProducer producer = registerProducer(eiProducerId, registrationInfo); - if (previousDefinition != null) { - purgeTypes(previousDefinition.getEiTypes()); - this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition); - } - this.consumerCallbacks.notifyConsumersProducerAdded(producer); - + this.eiProducers.registerProducer(toEiProducerRegistrationInfo(eiProducerId, registrationInfo)); return new ResponseEntity<>(previousDefinition == null ? HttpStatus.CREATED : HttpStatus.OK); } catch (Exception e) { return ErrorResponse.create(e, HttpStatus.NOT_FOUND); } } - private void purgeTypes(Collection types) { - for (EiType type : types) { - if (type.getProducerIds().isEmpty()) { - this.eiTypes.remove(type); - } - } - } - @DeleteMapping( path = ProducerConsts.API_ROOT + "/eiproducers/{eiProducerId}", produces = MediaType.APPLICATION_JSON_VALUE) @@ -272,45 +247,14 @@ public class ProducerController { public ResponseEntity deleteEiProducer(@PathVariable("eiProducerId") String eiProducerId) { try { final EiProducer producer = this.eiProducers.getProducer(eiProducerId); - this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs); - this.consumerCallbacks.notifyConsumersProducerDeleted(producer); + this.eiProducers.deregisterProducer(producer, this.eiTypes); return new ResponseEntity<>(HttpStatus.NO_CONTENT); } catch (Exception e) { return ErrorResponse.create(e, HttpStatus.NOT_FOUND); } } - private EiType registerType(ProducerEiTypeRegistrationInfo typeInfo) { - EiType type = this.eiTypes.get(typeInfo.eiTypeId); - if (type == null) { - type = new EiType(typeInfo.eiTypeId, typeInfo.jobDataSchema); - this.eiTypes.put(type); - this.consumerCallbacks.notifyConsumersTypeAdded(type); - } - return type; - } - - EiProducer createProducer(Collection types, String producerId, ProducerRegistrationInfo registrationInfo) { - return new EiProducer(producerId, types, registrationInfo.jobCallbackUrl, - registrationInfo.producerSupervisionCallbackUrl); - } - - private EiProducer registerProducer(String producerId, ProducerRegistrationInfo registrationInfo) { - ArrayList typesForProducer = new ArrayList<>(); - EiProducer producer = createProducer(typesForProducer, producerId, registrationInfo); - for (ProducerEiTypeRegistrationInfo typeInfo : registrationInfo.types) { - EiType type = registerType(typeInfo); - typesForProducer.add(type); - type.addProducer(producer); // - } - this.eiProducers.put(producer); - - producerCallbacks.restartJobs(producer, this.eiJobs); - - return producer; - } - - ProducerRegistrationInfo toEiProducerRegistrationInfo(EiProducer p) { + private ProducerRegistrationInfo toEiProducerRegistrationInfo(EiProducer p) { Collection types = new ArrayList<>(); for (EiType type : p.getEiTypes()) { types.add(toEiTypeRegistrationInfo(type)); @@ -323,6 +267,26 @@ public class ProducerController { } private ProducerEiTypeInfo toEiTypeInfo(EiType t) { - return new ProducerEiTypeInfo(t.getJobDataSchema(), t.getProducerIds()); + Collection producerIds = this.eiProducers.getProducerIdsForType(t.getId()); + return new ProducerEiTypeInfo(t.getJobDataSchema(), producerIds); + } + + private EiProducers.EiProducerRegistrationInfo toEiProducerRegistrationInfo(String eiProducerId, + ProducerRegistrationInfo info) { + Collection supportedTypes = new ArrayList<>(); + for (ProducerEiTypeRegistrationInfo typeInfo : info.types) { + EiProducers.EiTypeRegistrationInfo i = ImmutableEiTypeRegistrationInfo.builder() // + .id(typeInfo.eiTypeId) // + .jobDataSchema(typeInfo.jobDataSchema) // + .build(); + supportedTypes.add(i); + } + return ImmutableEiProducerRegistrationInfo.builder() // + .id(eiProducerId) // + .jobCallbackUrl(info.jobCallbackUrl) // + .producerSupervisionCallbackUrl(info.producerSupervisionCallbackUrl) // + .supportedTypes(supportedTypes) // + .build(); } + } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java index bff5be2c..f5224f25 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiJobs.java @@ -39,6 +39,7 @@ import java.util.ServiceLoader; import java.util.Vector; import org.oransc.enrichment.configuration.ApplicationConfig; +import org.oransc.enrichment.controllers.producer.ProducerCallbacks; import org.oransc.enrichment.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,11 +58,14 @@ public class EiJobs { private final ApplicationConfig config; private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); - public EiJobs(ApplicationConfig config) { + private final ProducerCallbacks producerCallbacks; + + public EiJobs(ApplicationConfig config, ProducerCallbacks producerCallbacks) { this.config = config; GsonBuilder gsonBuilder = new GsonBuilder(); ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); this.gson = gsonBuilder.create(); + this.producerCallbacks = producerCallbacks; } public synchronized void restoreJobsFromDatabase() throws IOException { @@ -71,13 +75,13 @@ public class EiJobs { for (File file : dbDir.listFiles()) { String json = Files.readString(file.toPath()); EiJob job = gson.fromJson(json, EiJob.class); - this.put(job, false); + this.doPut(job); } - } public synchronized void put(EiJob job) { - this.put(job, true); + this.doPut(job); + storeJobInFile(job); } public synchronized Collection getJobs() { @@ -108,15 +112,15 @@ public class EiJobs { return allEiJobs.get(id); } - public synchronized EiJob remove(String id) { + public synchronized EiJob remove(String id, EiProducers eiProducers) { EiJob job = allEiJobs.get(id); if (job != null) { - remove(job); + remove(job, eiProducers); } return job; } - public synchronized void remove(EiJob job) { + public synchronized void remove(EiJob job, EiProducers eiProducers) { this.allEiJobs.remove(job.getId()); jobsByType.remove(job.getTypeId(), job.getId()); jobsByOwner.remove(job.getOwner(), job.getId()); @@ -126,7 +130,7 @@ public class EiJobs { } catch (IOException e) { logger.warn("Could not remove file: {}", e.getMessage()); } - + this.producerCallbacks.stopEiJob(job, eiProducers); } public synchronized int size() { @@ -137,6 +141,10 @@ public class EiJobs { this.allEiJobs.clear(); this.jobsByType.clear(); jobsByOwner.clear(); + clearDatabase(); + } + + private void clearDatabase() { try { FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); Files.createDirectories(Paths.get(getDatabaseDirectory())); @@ -145,13 +153,10 @@ public class EiJobs { } } - private void put(EiJob job, boolean storePersistently) { + private void doPut(EiJob job) { allEiJobs.put(job.getId(), job); jobsByType.put(job.getTypeId(), job.getId(), job); jobsByOwner.put(job.getOwner(), job.getId(), job); - if (storePersistently) { - storeJobInFile(job); - } } private void storeJobInFile(EiJob job) { @@ -173,7 +178,7 @@ public class EiJobs { } private String getDatabaseDirectory() { - return config.getVardataDirectory() + "/database"; + return config.getVardataDirectory() + "/eijobs"; } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java index 801e7fcc..fcc91568 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiProducers.java @@ -21,25 +21,117 @@ package org.oransc.enrichment.repository; import java.lang.invoke.MethodHandles; +import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Vector; +import org.immutables.value.Value.Immutable; +import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks; +import org.oransc.enrichment.controllers.producer.ProducerCallbacks; import org.oransc.enrichment.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; /** * Dynamic representation of all EiProducers. */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally +@Component public class EiProducers { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Map allEiProducers = new HashMap<>(); + private final MultiMap producersByType = new MultiMap<>(); - public synchronized void put(EiProducer producer) { + @Autowired + private ProducerCallbacks producerCallbacks; + + @Autowired + private ConsumerCallbacks consumerCallbacks; + + @Autowired + private EiTypes eiTypes; + + @Autowired + private EiJobs eiJobs; + + @Immutable + public interface EiTypeRegistrationInfo { + String id(); + + Object jobDataSchema(); + } + + @Immutable + public interface EiProducerRegistrationInfo { + String id(); + + Collection supportedTypes(); + + String jobCallbackUrl(); + + String producerSupervisionCallbackUrl(); + + } + + public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) { + final String producerId = producerInfo.id(); + EiProducer previousDefinition = this.get(producerId); + if (previousDefinition != null) { + for (EiType type : previousDefinition.getEiTypes()) { + producersByType.remove(type.getId(), producerId); + } + allEiProducers.remove(producerId); + } + + EiProducer producer = createProducer(producerInfo); allEiProducers.put(producer.getId(), producer); + for (EiType type : producer.getEiTypes()) { + producersByType.put(type.getId(), producer.getId(), producer); + } + + if (previousDefinition != null) { + purgeTypes(previousDefinition.getEiTypes()); + this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition); + } + + producerCallbacks.restartEiJobs(producer, this.eiJobs); + consumerCallbacks.notifyConsumersProducerAdded(producer); + return producer; + } + + private void purgeTypes(Collection types) { + for (EiType type : types) { + if (getProducersForType(type.getId()).isEmpty()) { + this.eiTypes.remove(type); + } + } + } + + private EiType getType(EiTypeRegistrationInfo typeInfo) { + EiType type = this.eiTypes.get(typeInfo.id()); + if (type == null) { + type = new EiType(typeInfo.id(), typeInfo.jobDataSchema()); + this.eiTypes.put(type); + this.consumerCallbacks.notifyConsumersTypeAdded(type); + } + return type; + } + + private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) { + ArrayList types = new ArrayList<>(); + + EiProducer producer = new EiProducer(producerInfo.id(), types, producerInfo.jobCallbackUrl(), + producerInfo.producerSupervisionCallbackUrl()); + + for (EiTypeRegistrationInfo typeInfo : producerInfo.supportedTypes()) { + EiType type = getType(typeInfo); + types.add(type); + } + return producer; } public synchronized Collection getAllProducers() { @@ -58,33 +150,42 @@ public class EiProducers { return allEiProducers.get(id); } - public synchronized void remove(String id) { - this.allEiProducers.remove(id); - } - public synchronized int size() { return allEiProducers.size(); } public synchronized void clear() { this.allEiProducers.clear(); + this.producersByType.clear(); } - public void deregisterProducer(EiProducer producer, EiTypes eiTypes, EiJobs eiJobs) { - this.remove(producer); + public void deregisterProducer(EiProducer producer, EiTypes eiTypes) { + allEiProducers.remove(producer.getId()); for (EiType type : producer.getEiTypes()) { - boolean removed = type.removeProducer(producer) != null; - if (!removed) { + if (producersByType.remove(type.getId(), producer.getId()) == null) { this.logger.error("Bug, no producer found"); } - if (type.getProducerIds().isEmpty()) { + if (this.producersByType.get(type.getId()).isEmpty()) { eiTypes.remove(type); } } + this.consumerCallbacks.notifyConsumersProducerDeleted(producer); } - private synchronized void remove(EiProducer producer) { - this.allEiProducers.remove(producer.getId()); + public synchronized Collection getProducersForType(EiType type) { + return this.producersByType.get(type.getId()); + } + + public synchronized Collection getProducersForType(String typeId) { + return this.producersByType.get(typeId); + } + + public synchronized Collection getProducerIdsForType(String typeId) { + Collection producerIds = new ArrayList<>(); + for (EiProducer p : this.getProducersForType(typeId)) { + producerIds.add(p.getId()); + } + return producerIds; } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiType.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiType.java index a354198a..5d9057a6 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiType.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiType.java @@ -20,11 +20,6 @@ package org.oransc.enrichment.repository; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; - import lombok.Getter; public class EiType { @@ -34,26 +29,9 @@ public class EiType { @Getter private final Object jobDataSchema; - private final Map producers = new HashMap<>(); - public EiType(String id, Object jobDataSchema) { this.id = id; this.jobDataSchema = jobDataSchema; } - public synchronized Collection getProducers() { - return Collections.unmodifiableCollection(producers.values()); - } - - public synchronized Collection getProducerIds() { - return Collections.unmodifiableCollection(producers.keySet()); - } - - public synchronized void addProducer(EiProducer producer) { - this.producers.put(producer.getId(), producer); - } - - public synchronized EiProducer removeProducer(EiProducer producer) { - return this.producers.remove(producer.getId()); - } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java index d0bf53a5..265a8e41 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/EiTypes.java @@ -20,15 +20,29 @@ package org.oransc.enrichment.repository; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.TypeAdapterFactory; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.PrintStream; import java.lang.invoke.MethodHandles; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.ServiceLoader; import java.util.Vector; +import org.oransc.enrichment.configuration.ApplicationConfig; import org.oransc.enrichment.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.util.FileSystemUtils; /** * Dynamic representation of all EI types in the system. @@ -37,9 +51,30 @@ import org.slf4j.LoggerFactory; public class EiTypes { private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass()); private final Map allEiTypes = new HashMap<>(); + private final ApplicationConfig config; + private final Gson gson; + + public EiTypes(ApplicationConfig config) { + this.config = config; + GsonBuilder gsonBuilder = new GsonBuilder(); + ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory); + this.gson = gsonBuilder.create(); + } + + public synchronized void restoreTypesFromDatabase() throws IOException { + Files.createDirectories(Paths.get(getDatabaseDirectory())); + File dbDir = new File(getDatabaseDirectory()); + + for (File file : dbDir.listFiles()) { + String json = Files.readString(file.toPath()); + EiType type = gson.fromJson(json, EiType.class); + allEiTypes.put(type.getId(), type); + } + } public synchronized void put(EiType type) { allEiTypes.put(type.getId(), type); + storeInFile(type); } public synchronized Collection getAllEiTypes() { @@ -58,12 +93,13 @@ public class EiTypes { return allEiTypes.get(id); } - public synchronized void remove(String id) { - allEiTypes.remove(id); - } - public synchronized void remove(EiType type) { - this.remove(type.getId()); + allEiTypes.remove(type.getId()); + try { + Files.delete(getPath(type)); + } catch (IOException e) { + logger.warn("Could not remove file: {} {}", type.getId(), e.getMessage()); + } } public synchronized int size() { @@ -72,5 +108,41 @@ public class EiTypes { public synchronized void clear() { this.allEiTypes.clear(); + clearDatabase(); + } + + private void clearDatabase() { + try { + FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory())); + Files.createDirectories(Paths.get(getDatabaseDirectory())); + } catch (IOException e) { + logger.warn("Could not delete database : {}", e.getMessage()); + } + } + + private void storeInFile(EiType type) { + try { + try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) { + out.print(gson.toJson(type)); + } + } catch (Exception e) { + logger.warn("Could not save job: {} {}", type.getId(), e.getMessage()); + } + } + + private File getFile(EiType type) { + return getPath(type).toFile(); + } + + private Path getPath(EiType type) { + return getPath(type.getId()); + } + + private Path getPath(String typeId) { + return Path.of(getDatabaseDirectory(), typeId); + } + + private String getDatabaseDirectory() { + return config.getVardataDirectory() + "/eitypes"; } } diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/MultiMap.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/MultiMap.java index c2b82704..25e559ce 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/MultiMap.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/repository/MultiMap.java @@ -38,14 +38,16 @@ public class MultiMap { this.map.computeIfAbsent(key, k -> new HashMap<>()).put(id, value); } - public void remove(String key, String id) { + public T remove(String key, String id) { Map innerMap = this.map.get(key); if (innerMap != null) { - innerMap.remove(id); + T removedElement = innerMap.remove(id); if (innerMap.isEmpty()) { this.map.remove(key); } + return removedElement; } + return null; } public Collection get(String key) { diff --git a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java index b4c21d46..d8ee9707 100644 --- a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java +++ b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java @@ -89,7 +89,7 @@ public class ProducerSupervision { logger.warn("Unresponsive producer: {} exception: {}", producer.getId(), throwable.getMessage()); producer.setAliveStatus(false); if (producer.isDead()) { - this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs); + this.eiProducers.deregisterProducer(producer, this.eiTypes); this.consumerCallbacks.notifyConsumersProducerDeleted(producer); } } diff --git a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java index b62a9653..9f3dd496 100644 --- a/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java +++ b/enrichment-coordinator-service/src/test/java/org/oransc/enrichment/ApplicationTest.java @@ -55,6 +55,7 @@ import org.oransc.enrichment.controllers.consumer.ConsumerConsts; import org.oransc.enrichment.controllers.consumer.ConsumerEiJobInfo; import org.oransc.enrichment.controllers.consumer.ConsumerEiJobStatus; import org.oransc.enrichment.controllers.consumer.ConsumerEiTypeInfo; +import org.oransc.enrichment.controllers.producer.ProducerCallbacks; import org.oransc.enrichment.controllers.producer.ProducerConsts; import org.oransc.enrichment.controllers.producer.ProducerJobInfo; import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo; @@ -127,6 +128,9 @@ class ApplicationTest { @Autowired ProducerSupervision producerSupervision; + @Autowired + ProducerCallbacks producerCallbacks; + private static Gson gson = new GsonBuilder().create(); /** @@ -382,7 +386,7 @@ class ApplicationTest { assertThat(this.eiTypes.size()).isEqualTo(1); EiType type = this.eiTypes.getType(EI_TYPE_ID); - assertThat(type.getProducerIds()).contains("eiProducerId"); + assertThat(this.eiProducers.getProducersForType(EI_TYPE_ID).size()).isEqualTo(1); assertThat(this.eiProducers.size()).isEqualTo(1); assertThat(this.eiProducers.get("eiProducerId").getEiTypes().iterator().next().getId()).isEqualTo(EI_TYPE_ID); @@ -453,14 +457,14 @@ class ApplicationTest { assertThat(this.eiProducers.size()).isEqualTo(2); EiType type = this.eiTypes.getType(EI_TYPE_ID); - assertThat(type.getProducerIds()).contains("eiProducerId"); - assertThat(type.getProducerIds()).contains("eiProducerId2"); + assertThat(this.eiProducers.getProducerIdsForType(type.getId())).contains("eiProducerId"); + assertThat(this.eiProducers.getProducerIdsForType(type.getId())).contains("eiProducerId2"); putEiJob(EI_TYPE_ID, "jobId"); assertThat(this.eiJobs.size()).isEqualTo(1); deleteEiProducer("eiProducerId"); assertThat(this.eiProducers.size()).isEqualTo(1); - assertThat(this.eiTypes.getType(EI_TYPE_ID).getProducerIds()).doesNotContain("eiProducerId"); + assertThat(this.eiProducers.getProducerIdsForType(EI_TYPE_ID)).doesNotContain("eiProducerId"); verifyJobStatus("jobId", "ENABLED"); deleteEiProducer("eiProducerId2"); @@ -555,8 +559,8 @@ class ApplicationTest { // After 3 failed checks, the producer and the type shall be deregisterred this.producerSupervision.createTask().blockLast(); - assertThat(this.eiProducers.size()).isEqualTo(0); - assertThat(this.eiTypes.size()).isEqualTo(0); + assertThat(this.eiProducers.size()).isEqualTo(0); // The producer is removed + assertThat(this.eiTypes.size()).isEqualTo(0); // The type is removed verifyJobStatus(EI_JOB_ID, "DISABLED"); // Job disabled status notification shall be received @@ -585,21 +589,49 @@ class ApplicationTest { { // Restore the jobs - EiJobs jobs = new EiJobs(this.applicationConfig); + EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks); jobs.restoreJobsFromDatabase(); assertThat(jobs.size()).isEqualTo(2); - jobs.remove("jobId1"); - jobs.remove("jobId2"); + jobs.remove("jobId1", this.eiProducers); + jobs.remove("jobId2", this.eiProducers); } { // Restore the jobs, no jobs in database - EiJobs jobs = new EiJobs(this.applicationConfig); + EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks); jobs.restoreJobsFromDatabase(); assertThat(jobs.size()).isEqualTo(0); } logger.warn("Test removing a job when the db file is gone"); - this.eiJobs.remove("jobId1"); + this.eiJobs.remove("jobId1", this.eiProducers); assertThat(this.eiJobs.size()).isEqualTo(1); + + ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults(); + await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(3)); + } + + @Test + void testEiTypesDatabase() throws Exception { + putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID); + + assertThat(this.eiTypes.size()).isEqualTo(1); + + { + // Restore the types + EiTypes types = new EiTypes(this.applicationConfig); + types.restoreTypesFromDatabase(); + assertThat(types.size()).isEqualTo(1); + + } + { + // Restore the jobs, no jobs in database + EiTypes types = new EiTypes(this.applicationConfig); + types.clear(); + types.restoreTypesFromDatabase(); + assertThat(types.size()).isEqualTo(0); + } + logger.warn("Test removing a job when the db file is gone"); + this.eiTypes.remove(this.eiTypes.getType(EI_TYPE_ID)); + assertThat(this.eiJobs.size()).isEqualTo(0); } private void deleteEiProducer(String eiProducerId) { @@ -706,6 +738,7 @@ class ApplicationTest { String body = gson.toJson(producerEiRegistratioInfo(eiTypeId)); restClient().putForEntity(url, body).block(); + return this.eiTypes.getType(eiTypeId); }