Code Review
/
nonrtric.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
review
|
tree
raw
|
inline
| side by side
Merge "Add trace to SDNC stub O-RU Closed Loop usecase"
[nonrtric.git]
/
dmaap-adaptor-java
/
src
/
main
/
java
/
org
/
oran
/
dmaapadapter
/
tasks
/
KafkaTopicConsumers.java
diff --git a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
index
0ed85c6
..
4809017
100644
(file)
--- a/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
+++ b/dmaap-adaptor-java/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicConsumers.java
@@ -30,6 +30,7 @@
import org.oran.dmaapadapter.repository.InfoType;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.repository.MultiMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
@@ -46,7 +47,7 @@
public class KafkaTopicConsumers {
private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
@Getter
private final Map<String, KafkaTopicListener> topicListeners = new HashMap<>(); // Key is typeId
@Getter
-    private final Map<String, KafkaJobDataConsumer> consumers = new HashMap<>(); // Key is jobId
+    private final MultiMap<KafkaJobDataConsumer> consumers = new MultiMap<>(); // Key is typeId, jobId
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
private static final int CONSUMER_SUPERVISION_INTERVAL_MS = 1000 * 60 * 3;
@@ -70,22 +71,25 @@
public class KafkaTopicConsumers {
public void onJobRemoved(Job job) {
removeJob(job);
}
public void onJobRemoved(Job job) {
removeJob(job);
}
-
});
}
public synchronized void addJob(Job job) {
});
}
public synchronized void addJob(Job job) {
- if (this.consumers.get(job.getId()) == null && job.getType().isKafkaTopicDefined()) {
+ if (job.getType().isKafkaTopicDefined()) {
+ removeJob(job);
logger.debug("Kafka job added {}", job.getId());
KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
logger.debug("Kafka job added {}", job.getId());
KafkaTopicListener topicConsumer = topicListeners.get(job.getType().getId());
+ if (consumers.get(job.getType().getId()).isEmpty()) {
+ topicConsumer.start();
+ }
KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
- subscription.start(topicConsumer.getOutput());
- consumers.put(job.getId(), subscription);
+            subscription.start(topicConsumer.getOutput().asFlux());
+            consumers.put(job.getType().getId(), job.getId(), subscription);
}
}
public synchronized void removeJob(Job job) {
}
}
public synchronized void removeJob(Job job) {
- KafkaJobDataConsumer d = consumers.remove(job.getId());
+        KafkaJobDataConsumer d = consumers.remove(job.getType().getId(), job.getId());
if (d != null) {
logger.debug("Kafka job removed {}", job.getId());
d.stop();
if (d != null) {
logger.debug("Kafka job removed {}", job.getId());
d.stop();
@@ -93,10 +97,9 @@
public class KafkaTopicConsumers {
}
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
}
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningTasks() {
-
- for (KafkaJobDataConsumer consumer : consumers.values()) {
- if (!consumer.isRunning()) {
+ public synchronized void restartNonRunningTopics() {
+ for (String typeId : this.consumers.keySet()) {
+ for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
restartTopic(consumer);
}
}
restartTopic(consumer);
}
}
@@ -110,10 +113,6 @@
public class KafkaTopicConsumers {
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.consumers.forEach((jobId, consumer) -> {
- if (consumer.getJob().getType().getId().equals(type.getId())) {
- consumer.start(topic.getOutput());
- }
- });
+ this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}
}
}