Code Review
/
nonrtric.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Added some logging
[nonrtric.git]
/
enrichment-coordinator-service
/
src
/
main
/
java
/
org
/
oransc
/
enrichment
/
tasks
/
ProducerSupervision.java
diff --git
a/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
b/enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
index
c2e4b97
..
d73127f
100644
(file)
--- a/
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
+++ b/
enrichment-coordinator-service/src/main/java/org/oransc/enrichment/tasks/ProducerSupervision.java
@@
-20,8
+20,6
@@
package org.oransc.enrichment.tasks;
package org.oransc.enrichment.tasks;
-import java.time.Duration;
-
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
import org.oransc.enrichment.configuration.ApplicationConfig;
import org.oransc.enrichment.controllers.consumer.ConsumerCallbacks;
import org.oransc.enrichment.controllers.producer.ProducerCallbacks;
@@
-86,19
+84,15
@@
public class ProducerSupervision {
}
private Mono<?> checkProducerJobs(EiProducer producer) {
}
private Mono<?> checkProducerJobs(EiProducer producer) {
+ final int MAX_CONCURRENCY = 10;
return getEiJobs(producer) //
.filter(eiJob -> !producer.isJobEnabled(eiJob)) //
return getEiJobs(producer) //
.filter(eiJob -> !producer.isJobEnabled(eiJob)) //
- .flatMap(eiJob ->
startEiJob(producer, eiJob), 1
) //
+ .flatMap(eiJob ->
producerCallbacks.startEiJob(producer, eiJob, Retry.max(1)), MAX_CONCURRENCY
) //
.collectList() //
.collectList() //
- .flatMapMany(
eiJob
-> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
+ .flatMapMany(
startedJobs
-> consumerCallbacks.notifyJobStatus(producer.getEiTypes())) //
.collectList();
}
.collectList();
}
- private Mono<String> startEiJob(EiProducer producer, EiJob eiJob) {
- Retry retrySpec = Retry.fixedDelay(1, Duration.ofSeconds(1));
- return producerCallbacks.startEiJob(producer, eiJob, retrySpec);
- }
-
private Flux<EiJob> getEiJobs(EiProducer producer) {
return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));
private Flux<EiJob> getEiJobs(EiProducer producer) {
return Flux.fromIterable(producer.getEiTypes()) //
.flatMap(eiType -> Flux.fromIterable(eiJobs.getJobsForType(eiType)));