Refactor so that types can exist without producer.
A consumer can then create a job before any producer is available.
EiTypes are made persistent.
Change-Id: Id2cd0be5d2b6fe81ae6367d57fa3cc7c4c8c44c6
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-317
import org.apache.catalina.connector.Connector;
import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducers;
import org.oransc.enrichment.repository.EiTypes;
private final ApplicationConfig applicationConfig = new ApplicationConfig();
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+ private ProducerCallbacks producerCallbacks;
+ private EiTypes eiTypes;
+ private EiJobs eiJobs;
+ private EiProducers eiProducers;
+
@Bean
public ObjectMapper mapper() {
return new ObjectMapper();
@Bean
public EiJobs eiJobs() {
- EiJobs jobs = new EiJobs(getApplicationConfig());
- try {
- jobs.restoreJobsFromDatabase();
- } catch (Exception e) {
- logger.error("Could not restore jobs from database: {}", e.getMessage());
+ if (eiJobs == null) {
+ eiJobs = new EiJobs(getApplicationConfig(), producerCallbacks());
+ try {
+ eiJobs.restoreJobsFromDatabase();
+ } catch (Exception e) {
+ logger.error("Could not restore jobs from database: {}", e.getMessage());
+ }
}
- return jobs;
+ return eiJobs;
}
@Bean
public EiTypes eiTypes() {
- return new EiTypes();
+ if (this.eiTypes == null) {
+ eiTypes = new EiTypes(getApplicationConfig());
+ try {
+ eiTypes.restoreTypesFromDatabase();
+ } catch (Exception e) {
+ logger.error("Could not restore EI types from database: {}", e.getMessage());
+ }
+ }
+ return eiTypes;
}
@Bean
- public EiProducers eiProducers() {
- return new EiProducers();
+ public ProducerCallbacks producerCallbacks() {
+ if (this.producerCallbacks == null) {
+ producerCallbacks = new ProducerCallbacks(getApplicationConfig());
+ }
+ return this.producerCallbacks;
}
@Bean
@Value("${app.webclient.http.proxy-port:0}")
private int httpProxyPort = 0;
+ private WebClientConfig webClientConfig = null;
+
public WebClientConfig getWebClientConfig() {
- HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
- .httpProxyHost(this.httpProxyHost) //
- .httpProxyPort(this.httpProxyPort) //
- .build();
- return ImmutableWebClientConfig.builder() //
- .keyStoreType(this.sslKeyStoreType) //
- .keyStorePassword(this.sslKeyStorePassword) //
- .keyStore(this.sslKeyStore) //
- .keyPassword(this.sslKeyPassword) //
- .isTrustStoreUsed(this.sslTrustStoreUsed) //
- .trustStore(this.sslTrustStore) //
- .trustStorePassword(this.sslTrustStorePassword) //
- .httpProxyConfig(httpProxyConfig) //
- .build();
+ if (this.webClientConfig == null) {
+ HttpProxyConfig httpProxyConfig = ImmutableHttpProxyConfig.builder() //
+ .httpProxyHost(this.httpProxyHost) //
+ .httpProxyPort(this.httpProxyPort) //
+ .build();
+ this.webClientConfig = ImmutableWebClientConfig.builder() //
+ .keyStoreType(this.sslKeyStoreType) //
+ .keyStorePassword(this.sslKeyStorePassword) //
+ .keyStore(this.sslKeyStore) //
+ .keyPassword(this.sslKeyPassword) //
+ .isTrustStoreUsed(this.sslTrustStoreUsed) //
+ .trustStore(this.sslTrustStore) //
+ .trustStorePassword(this.sslTrustStorePassword) //
+ .httpProxyConfig(httpProxyConfig) //
+ .build();
+ }
+ return this.webClientConfig;
}
}
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
+import org.oransc.enrichment.repository.EiProducers;
import org.oransc.enrichment.repository.EiType;
-import org.oransc.enrichment.repository.EiTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
private static Gson gson = new GsonBuilder().create();
private final AsyncRestClient restClient;
- private final EiTypes eiTypes;
private final EiJobs eiJobs;
+ private final EiProducers eiProducers;
@Autowired
- public ConsumerCallbacks(ApplicationConfig config, EiTypes eiTypes, EiJobs eiJobs) {
+ public ConsumerCallbacks(ApplicationConfig config, EiJobs eiJobs, EiProducers eiProducers) {
AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
this.restClient = restClientFactory.createRestClientUseHttpProxy("");
- this.eiTypes = eiTypes;
this.eiJobs = eiJobs;
+ this.eiProducers = eiProducers;
}
public void notifyConsumersProducerDeleted(EiProducer eiProducer) {
for (EiType type : eiProducer.getEiTypes()) {
- if (this.eiTypes.get(type.getId()) == null) {
- // The type is removed
+ if (this.eiProducers.getProducersForType(type).isEmpty()) {
+ // No producers left for the type
for (EiJob job : this.eiJobs.getJobsForType(type)) {
if (job.isLastStatusReportedEnabled()) {
noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED));
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import java.util.Vector;
import org.everit.json.schema.Schema;
import org.everit.json.schema.loader.SchemaLoader;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
+import org.oransc.enrichment.repository.EiProducers;
import org.oransc.enrichment.repository.EiType;
import org.oransc.enrichment.repository.EiTypes;
import org.springframework.beans.factory.annotation.Autowired;
@Autowired
private EiTypes eiTypes;
+ @Autowired
+ private EiProducers eiProducers;
+
@Autowired
ProducerCallbacks producerCallbacks;
}
private Collection<EiProducer> getProducers(EiJob eiJob) {
- try {
- return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
- } catch (Exception e) {
- return new Vector<>();
- }
+ return this.eiProducers.getProducersForType(eiJob.getTypeId());
}
private ConsumerEiJobStatus toEiJobStatus(EiJob job) {
@PathVariable("eiJobId") String eiJobId) {
try {
EiJob job = this.eiJobs.getJob(eiJobId);
- this.eiJobs.remove(job);
- this.producerCallbacks.notifyProducersJobDeleted(job);
+ this.eiJobs.remove(job, this.eiProducers);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} catch (Exception e) {
return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
final boolean isNewJob = this.eiJobs.get(eiJobId) == null;
return validatePutEiJob(eiJobId, eiJobObject) //
- .flatMap(this::notifyProducersNewJob) //
+ .flatMap(this::startEiJob) //
.doOnNext(newEiJob -> this.eiJobs.put(newEiJob)) //
.flatMap(newEiJob -> Mono.just(new ResponseEntity<>(isNewJob ? HttpStatus.CREATED : HttpStatus.OK)))
.onErrorResume(throwable -> Mono.just(ErrorResponse.create(throwable, HttpStatus.NOT_FOUND)));
}
- private Mono<EiJob> notifyProducersNewJob(EiJob newEiJob) {
- return this.producerCallbacks.notifyProducersJobStarted(newEiJob) //
+ private Mono<EiJob> startEiJob(EiJob newEiJob) {
+ return this.producerCallbacks.startEiJob(newEiJob, eiProducers) //
.flatMap(noOfAcceptingProducers -> {
if (noOfAcceptingProducers.intValue() > 0) {
return Mono.just(newEiJob);
import java.lang.invoke.MethodHandles;
import java.time.Duration;
import java.util.Collection;
-import java.util.Vector;
import org.oransc.enrichment.clients.AsyncRestClient;
import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
-import org.oransc.enrichment.repository.EiTypes;
+import org.oransc.enrichment.repository.EiProducers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
* Callbacks to the EiProducer
*/
-@Component
@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
public class ProducerCallbacks {
private static Gson gson = new GsonBuilder().create();
private final AsyncRestClient restClient;
- private final EiTypes eiTypes;
- @Autowired
- public ProducerCallbacks(ApplicationConfig config, EiTypes eiTypes) {
+ public ProducerCallbacks(ApplicationConfig config) {
AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(config.getWebClientConfig());
this.restClient = restClientFactory.createRestClientNoHttpProxy("");
- this.eiTypes = eiTypes;
}
- public void notifyProducersJobDeleted(EiJob eiJob) {
- for (EiProducer producer : getProducers(eiJob)) {
+ public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
+ for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
restClient.delete(url) //
.subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
* @param eiJob an EI job
* @return the number of producers that returned OK
*/
- public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+ public Mono<Integer> startEiJob(EiJob eiJob, EiProducers eiProducers) {
Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
- return Flux.fromIterable(getProducers(eiJob)) //
- .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
+ return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
+ .flatMap(eiProducer -> postStartEiJob(eiProducer, eiJob, retrySpec)) //
.collectList() //
.flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
}
* @param producer
* @param eiJobs
*/
- public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+ public void restartEiJobs(EiProducer producer, EiJobs eiJobs) {
final int maxNoOfParalellRequests = 10;
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
Flux.fromIterable(producer.getEiTypes()) //
.flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
- .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
+ .flatMap(job -> postStartEiJob(producer, job, retrySpec), maxNoOfParalellRequests) //
.onErrorResume(t -> {
logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
return Flux.empty();
.subscribe();
}
- private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
+ private Mono<String> postStartEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
ProducerJobInfo request = new ProducerJobInfo(eiJob);
String body = gson.toJson(request);
});
}
- private Collection<EiProducer> getProducers(EiJob eiJob) {
- try {
- return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
- } catch (Exception e) {
- return new Vector<>();
- }
+ private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {
+ return eiProducers.getProducersForType(eiJob.getTypeId());
}
}
import org.oransc.enrichment.controllers.ErrorResponse;
import org.oransc.enrichment.controllers.VoidResponse;
-import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo.ProducerEiTypeRegistrationInfo;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducers;
import org.oransc.enrichment.repository.EiType;
import org.oransc.enrichment.repository.EiTypes;
+import org.oransc.enrichment.repository.ImmutableEiProducerRegistrationInfo;
+import org.oransc.enrichment.repository.ImmutableEiTypeRegistrationInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@Autowired
private EiProducers eiProducers;
- @Autowired
- ProducerCallbacks producerCallbacks;
-
- @Autowired
- ConsumerCallbacks consumerCallbacks;
-
@GetMapping(path = ProducerConsts.API_ROOT + "/eitypes", produces = MediaType.APPLICATION_JSON_VALUE)
@ApiOperation(value = "EI type identifiers", notes = "")
@ApiResponses(
@RequestBody ProducerRegistrationInfo registrationInfo) {
try {
EiProducer previousDefinition = this.eiProducers.get(eiProducerId);
- if (previousDefinition != null) {
- for (EiType type : previousDefinition.getEiTypes()) {
- type.removeProducer(previousDefinition);
- }
- }
-
- EiProducer producer = registerProducer(eiProducerId, registrationInfo);
- if (previousDefinition != null) {
- purgeTypes(previousDefinition.getEiTypes());
- this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
- }
- this.consumerCallbacks.notifyConsumersProducerAdded(producer);
-
+ this.eiProducers.registerProducer(toEiProducerRegistrationInfo(eiProducerId, registrationInfo));
return new ResponseEntity<>(previousDefinition == null ? HttpStatus.CREATED : HttpStatus.OK);
} catch (Exception e) {
return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
}
}
- private void purgeTypes(Collection<EiType> types) {
- for (EiType type : types) {
- if (type.getProducerIds().isEmpty()) {
- this.eiTypes.remove(type);
- }
- }
- }
-
@DeleteMapping(
path = ProducerConsts.API_ROOT + "/eiproducers/{eiProducerId}",
produces = MediaType.APPLICATION_JSON_VALUE)
public ResponseEntity<Object> deleteEiProducer(@PathVariable("eiProducerId") String eiProducerId) {
try {
final EiProducer producer = this.eiProducers.getProducer(eiProducerId);
- this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs);
- this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
+ this.eiProducers.deregisterProducer(producer, this.eiTypes);
return new ResponseEntity<>(HttpStatus.NO_CONTENT);
} catch (Exception e) {
return ErrorResponse.create(e, HttpStatus.NOT_FOUND);
}
}
- private EiType registerType(ProducerEiTypeRegistrationInfo typeInfo) {
- EiType type = this.eiTypes.get(typeInfo.eiTypeId);
- if (type == null) {
- type = new EiType(typeInfo.eiTypeId, typeInfo.jobDataSchema);
- this.eiTypes.put(type);
- this.consumerCallbacks.notifyConsumersTypeAdded(type);
- }
- return type;
- }
-
- EiProducer createProducer(Collection<EiType> types, String producerId, ProducerRegistrationInfo registrationInfo) {
- return new EiProducer(producerId, types, registrationInfo.jobCallbackUrl,
- registrationInfo.producerSupervisionCallbackUrl);
- }
-
- private EiProducer registerProducer(String producerId, ProducerRegistrationInfo registrationInfo) {
- ArrayList<EiType> typesForProducer = new ArrayList<>();
- EiProducer producer = createProducer(typesForProducer, producerId, registrationInfo);
- for (ProducerEiTypeRegistrationInfo typeInfo : registrationInfo.types) {
- EiType type = registerType(typeInfo);
- typesForProducer.add(type);
- type.addProducer(producer); //
- }
- this.eiProducers.put(producer);
-
- producerCallbacks.restartJobs(producer, this.eiJobs);
-
- return producer;
- }
-
- ProducerRegistrationInfo toEiProducerRegistrationInfo(EiProducer p) {
+ private ProducerRegistrationInfo toEiProducerRegistrationInfo(EiProducer p) {
Collection<ProducerEiTypeRegistrationInfo> types = new ArrayList<>();
for (EiType type : p.getEiTypes()) {
types.add(toEiTypeRegistrationInfo(type));
}
private ProducerEiTypeInfo toEiTypeInfo(EiType t) {
- return new ProducerEiTypeInfo(t.getJobDataSchema(), t.getProducerIds());
+ Collection<String> producerIds = this.eiProducers.getProducerIdsForType(t.getId());
+ return new ProducerEiTypeInfo(t.getJobDataSchema(), producerIds);
+ }
+
+ private EiProducers.EiProducerRegistrationInfo toEiProducerRegistrationInfo(String eiProducerId,
+ ProducerRegistrationInfo info) {
+ Collection<EiProducers.EiTypeRegistrationInfo> supportedTypes = new ArrayList<>();
+ for (ProducerEiTypeRegistrationInfo typeInfo : info.types) {
+ EiProducers.EiTypeRegistrationInfo i = ImmutableEiTypeRegistrationInfo.builder() //
+ .id(typeInfo.eiTypeId) //
+ .jobDataSchema(typeInfo.jobDataSchema) //
+ .build();
+ supportedTypes.add(i);
+ }
+ return ImmutableEiProducerRegistrationInfo.builder() //
+ .id(eiProducerId) //
+ .jobCallbackUrl(info.jobCallbackUrl) //
+ .producerSupervisionCallbackUrl(info.producerSupervisionCallbackUrl) //
+ .supportedTypes(supportedTypes) //
+ .build();
}
+
}
import java.util.Vector;
import org.oransc.enrichment.configuration.ApplicationConfig;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
import org.oransc.enrichment.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
private final ApplicationConfig config;
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
- public EiJobs(ApplicationConfig config) {
+ private final ProducerCallbacks producerCallbacks;
+
+ public EiJobs(ApplicationConfig config, ProducerCallbacks producerCallbacks) {
this.config = config;
GsonBuilder gsonBuilder = new GsonBuilder();
ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
this.gson = gsonBuilder.create();
+ this.producerCallbacks = producerCallbacks;
}
public synchronized void restoreJobsFromDatabase() throws IOException {
for (File file : dbDir.listFiles()) {
String json = Files.readString(file.toPath());
EiJob job = gson.fromJson(json, EiJob.class);
- this.put(job, false);
+ this.doPut(job);
}
-
}
public synchronized void put(EiJob job) {
- this.put(job, true);
+ this.doPut(job);
+ storeJobInFile(job);
}
public synchronized Collection<EiJob> getJobs() {
return allEiJobs.get(id);
}
- public synchronized EiJob remove(String id) {
+ public synchronized EiJob remove(String id, EiProducers eiProducers) {
EiJob job = allEiJobs.get(id);
if (job != null) {
- remove(job);
+ remove(job, eiProducers);
}
return job;
}
- public synchronized void remove(EiJob job) {
+ public synchronized void remove(EiJob job, EiProducers eiProducers) {
this.allEiJobs.remove(job.getId());
jobsByType.remove(job.getTypeId(), job.getId());
jobsByOwner.remove(job.getOwner(), job.getId());
} catch (IOException e) {
logger.warn("Could not remove file: {}", e.getMessage());
}
-
+ this.producerCallbacks.stopEiJob(job, eiProducers);
}
public synchronized int size() {
this.allEiJobs.clear();
this.jobsByType.clear();
jobsByOwner.clear();
+ clearDatabase();
+ }
+
+ private void clearDatabase() {
try {
FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
Files.createDirectories(Paths.get(getDatabaseDirectory()));
}
}
- private void put(EiJob job, boolean storePersistently) {
+ private void doPut(EiJob job) {
allEiJobs.put(job.getId(), job);
jobsByType.put(job.getTypeId(), job.getId(), job);
jobsByOwner.put(job.getOwner(), job.getId(), job);
- if (storePersistently) {
- storeJobInFile(job);
- }
}
private void storeJobInFile(EiJob job) {
}
private String getDatabaseDirectory() {
- return config.getVardataDirectory() + "/database";
+ return config.getVardataDirectory() + "/eijobs";
}
}
package org.oransc.enrichment.repository;
import java.lang.invoke.MethodHandles;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Vector;
+import org.immutables.value.Value.Immutable;
+import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
import org.oransc.enrichment.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
/**
* Dynamic representation of all EiProducers.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+@Component
public class EiProducers {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, EiProducer> allEiProducers = new HashMap<>();
+ private final MultiMap<EiProducer> producersByType = new MultiMap<>();
- public synchronized void put(EiProducer producer) {
+ @Autowired
+ private ProducerCallbacks producerCallbacks;
+
+ @Autowired
+ private ConsumerCallbacks consumerCallbacks;
+
+ @Autowired
+ private EiTypes eiTypes;
+
+ @Autowired
+ private EiJobs eiJobs;
+
+ @Immutable
+ public interface EiTypeRegistrationInfo {
+ String id();
+
+ Object jobDataSchema();
+ }
+
+ @Immutable
+ public interface EiProducerRegistrationInfo {
+ String id();
+
+ Collection<EiTypeRegistrationInfo> supportedTypes();
+
+ String jobCallbackUrl();
+
+ String producerSupervisionCallbackUrl();
+
+ }
+
+ public EiProducer registerProducer(EiProducerRegistrationInfo producerInfo) {
+ final String producerId = producerInfo.id();
+ EiProducer previousDefinition = this.get(producerId);
+ if (previousDefinition != null) {
+ for (EiType type : previousDefinition.getEiTypes()) {
+ producersByType.remove(type.getId(), producerId);
+ }
+ allEiProducers.remove(producerId);
+ }
+
+ EiProducer producer = createProducer(producerInfo);
allEiProducers.put(producer.getId(), producer);
+ for (EiType type : producer.getEiTypes()) {
+ producersByType.put(type.getId(), producer.getId(), producer);
+ }
+
+ if (previousDefinition != null) {
+ purgeTypes(previousDefinition.getEiTypes());
+ this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
+ }
+
+ producerCallbacks.restartEiJobs(producer, this.eiJobs);
+ consumerCallbacks.notifyConsumersProducerAdded(producer);
+ return producer;
+ }
+
+ private void purgeTypes(Collection<EiType> types) {
+ for (EiType type : types) {
+ if (getProducersForType(type.getId()).isEmpty()) {
+ this.eiTypes.remove(type);
+ }
+ }
+ }
+
+ private EiType getType(EiTypeRegistrationInfo typeInfo) {
+ EiType type = this.eiTypes.get(typeInfo.id());
+ if (type == null) {
+ type = new EiType(typeInfo.id(), typeInfo.jobDataSchema());
+ this.eiTypes.put(type);
+ this.consumerCallbacks.notifyConsumersTypeAdded(type);
+ }
+ return type;
+ }
+
+ private EiProducer createProducer(EiProducerRegistrationInfo producerInfo) {
+ ArrayList<EiType> types = new ArrayList<>();
+
+ EiProducer producer = new EiProducer(producerInfo.id(), types, producerInfo.jobCallbackUrl(),
+ producerInfo.producerSupervisionCallbackUrl());
+
+ for (EiTypeRegistrationInfo typeInfo : producerInfo.supportedTypes()) {
+ EiType type = getType(typeInfo);
+ types.add(type);
+ }
+ return producer;
}
public synchronized Collection<EiProducer> getAllProducers() {
return allEiProducers.get(id);
}
- public synchronized void remove(String id) {
- this.allEiProducers.remove(id);
- }
-
public synchronized int size() {
return allEiProducers.size();
}
public synchronized void clear() {
this.allEiProducers.clear();
+ this.producersByType.clear();
}
- public void deregisterProducer(EiProducer producer, EiTypes eiTypes, EiJobs eiJobs) {
- this.remove(producer);
+ public void deregisterProducer(EiProducer producer, EiTypes eiTypes) {
+ allEiProducers.remove(producer.getId());
for (EiType type : producer.getEiTypes()) {
- boolean removed = type.removeProducer(producer) != null;
- if (!removed) {
+ if (producersByType.remove(type.getId(), producer.getId()) == null) {
this.logger.error("Bug, no producer found");
}
- if (type.getProducerIds().isEmpty()) {
+ if (this.producersByType.get(type.getId()).isEmpty()) {
eiTypes.remove(type);
}
}
+ this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
}
- private synchronized void remove(EiProducer producer) {
- this.allEiProducers.remove(producer.getId());
+ public synchronized Collection<EiProducer> getProducersForType(EiType type) {
+ return this.producersByType.get(type.getId());
+ }
+
+ public synchronized Collection<EiProducer> getProducersForType(String typeId) {
+ return this.producersByType.get(typeId);
+ }
+
+ public synchronized Collection<String> getProducerIdsForType(String typeId) {
+ Collection<String> producerIds = new ArrayList<>();
+ for (EiProducer p : this.getProducersForType(typeId)) {
+ producerIds.add(p.getId());
+ }
+ return producerIds;
}
}
package org.oransc.enrichment.repository;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-
import lombok.Getter;
public class EiType {
@Getter
private final Object jobDataSchema;
- private final Map<String, EiProducer> producers = new HashMap<>();
-
public EiType(String id, Object jobDataSchema) {
this.id = id;
this.jobDataSchema = jobDataSchema;
}
- public synchronized Collection<EiProducer> getProducers() {
- return Collections.unmodifiableCollection(producers.values());
- }
-
- public synchronized Collection<String> getProducerIds() {
- return Collections.unmodifiableCollection(producers.keySet());
- }
-
- public synchronized void addProducer(EiProducer producer) {
- this.producers.put(producer.getId(), producer);
- }
-
- public synchronized EiProducer removeProducer(EiProducer producer) {
- return this.producers.remove(producer.getId());
- }
}
package org.oransc.enrichment.repository;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.TypeAdapterFactory;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
import java.lang.invoke.MethodHandles;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.ServiceLoader;
import java.util.Vector;
+import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.util.FileSystemUtils;
/**
* Dynamic representation of all EI types in the system.
public class EiTypes {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final Map<String, EiType> allEiTypes = new HashMap<>();
+ private final ApplicationConfig config;
+ private final Gson gson;
+
+ public EiTypes(ApplicationConfig config) {
+ this.config = config;
+ GsonBuilder gsonBuilder = new GsonBuilder();
+ ServiceLoader.load(TypeAdapterFactory.class).forEach(gsonBuilder::registerTypeAdapterFactory);
+ this.gson = gsonBuilder.create();
+ }
+
+ public synchronized void restoreTypesFromDatabase() throws IOException {
+ Files.createDirectories(Paths.get(getDatabaseDirectory()));
+ File dbDir = new File(getDatabaseDirectory());
+
+ for (File file : dbDir.listFiles()) {
+ String json = Files.readString(file.toPath());
+ EiType type = gson.fromJson(json, EiType.class);
+ allEiTypes.put(type.getId(), type);
+ }
+ }
public synchronized void put(EiType type) {
allEiTypes.put(type.getId(), type);
+ storeInFile(type);
}
public synchronized Collection<EiType> getAllEiTypes() {
return allEiTypes.get(id);
}
- public synchronized void remove(String id) {
- allEiTypes.remove(id);
- }
-
public synchronized void remove(EiType type) {
- this.remove(type.getId());
+ allEiTypes.remove(type.getId());
+ try {
+ Files.delete(getPath(type));
+ } catch (IOException e) {
+ logger.warn("Could not remove file: {} {}", type.getId(), e.getMessage());
+ }
}
public synchronized int size() {
public synchronized void clear() {
this.allEiTypes.clear();
+ clearDatabase();
+ }
+
+ private void clearDatabase() {
+ try {
+ FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
+ Files.createDirectories(Paths.get(getDatabaseDirectory()));
+ } catch (IOException e) {
+ logger.warn("Could not delete database : {}", e.getMessage());
+ }
+ }
+
+ private void storeInFile(EiType type) {
+ try {
+ try (PrintStream out = new PrintStream(new FileOutputStream(getFile(type)))) {
+ out.print(gson.toJson(type));
+ }
+ } catch (Exception e) {
+ logger.warn("Could not save job: {} {}", type.getId(), e.getMessage());
+ }
+ }
+
+ private File getFile(EiType type) {
+ return getPath(type).toFile();
+ }
+
+ private Path getPath(EiType type) {
+ return getPath(type.getId());
+ }
+
+ private Path getPath(String typeId) {
+ return Path.of(getDatabaseDirectory(), typeId);
+ }
+
+ private String getDatabaseDirectory() {
+ return config.getVardataDirectory() + "/eitypes";
}
}
this.map.computeIfAbsent(key, k -> new HashMap<>()).put(id, value);
}
- public void remove(String key, String id) {
+ public T remove(String key, String id) {
Map<String, T> innerMap = this.map.get(key);
if (innerMap != null) {
- innerMap.remove(id);
+ T removedElement = innerMap.remove(id);
if (innerMap.isEmpty()) {
this.map.remove(key);
}
+ return removedElement;
}
+ return null;
}
public Collection<T> get(String key) {
logger.warn("Unresponsive producer: {} exception: {}", producer.getId(), throwable.getMessage());
producer.setAliveStatus(false);
if (producer.isDead()) {
- this.eiProducers.deregisterProducer(producer, this.eiTypes, this.eiJobs);
+ this.eiProducers.deregisterProducer(producer, this.eiTypes);
this.consumerCallbacks.notifyConsumersProducerDeleted(producer);
}
}
import org.oransc.enrichment.controllers.consumer.ConsumerEiJobInfo;
import org.oransc.enrichment.controllers.consumer.ConsumerEiJobStatus;
import org.oransc.enrichment.controllers.consumer.ConsumerEiTypeInfo;
+import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
import org.oransc.enrichment.controllers.producer.ProducerConsts;
import org.oransc.enrichment.controllers.producer.ProducerJobInfo;
import org.oransc.enrichment.controllers.producer.ProducerRegistrationInfo;
@Autowired
ProducerSupervision producerSupervision;
+ @Autowired
+ ProducerCallbacks producerCallbacks;
+
private static Gson gson = new GsonBuilder().create();
/**
assertThat(this.eiTypes.size()).isEqualTo(1);
EiType type = this.eiTypes.getType(EI_TYPE_ID);
- assertThat(type.getProducerIds()).contains("eiProducerId");
+ assertThat(this.eiProducers.getProducersForType(EI_TYPE_ID).size()).isEqualTo(1);
assertThat(this.eiProducers.size()).isEqualTo(1);
assertThat(this.eiProducers.get("eiProducerId").getEiTypes().iterator().next().getId()).isEqualTo(EI_TYPE_ID);
assertThat(this.eiProducers.size()).isEqualTo(2);
EiType type = this.eiTypes.getType(EI_TYPE_ID);
- assertThat(type.getProducerIds()).contains("eiProducerId");
- assertThat(type.getProducerIds()).contains("eiProducerId2");
+ assertThat(this.eiProducers.getProducerIdsForType(type.getId())).contains("eiProducerId");
+ assertThat(this.eiProducers.getProducerIdsForType(type.getId())).contains("eiProducerId2");
putEiJob(EI_TYPE_ID, "jobId");
assertThat(this.eiJobs.size()).isEqualTo(1);
deleteEiProducer("eiProducerId");
assertThat(this.eiProducers.size()).isEqualTo(1);
- assertThat(this.eiTypes.getType(EI_TYPE_ID).getProducerIds()).doesNotContain("eiProducerId");
+ assertThat(this.eiProducers.getProducerIdsForType(EI_TYPE_ID)).doesNotContain("eiProducerId");
verifyJobStatus("jobId", "ENABLED");
deleteEiProducer("eiProducerId2");
// After 3 failed checks, the producer and the type shall be deregisterred
this.producerSupervision.createTask().blockLast();
- assertThat(this.eiProducers.size()).isEqualTo(0);
- assertThat(this.eiTypes.size()).isEqualTo(0);
+ assertThat(this.eiProducers.size()).isEqualTo(0); // The producer is removed
+ assertThat(this.eiTypes.size()).isEqualTo(0); // The type is removed
verifyJobStatus(EI_JOB_ID, "DISABLED");
// Job disabled status notification shall be received
{
// Restore the jobs
- EiJobs jobs = new EiJobs(this.applicationConfig);
+ EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
jobs.restoreJobsFromDatabase();
assertThat(jobs.size()).isEqualTo(2);
- jobs.remove("jobId1");
- jobs.remove("jobId2");
+ jobs.remove("jobId1", this.eiProducers);
+ jobs.remove("jobId2", this.eiProducers);
}
{
// Restore the jobs, no jobs in database
- EiJobs jobs = new EiJobs(this.applicationConfig);
+ EiJobs jobs = new EiJobs(this.applicationConfig, this.producerCallbacks);
jobs.restoreJobsFromDatabase();
assertThat(jobs.size()).isEqualTo(0);
}
logger.warn("Test removing a job when the db file is gone");
- this.eiJobs.remove("jobId1");
+ this.eiJobs.remove("jobId1", this.eiProducers);
assertThat(this.eiJobs.size()).isEqualTo(1);
+
+ ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(simulatorResults.jobsStopped.size()).isEqualTo(3));
+ }
+
+ @Test
+ void testEiTypesDatabase() throws Exception {
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+
+ assertThat(this.eiTypes.size()).isEqualTo(1);
+
+ {
+ // Restore the types
+ EiTypes types = new EiTypes(this.applicationConfig);
+ types.restoreTypesFromDatabase();
+ assertThat(types.size()).isEqualTo(1);
+
+ }
+ {
+ // Restore the jobs, no jobs in database
+ EiTypes types = new EiTypes(this.applicationConfig);
+ types.clear();
+ types.restoreTypesFromDatabase();
+ assertThat(types.size()).isEqualTo(0);
+ }
+ logger.warn("Test removing a job when the db file is gone");
+ this.eiTypes.remove(this.eiTypes.getType(EI_TYPE_ID));
+ assertThat(this.eiJobs.size()).isEqualTo(0);
}
private void deleteEiProducer(String eiProducerId) {
String body = gson.toJson(producerEiRegistratioInfo(eiTypeId));
restClient().putForEntity(url, body).block();
+
return this.eiTypes.getType(eiTypeId);
}