import org.oransc.enrichment.clients.AsyncRestClient;
import org.oransc.enrichment.clients.AsyncRestClientFactory;
import org.oransc.enrichment.configuration.ApplicationConfig;
-import org.oransc.enrichment.repository.EiJob;
-import org.oransc.enrichment.repository.EiJobs;
-import org.oransc.enrichment.repository.EiProducer;
-import org.oransc.enrichment.repository.EiProducers;
+import org.oransc.enrichment.repository.InfoJob;
+import org.oransc.enrichment.repository.InfoJobs;
+import org.oransc.enrichment.repository.InfoProducer;
+import org.oransc.enrichment.repository.InfoProducers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.util.retry.Retry;
/**
- * Callbacks to the EiProducer
+ * Callbacks to the Producer
*/
@SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
public class ProducerCallbacks {
this.restClient = restClientFactory.createRestClientNoHttpProxy("");
}
- public Mono<String> healthCheck(EiProducer producer) {
+ public Mono<String> healthCheck(InfoProducer producer) {
return restClient.get(producer.getProducerSupervisionCallbackUrl());
}
- public void stopEiJob(EiJob eiJob, EiProducers eiProducers) {
- for (EiProducer producer : getProducersForJob(eiJob, eiProducers)) {
- String url = producer.getJobCallbackUrl() + "/" + eiJob.getId();
- producer.setJobDisabled(eiJob);
+ public void stopInfoJob(InfoJob infoJob, InfoProducers infoProducers) {
+ for (InfoProducer producer : getProducersForJob(infoJob, infoProducers)) {
+ String url = producer.getJobCallbackUrl() + "/" + infoJob.getId();
+ producer.setJobDisabled(infoJob);
restClient.delete(url) //
.subscribe(response -> logger.debug("Producer job deleted OK {}", producer.getId()), //
throwable -> logger.warn("Producer job delete failed {} {}", producer.getId(),
/**
* Start a job in all producers that suports the job type
*
- * @param eiJob an EI job
+ * @param infoJob an Information Job
* @return the number of producers that returned OK
*/
- public Mono<Integer> startInfoSubscriptionJob(EiJob eiJob, EiProducers eiProducers) {
+ public Mono<Integer> startInfoSubscriptionJob(InfoJob infoJob, InfoProducers infoProducers) {
Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
- return Flux.fromIterable(getProducersForJob(eiJob, eiProducers)) //
- .flatMap(eiProducer -> startEiJob(eiProducer, eiJob, retrySpec)) //
+ return Flux.fromIterable(getProducersForJob(infoJob, infoProducers)) //
+ .flatMap(infoProducer -> startInfoJob(infoProducer, infoJob, retrySpec)) //
.collectList() //
.flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
}
* Start all jobs for one producer
*
* @param producer
- * @param eiJobs
+ * @param infoJobs
*/
- public Flux<String> startEiJobs(EiProducer producer, EiJobs eiJobs) {
+ public Flux<String> startInfoJobs(InfoProducer producer, InfoJobs infoJobs) {
final int maxNoOfParalellRequests = 10;
Retry retrySpec = Retry.backoff(3, Duration.ofSeconds(1));
- return Flux.fromIterable(producer.getEiTypes()) //
- .flatMap(type -> Flux.fromIterable(eiJobs.getJobsForType(type))) //
- .flatMap(job -> startEiJob(producer, job, retrySpec), maxNoOfParalellRequests);
+ return Flux.fromIterable(producer.getInfoTypes()) //
+ .flatMap(type -> Flux.fromIterable(infoJobs.getJobsForType(type))) //
+ .flatMap(job -> startInfoJob(producer, job, retrySpec), maxNoOfParalellRequests);
}
- public Mono<String> startEiJob(EiProducer producer, EiJob eiJob, Retry retrySpec) {
- ProducerJobInfo request = new ProducerJobInfo(eiJob);
+ public Mono<String> startInfoJob(InfoProducer producer, InfoJob infoJob, Retry retrySpec) {
+ ProducerJobInfo request = new ProducerJobInfo(infoJob);
String body = gson.toJson(request);
return restClient.post(producer.getJobCallbackUrl(), body) //
.retryWhen(retrySpec) //
- .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", eiJob.getId(), producer.getId())) //
+ .doOnNext(resp -> logger.debug("Job subscription {} started OK {}", infoJob.getId(), producer.getId())) //
.onErrorResume(throwable -> {
- producer.setJobDisabled(eiJob);
+ producer.setJobDisabled(infoJob);
logger.warn("Job subscription failed {}", producer.getId(), throwable.toString());
return Mono.empty();
}) //
- .doOnNext(resp -> producer.setJobEnabled(eiJob));
+ .doOnNext(resp -> producer.setJobEnabled(infoJob));
}
- private Collection<EiProducer> getProducersForJob(EiJob eiJob, EiProducers eiProducers) {
- return eiProducers.getProducersForType(eiJob.getTypeId());
+ private Collection<InfoProducer> getProducersForJob(InfoJob infoJob, InfoProducers infoProducers) {
+ return infoProducers.getProducersForType(infoJob.getTypeId());
}
}