Code Review
/
nonrtric.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "Updated test of info-types"
[nonrtric.git]
/
enrichment-coordinator-service
/
src
/
main
/
java
/
org
/
oransc
/
enrichment
/
tasks
/
ProducerSupervision.java
diff --git
a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
index
c2e4b97
..
7852bef
100644
(file)
--- a/
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
+++ b/
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
@@
-20,11
+20,9
@@
package org.oransc.enrichment.tasks;
package org.oransc.enrichment.tasks;
-import java.time.Duration;
-
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.configuration.ApplicationConfig;
-import org.oransc.enrichment.controllers.
consumer.Consumer
Callbacks;
-import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
+import org.oransc.enrichment.controllers.
a1e.A1e
Callbacks;
+import org.oransc.enrichment.controllers.
r1
producer.ProducerCallbacks;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
import org.oransc.enrichment.repository.EiJob;
import org.oransc.enrichment.repository.EiJobs;
import org.oransc.enrichment.repository.EiProducer;
@@
-52,11
+50,11
@@
public class ProducerSupervision {
private final EiProducers eiProducers;
private final EiJobs eiJobs;
private final ProducerCallbacks producerCallbacks;
private final EiProducers eiProducers;
private final EiJobs eiJobs;
private final ProducerCallbacks producerCallbacks;
- private final
Consumer
Callbacks consumerCallbacks;
+ private final
A1e
Callbacks consumerCallbacks;
@Autowired
public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
@Autowired
public ProducerSupervision(ApplicationConfig applicationConfig, EiProducers eiProducers, EiJobs eiJobs,
- ProducerCallbacks producerCallbacks,
Consumer
Callbacks consumerCallbacks) {
+ ProducerCallbacks producerCallbacks,
A1e
Callbacks consumerCallbacks) {
this.eiProducers = eiProducers;
this.eiJobs = eiJobs;
this.producerCallbacks = producerCallbacks;
this.eiProducers = eiProducers;
this.eiJobs = eiJobs;
this.producerCallbacks = producerCallbacks;
@@
-86,19
+84,15
@@
public class ProducerSupervision {
}
private Mono<?> checkProducerJobs(EiProducer producer) {
}
private Mono<?> checkProducerJobs(EiProducer producer) {
+ final int MAX_CONCURRENCY = 10;
return getEiJobs(producer) //
.filter(eiJob -> !producer.isJobEnabled(eiJob)) //
return getEiJobs(producer) //
.filter(eiJob -> !producer.isJobEnabled(eiJob)) //
- .flatMap(eiJob ->
startEiJob(producer, eiJob), 1
) //
+ .flatMap(eiJob ->
producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY
) //
.collectList() //
.collectList() //
- .flatMapMany(
eiJob
-> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .flatMapMany(
startedJobs
-> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
.collectList();
}
.collectList();
}
- private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
- Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
- return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
- }
-
private Flux<EiJob> getEiJobs(EiProducer producer) {
return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));
private Flux<EiJob> getEiJobs(EiProducer producer) {
return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));