import com.google.gson.GsonBuilder;
import java.lang.invoke.MethodHandles;
+import java.time.Duration;
import java.util.Collection;
import java.util.Vector;
import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.repository.EiJob;
+import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiTypes;
import org.slf4j.Logger;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
+import reactor.util.retry.Retry;
/**
* Callbacks to the EiProducer
public void notifyProducersJobDeleted(EiJob eiJob) {
for (EiProducer producer : getProducers(eiJob)) {
- String url = producer.getJobCallbackUrl() + "/" + eiJob.id();
+ String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
restClient.delete(url) //
.subscribe(notUsed -> logger.debug("Producer job deleted OK {}", producer.getId()), //
throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
* @return the number of producers that returned OK
*/
public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
+ Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
return Flux.fromIterable(getProducers(eiJob)) //
- .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) //
+ .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob, retrySpec)) //
.collectList() //
.flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
}
/**
- * Calls one producer for an EiJob activation.
+ * Restart all jobs for one producer
*
- * @param producer a producer
- * @param eiJob an EI job
- * @return the body of the response from the REST call
+ * @param producer
+ * @param eiJobs
*/
- public Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob) {
+ public void restartJobs(EiProducer producer, EiJobs eiJobs) {
+ final int maxNoOfParalellRequests = 10;
+ Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
+
+ Flux.fromIterable(producer.getEiTypes()) //
+ .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
+ .flatMap(job -> notifyProducerJobStarted(producer, job, retrySpec), maxNoOfParalellRequests) //
+ .onErrorResume(t -> {
+ logger.error("Could not restart EI Job for producer: {}, reason :{}", producer.getId(), t.getMessage());
+ return Flux.empty();
+ }) //
+ .subscribe();
+ }
+
+ private Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob, Retry retrySpec) {
ProducerJobInfo request = new ProducerJobInfo(eiJob);
String body = gson.toJson(request);
- return restClient.post(producer.getJobCallbackUrl(), body)
- .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.getId()))
+ return restClient.post(producer.getJobCallbackUrl(), body) //
+ .retryWhen(retrySpec) //
+ .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
.onErrorResume(throwable -> {
logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
return Mono.empty();
private Collection<EiProducer> getProducers(EiJob eiJob) {
try {
- return this.eiTypes.getType(eiJob.typeId()).getProducers();
+ return this.eiTypes.getType(eiJob.getTypeId()).getProducers();
} catch (Exception e) {
return new Vector<>();
}