public void notifyConsumersProducerDeleted(EiProducer eiProducer) {
for (EiType type : eiProducer.getEiTypes()) {
if (this.eiTypes.get(type.getId()) == null) {
+ // The type is removed
for (EiJob job : this.eiJobs.getJobsForType(type)) {
- noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED));
+ if (job.isLastStatusReportedEnabled()) {
+ noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED));
+ job.setLastReportedStatus(false);
+ }
}
}
}
}
+ public void notifyConsumersProducerAdded(EiProducer eiProducer) {
+ for (EiType type : eiProducer.getEiTypes()) {
+ notifyConsumersTypeAdded(type);
+ }
+ }
+
public void notifyConsumersTypeAdded(EiType eiType) {
for (EiJob job : this.eiJobs.getJobsForType(eiType)) {
- noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED));
+ if (!job.isLastStatusReportedEnabled()) {
+ noifyJobOwner(job, new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED));
+ job.setLastReportedStatus(true);
+ }
}
}
private void noifyJobOwner(EiJob job, ConsumerEiJobStatus status) {
- if (!job.jobStatusUrl().isEmpty()) {
+ if (!job.getJobStatusUrl().isEmpty()) {
String body = gson.toJson(status);
- this.restClient.post(job.jobStatusUrl(), body) //
- .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.id()), //
- throwable -> logger.warn("Consumer notify failed {} {}", job.jobStatusUrl(), throwable.toString()), //
+ this.restClient.post(job.getJobStatusUrl(), body) //
+ .subscribe(notUsed -> logger.debug("Consumer notified OK {}", job.getId()), //
+ throwable -> logger.warn("Consumer notify failed {} {}", job.getJobStatusUrl(),
+ throwable.toString()), //
null);
}
}
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiType;
import org.oransc.enrichment.repository.EiTypes;
-import org.oransc.enrichment.repository.ImmutableEiJob;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
List<String> result = new ArrayList<>();
if (owner != null) {
for (EiJob job : this.eiJobs.getJobsForOwner(owner)) {
- if (eiTypeId == null || job.typeId().equals(eiTypeId)) {
- result.add(job.id());
+ if (eiTypeId == null || job.getTypeId().equals(eiTypeId)) {
+ result.add(job.getId());
}
}
} else if (eiTypeId != null) {
- this.eiJobs.getJobsForType(eiTypeId).forEach(job -> result.add(job.id()));
+ this.eiJobs.getJobsForType(eiTypeId).forEach(job -> result.add(job.getId()));
} else {
- this.eiJobs.getJobs().forEach(job -> result.add(job.id()));
+ this.eiJobs.getJobs().forEach(job -> result.add(job.getId()));
}
return new ResponseEntity<>(gson.toJson(result), HttpStatus.OK);
} catch (
private Collection<EiProducer> getProducers(EiJob eiJob) {
try {
- return this.eiTypes.getType(eiJob.typeId()).getProducers();
+ return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
} catch (Exception e) {
return new Vector<>();
}
}
private ConsumerEiJobStatus toEiJobStatus(EiJob job) {
- for (EiProducer producer : getProducers(job)) {
- if (producer.isAvailable()) {
- return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
- }
+ if (getProducers(job).isEmpty()) {
+ return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+ } else {
+ return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
}
- return new ConsumerEiJobStatus(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
}
@DeleteMapping(path = "/eijobs/{eiJobId}", produces = MediaType.APPLICATION_JSON_VALUE)
validateJsonObjectAgainstSchema(eiType.getJobDataSchema(), eiJobInfo.jobData);
EiJob existingEiJob = this.eiJobs.get(eiJobId);
- if (existingEiJob != null && !existingEiJob.typeId().equals(eiJobInfo.eiTypeId)) {
+ if (existingEiJob != null && !existingEiJob.getTypeId().equals(eiJobInfo.eiTypeId)) {
throw new ServiceException("Not allowed to change type for existing EI job", HttpStatus.CONFLICT);
}
return Mono.just(toEiJob(eiJobInfo, eiJobId, eiType));
}
private EiJob toEiJob(ConsumerEiJobInfo info, String id, EiType type) {
- return ImmutableEiJob.builder() //
- .id(id) //
- .typeId(type.getId()) //
- .owner(info.owner) //
- .jobData(info.jobData) //
- .targetUrl(info.targetUri) //
- .jobStatusUrl(info.statusNotificationUri == null ? "" : info.statusNotificationUri) //
- .build();
+ return new EiJob(id, //
+ type.getId(), //
+ info.owner, //
+ info.jobData, //
+ info.targetUri, //
+ info.statusNotificationUri == null ? "" : info.statusNotificationUri);
}
private ConsumerEiTypeInfo toEiTypeInfo() {
}
private ConsumerEiJobInfo toEiJobInfo(EiJob s) {
- return new ConsumerEiJobInfo(s.typeId(), s.jobData(), s.owner(), s.targetUrl(), s.jobStatusUrl());
+ return new ConsumerEiJobInfo(s.getTypeId(), s.getJobData(), s.getOwner(), s.getTargetUrl(),
+ s.getJobStatusUrl());
}
}
import com.google.gson.GsonBuilder;
import java.lang.invoke.MethodHandles;
+import java.time.Duration;
import java.util.Collection;
import java.util.Vector;
import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiTypes;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
/**
* Callbacks to the EiProducer
public void notifyProducersJobDeleted(EiJob eiJob) {
for (EiProducer producer : getProducers(eiJob)) {
- String url = producer.getJobCallbackUrl() + "/" + eiJob.id();
+ String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
restClient.delete(url) //
.subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
* @return the number of producers that returned OK
*/
public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+ Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
return Flux.fromIterable(getProducers(eiJob)) //
- .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) //
+ .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
.collectList() //
.flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
}
/**
- * Calls one producer for an EiJob activation.
+ * Restart all jobs for one producer
*
- * @param producer a producer
- * @param eiJob an EI job
- * @return the body of the response from the REST call
+ * @param producer
+ * @param eiJobs
*/
- public Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob) {
+ public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+ final int maxNoOfParalellRequests = 10;
+ Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
+
+ Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
+ .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
+ .onErrorResume(t -> {
+ logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
+ return Flux.empty();
+ }) //
+ .subscribe();
+ }
+
+ private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
ProducerJobInfo request = new ProducerJobInfo(eiJob);
String body = gson.toJson(request);
- return restClient.post(producer.getJobCallbackUrl(), body)
- .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.getId()))
+ return restClient.post(producer.getJobCallbackUrl(), body) //
+ .retryWhen(retrySpec) //
+ .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
.onErrorResume(throwable -> {
logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
return Mono.empty();
private Collection<EiProducer> getProducers(EiJob eiJob) {
try {
- return this.eiTypes.getType(eiJob.typeId()).getProducers();
+ return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
} catch (Exception e) {
return new Vector<>();
}
ProducerStatusInfo.OperationalState opState =
producer.isAvailable() ? ProducerStatusInfo.OperationalState.ENABLED
: ProducerStatusInfo.OperationalState.DISABLED;
- this.logger.debug("opState {}", opState);
return new ProducerStatusInfo(opState);
}
}
}
- registerProducer(eiProducerId, registrationInfo);
+ EiProducer producer = registerProducer(eiProducerId, registrationInfo);
if (previousDefinition != null) {
purgeTypes(previousDefinition.getEiTypes());
+ this.consumerCallbacks.notifyConsumersProducerDeleted(previousDefinition);
}
+ this.consumerCallbacks.notifyConsumersProducerAdded(producer);
return new ResponseEntity<>(previousDefinition == null ? HttpStatus.CREATED : HttpStatus.OK);
} catch (Exception e) {
}
private EiProducer registerProducer(String producerId, ProducerRegistrationInfo registrationInfo) {
- ArrayList<EiType> types = new ArrayList<>();
+ ArrayList<EiType> typesForProducer = new ArrayList<>();
+ EiProducer producer = createProducer(typesForProducer, producerId, registrationInfo);
for (ProducerEiTypeRegistrationInfo typeInfo : registrationInfo.types) {
- types.add(registerType(typeInfo));
+ EiType type = registerType(typeInfo);
+ typesForProducer.add(type);
+ type.addProducer(producer); //
}
- EiProducer producer = createProducer(types, producerId, registrationInfo);
this.eiProducers.put(producer);
- for (EiType type : types) {
- for (EiJob job : this.eiJobs.getJobsForType(type)) {
- this.producerCallbacks.notifyProducerJobStarted(producer, job) //
- .subscribe();
- }
- type.addProducer(producer);
- }
+ producerCallbacks.restartJobs(producer, this.eiJobs);
+
return producer;
}
}
public ProducerJobInfo(EiJob job) {
- this(job.jobData(), job.id(), job.typeId(), job.targetUrl());
+ this(job.getJobData(), job.getId(), job.getTypeId(), job.getTargetUrl());
}
public ProducerJobInfo() {
package org.oransc.enrichment.repository;
-import org.immutables.gson.Gson;
-import org.immutables.value.Value;
+import java.lang.invoke.MethodHandles;
+
+import lombok.Getter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Represents the dynamic information about a EI job
*/
-@Value.Immutable
-@Gson.TypeAdapters
-public interface EiJob {
- String id();
+public class EiJob {
+ private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Getter
+ private final String id;
+
+ @Getter
+ private final String typeId;
+
+ @Getter
+ private final String owner;
+
+ @Getter
+ private final Object jobData;
+
+ @Getter
+ private final String targetUrl;
- String typeId();
+ @Getter
+ private final String jobStatusUrl;
- String owner();
+ @Getter
+ private boolean isLastStatusReportedEnabled = true;
- Object jobData();
+ public EiJob(String id, String typeId, String owner, Object jobData, String targetUrl, String jobStatusUrl) {
+ this.id = id;
+ this.typeId = typeId;
+ this.owner = owner;
+ this.jobData = jobData;
+ this.targetUrl = targetUrl;
+ this.jobStatusUrl = jobStatusUrl;
+ }
- String targetUrl();
+ public void setLastReportedStatus(boolean isEnabled) {
+ this.isLastStatusReportedEnabled = isEnabled;
+ logger.debug("Job status id: {}, enabled: {}", this.isLastStatusReportedEnabled, isEnabled);
+ }
- String jobStatusUrl();
}
}
public synchronized void restoreJobsFromDatabase() throws IOException {
+ Files.createDirectories(Paths.get(getDatabaseDirectory()));
File dbDir = new File(getDatabaseDirectory());
+
for (File file : dbDir.listFiles()) {
String json = Files.readString(file.toPath());
EiJob job = gson.fromJson(json, EiJob.class);
this.put(job, false);
}
+
}
public synchronized void put(EiJob job) {
}
public synchronized void remove(EiJob job) {
- this.allEiJobs.remove(job.id());
- jobsByType.remove(job.typeId(), job.id());
- jobsByOwner.remove(job.owner(), job.id());
+ this.allEiJobs.remove(job.getId());
+ jobsByType.remove(job.getTypeId(), job.getId());
+ jobsByOwner.remove(job.getOwner(), job.getId());
try {
Files.delete(getPath(job));
jobsByOwner.clear();
try {
FileSystemUtils.deleteRecursively(Path.of(getDatabaseDirectory()));
+ Files.createDirectories(Paths.get(getDatabaseDirectory()));
} catch (IOException e) {
logger.warn("Could not delete database : {}", e.getMessage());
}
}
private void put(EiJob job, boolean storePersistently) {
- allEiJobs.put(job.id(), job);
- jobsByType.put(job.typeId(), job.id(), job);
- jobsByOwner.put(job.owner(), job.id(), job);
+ allEiJobs.put(job.getId(), job);
+ jobsByType.put(job.getTypeId(), job.getId(), job);
+ jobsByOwner.put(job.getOwner(), job.getId(), job);
if (storePersistently) {
storeJobInFile(job);
}
private void storeJobInFile(EiJob job) {
try {
- Files.createDirectories(Paths.get(getDatabaseDirectory()));
try (PrintStream out = new PrintStream(new FileOutputStream(getFile(job)))) {
out.print(gson.toJson(job));
}
} catch (Exception e) {
- logger.warn("Could not save job: {} {}", job.id(), e.getMessage());
+ logger.warn("Could not save job: {} {}", job.getId(), e.getMessage());
}
}
}
private Path getPath(EiJob job) {
- return Path.of(getDatabaseDirectory(), job.id());
+ return Path.of(getDatabaseDirectory(), job.getId());
}
private String getDatabaseDirectory() {
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
+import java.lang.invoke.MethodHandles;
import java.util.ArrayList;
import java.util.Collection;
import org.oransc.enrichment.repository.EiType;
import org.oransc.enrichment.repository.EiTypes;
import org.oransc.enrichment.tasks.ProducerSupervision;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
"app.webclient.trust-store=./config/truststore.jks", //
"app.vardata-directory=./target"})
class ApplicationTest {
+ private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
private final String EI_TYPE_ID = "typeId";
private final String EI_PRODUCER_ID = "producerId";
private final String EI_JOB_PROPERTY = "\"property1\"";
ProducerJobInfo request = simulatorResults.jobsStarted.get(0);
assertThat(request.id).isEqualTo("jobId");
- assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1);
+ // One retry --> two calls
+ await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2));
+ assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2);
resp = restClient().putForEntity(url, body).block();
assertThat(resp.getStatusCode()).isEqualTo(HttpStatus.OK);
EiJob job = this.eiJobs.getJob("jobId");
- assertThat(job.owner()).isEqualTo("owner");
+ assertThat(job.getOwner()).isEqualTo("owner");
}
@Test
testErrorCode(restClient().put(url, body), HttpStatus.CONFLICT, "Job not accepted by any producers");
ProducerSimulatorController.TestResults simulatorResults = this.producerSimulator.getTestResults();
- assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(1);
+ // There is one retry -> 2 calls
+ await().untilAsserted(() -> assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2));
+ assertThat(simulatorResults.noOfRejectedCreate).isEqualTo(2);
}
@Test
@Test
void testJobStatusNotifications() throws JsonMappingException, JsonProcessingException, ServiceException {
+ ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+ ProducerSimulatorController.TestResults producerCalls = this.producerSimulator.getTestResults();
+
putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
putEiJob(EI_TYPE_ID, "jobId");
+ putEiProducerWithOneType("eiProducerId2", EI_TYPE_ID);
+ await().untilAsserted(() -> assertThat(producerCalls.jobsStarted.size()).isEqualTo(2));
+ deleteEiProducer("eiProducerId2");
+ assertThat(this.eiTypes.size()).isEqualTo(1); // The type remains, one producer left
deleteEiProducer("eiProducerId");
assertThat(this.eiTypes.size()).isZero(); // The type is gone
assertThat(this.eiJobs.size()).isEqualTo(1); // The job remains
- ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
- await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(1));
- assertThat(consumerResults.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+ assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
putEiProducerWithOneType("eiProducerId", EI_TYPE_ID);
- await().untilAsserted(() -> assertThat(consumerResults.status.size()).isEqualTo(2));
- assertThat(consumerResults.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+ assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
+ }
+
+ @Test
+ void testJobStatusNotifications2() throws JsonMappingException, JsonProcessingException, ServiceException {
+ // Test replacing a producer with new and removed types
+
+ // Create a job
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ putEiJob(EI_TYPE_ID, EI_JOB_ID);
+
+ // change the type for the producer, the EI_TYPE_ID is deleted
+ putEiProducerWithOneType(EI_PRODUCER_ID, "junk");
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
+ ConsumerSimulatorController.TestResults consumerCalls = this.consumerSimulator.getTestResults();
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(1));
+ assertThat(consumerCalls.status.get(0).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.DISABLED);
+
+ putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
+ await().untilAsserted(() -> assertThat(consumerCalls.status.size()).isEqualTo(2));
+ assertThat(consumerCalls.status.get(1).state).isEqualTo(ConsumerEiJobStatus.EiJobStatusValues.ENABLED);
}
@Test
{
// Create a job
putEiProducerWithOneType(EI_PRODUCER_ID, EI_TYPE_ID);
- putEiJob(EI_TYPE_ID, "jobId");
+ putEiJob(EI_TYPE_ID, EI_JOB_ID);
deleteEiProducer(EI_PRODUCER_ID);
}
this.producerSupervision.createTask().blockLast();
this.producerSupervision.createTask().blockLast();
+
+ // Now we have one producer that is disabled, but the job will be enabled until
+ // the producer/type is removed
assertThat(this.eiProducers.size()).isEqualTo(1);
assertProducerOpState("simulateProducerError", ProducerStatusInfo.OperationalState.DISABLED);
+ verifyJobStatus(EI_JOB_ID, "ENABLED");
// After 3 failed checks, the producer and the type shall be deregisterred
this.producerSupervision.createTask().blockLast();
assertThat(this.eiProducers.size()).isEqualTo(0);
assertThat(this.eiTypes.size()).isEqualTo(0);
+ verifyJobStatus(EI_JOB_ID, "DISABLED");
// Job disabled status notification shall be received
ConsumerSimulatorController.TestResults consumerResults = this.consumerSimulator.getTestResults();
jobs.restoreJobsFromDatabase();
assertThat(jobs.size()).isEqualTo(0);
}
-
- this.eiJobs.remove("jobId1"); // removing a job when the db file is gone
+ logger.warn("Test removing a job when the db file is gone");
+ this.eiJobs.remove("jobId1");
assertThat(this.eiJobs.size()).isEqualTo(1);
}