Renamed the DataConsumer classes to JobDataDistributor

The classes distribute received job data to the data consumers rather
than consuming it themselves, so the old names were misleading.
HttpDataConsumer and KafkaDataConsumer are renamed accordingly, and
fields, helper methods and tests are updated to match.
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
Issue-ID: NONRTRIC-773
Change-Id: I1de5f8d88877b7a8f1693576ac52ca3bf5b5be51
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class HttpDataConsumer extends DataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(HttpDataConsumer.class);
+public class HttpJobDataDistributor extends JobDataDistributor {
+ private static final Logger logger = LoggerFactory.getLogger(HttpJobDataDistributor.class);
- public HttpDataConsumer(Job job) {
+ public HttpJobDataDistributor(Job job) {
super(job);
}
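
Editor's note: for context on what the renamed class does, each received message is pushed to the job owner over REST. Below is a minimal sketch of that send step, assuming Spring WebFlux's WebClient (the project's own REST client wrapper may differ; the class name and targetUri are illustrative):

    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Mono;

    class HttpSendSketch {
        private final WebClient client = WebClient.create();

        // POST one received message to the consumer's callback URI; the returned
        // Mono completes with the response body once the consumer has accepted it.
        Mono<String> sendToClient(String targetUri, String data) {
            return client.post() //
                    .uri(targetUri) //
                    .bodyValue(data) //
                    .retrieve() //
                    .bodyToMono(String.class);
        }
    }
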
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public abstract class DataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(DataConsumer.class);
+public abstract class JobDataDistributor {
+ private static final Logger logger = LoggerFactory.getLogger(JobDataDistributor.class);
@Getter
private final Job job;
private Disposable subscription;
}
}
- protected DataConsumer(Job job) {
+ protected JobDataDistributor(Job job) {
this.job = job;
}
this.subscription = handleReceivedMessage(input, job) //
.flatMap(this::sendToClient, job.getParameters().getMaxConcurrency()) //
.onErrorResume(this::handleError) //
- .subscribe(this::handleConsumerSentOk, //
+ .subscribe(this::handleSentOk, //
this::handleExceptionInStream, //
- () -> logger.warn("HttpDataConsumer stopped jobId: {}", job.getId()));
+ () -> logger.warn("JobDataDistributor stopped jobId: {}", job.getId()));
}
}
}
- private void handleConsumerSentOk(String data) {
+ private void handleSentOk(String data) {
this.errorStats.handleOkFromConsumer();
}
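
Editor's note: the pipeline built in start() is a standard Reactor shape: a bounded-concurrency flatMap for the sends, an error fallback, and a subscriber with separate handlers per element, for a terminal error, and for completion. A runnable sketch of the same shape; the names send and MAX_CONCURRENCY are placeholders, not the project's:

    import reactor.core.Disposable;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;

    public class PipelineSketch {
        private static final int MAX_CONCURRENCY = 5;

        public static void main(String[] args) {
            Flux<String> input = Flux.just("m1", "m2", "m3");

            Disposable subscription = input //
                    .flatMap(PipelineSketch::send, MAX_CONCURRENCY) // at most 5 sends in flight
                    .onErrorResume(t -> Mono.empty()) // keep a failed send from killing the stream
                    .subscribe( //
                            ok -> System.out.println("sent OK: " + ok), // per-element success
                            t -> System.err.println("stream failed: " + t), // terminal error
                            () -> System.out.println("stream stopped")); // completion

            // stop() in the real class disposes this handle to cancel the subscription.
            subscription.dispose();
        }

        private static Mono<String> send(String data) {
            return Mono.just(data); // stands in for the REST or Kafka send
        }
    }
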
* owner via REST calls.
*/
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-public class KafkaDataConsumer extends DataConsumer {
- private static final Logger logger = LoggerFactory.getLogger(KafkaDataConsumer.class);
+public class KafkaJobDataDistributor extends JobDataDistributor {
+ private static final Logger logger = LoggerFactory.getLogger(KafkaJobDataDistributor.class);
private KafkaSender<String, String> sender;
private final ApplicationConfig appConfig;
- public KafkaDataConsumer(Job job, ApplicationConfig appConfig) {
+ public KafkaJobDataDistributor(Job job, ApplicationConfig appConfig) {
super(job);
this.appConfig = appConfig;
}
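
Editor's note: the sender field above is a reactor-kafka KafkaSender. For readers unfamiliar with it, a self-contained example of creating one and publishing a record; the broker address and topic name are placeholders:

    import java.util.HashMap;
    import java.util.Map;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.serialization.StringSerializer;
    import reactor.core.publisher.Flux;
    import reactor.kafka.sender.KafkaSender;
    import reactor.kafka.sender.SenderOptions;
    import reactor.kafka.sender.SenderRecord;

    public class KafkaSenderSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

            SenderOptions<String, String> senderOptions = SenderOptions.create(props);
            KafkaSender<String, String> sender = KafkaSender.create(senderOptions);

            // Wrap each outgoing value in a SenderRecord; the second argument to
            // create() is correlation metadata returned with the send result.
            Flux<SenderRecord<String, String, String>> records = Flux.just("payload") //
                    .map(data -> SenderRecord.create(new ProducerRecord<>("output-topic", data), data));

            sender.send(records) //
                    .doOnNext(result -> System.out.println("sent: " + result.correlationMetadata())) //
                    .blockLast();
            sender.close();
        }
    }
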
private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
@Getter
- private final MultiMap<DataConsumer> dataConsumers = new MultiMap<>(); // Key is typeId, jobId
+ private final MultiMap<JobDataDistributor> dataDistributors = new MultiMap<>(); // Key is typeId, jobId
private final ApplicationConfig appConfig;
removeJob(job);
logger.debug("Job added {}", job.getId());
if (job.getType().isKafkaTopicDefined()) {
- addConsumer(job, dataConsumers, kafkaTopicListeners);
+ addDistributor(job, dataDistributors, kafkaTopicListeners);
}
if (job.getType().isDmaapTopicDefined()) {
- addConsumer(job, dataConsumers, dmaapTopicListeners);
+ addDistributor(job, dataDistributors, dmaapTopicListeners);
}
}
- private DataConsumer createConsumer(Job job) {
- return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaDataConsumer(job, appConfig)
- : new HttpDataConsumer(job);
+ private JobDataDistributor createDistributor(Job job) {
+ return !Strings.isEmpty(job.getParameters().getKafkaOutputTopic()) ? new KafkaJobDataDistributor(job, appConfig)
+ : new HttpJobDataDistributor(job);
}
- private void addConsumer(Job job, MultiMap<DataConsumer> consumers, Map<String, TopicListener> topicListeners) {
+ private void addDistributor(Job job, MultiMap<JobDataDistributor> distributors,
+ Map<String, TopicListener> topicListeners) {
TopicListener topicListener = topicListeners.get(job.getType().getId());
- DataConsumer consumer = createConsumer(job);
- consumer.start(topicListener.getFlux());
- consumers.put(job.getType().getId(), job.getId(), consumer);
+ JobDataDistributor distributor = createDistributor(job);
+ distributor.start(topicListener.getFlux());
+ distributors.put(job.getType().getId(), job.getId(), distributor);
}
public synchronized void removeJob(Job job) {
- removeJob(job, dataConsumers);
+ removeJob(job, dataDistributors);
}
- private static void removeJob(Job job, MultiMap<DataConsumer> consumers) {
- DataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
- if (consumer != null) {
+ private static void removeJob(Job job, MultiMap<JobDataDistributor> distributors) {
+ JobDataDistributor distributor = distributors.remove(job.getType().getId(), job.getId());
+ if (distributor != null) {
logger.debug("Job removed {}", job.getId());
- consumer.stop();
+ distributor.stop();
}
}
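
Editor's note: MultiMap is a small project-local helper, a two-level map keyed first by typeId and then by jobId. A minimal sketch with only the operations this patch relies on (put, get, remove, keySet); the real class may differ in detail:

    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    // Two-level map: outer key is typeId, inner key is jobId.
    public class MultiMap<T> {
        private final Map<String, Map<String, T>> map = new HashMap<>();

        public void put(String outerKey, String innerKey, T value) {
            map.computeIfAbsent(outerKey, k -> new HashMap<>()).put(innerKey, value);
        }

        public T get(String outerKey, String innerKey) {
            Map<String, T> inner = map.get(outerKey);
            return inner == null ? null : inner.get(innerKey);
        }

        public T remove(String outerKey, String innerKey) {
            Map<String, T> inner = map.get(outerKey);
            if (inner == null) {
                return null;
            }
            T removed = inner.remove(innerKey);
            if (inner.isEmpty()) {
                map.remove(outerKey); // drop empty inner maps so keySet() reflects live jobs
            }
            return removed;
        }

        public Collection<String> keySet() {
            return map.keySet();
        }
    }
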
import org.oran.dmaapadapter.repository.Jobs;
import org.oran.dmaapadapter.repository.filters.PmReport;
import org.oran.dmaapadapter.repository.filters.PmReportFilter;
-import org.oran.dmaapadapter.tasks.DataConsumer;
+import org.oran.dmaapadapter.tasks.JobDataDistributor;
import org.oran.dmaapadapter.tasks.ProducerRegstrationTask;
import org.oran.dmaapadapter.tasks.TopicListener;
import org.oran.dmaapadapter.tasks.TopicListeners;
this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
- DataConsumer kafkaConsumer = this.topicListeners.getDataConsumers().get(TYPE_ID, JOB_ID);
+ JobDataDistributor kafkaConsumer = this.topicListeners.getDataDistributors().get(TYPE_ID, JOB_ID);
// Handle received data from Kafka, check that it has been posted to the
// consumer
this.icsSimulatorController.deleteJob(job.getId(), restClient());
}
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
- await().untilAsserted(() -> assertThat(this.topicListeners.getDataConsumers().keySet()).isEmpty());
+ await().untilAsserted(() -> assertThat(this.topicListeners.getDataDistributors().keySet()).isEmpty());
this.consumerController.testResults.reset();
this.icsSimulatorController.testResults.reset();
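
Editor's note: the await().untilAsserted(...) calls above poll an AssertJ assertion until it passes, which is how the tests wait for asynchronous job registration and removal to settle. The pattern in isolation, assuming Awaitility and AssertJ on the test classpath:

    import static org.assertj.core.api.Assertions.assertThat;
    import static org.awaitility.Awaitility.await;

    import java.time.Duration;
    import java.util.concurrent.atomic.AtomicInteger;

    public class AwaitSketch {
        public static void main(String[] args) {
            AtomicInteger jobs = new AtomicInteger();
            new Thread(jobs::incrementAndGet).start(); // some asynchronous state change

            // Re-runs the assertion until it passes or the timeout expires.
            await().atMost(Duration.ofSeconds(5)) //
                    .untilAsserted(() -> assertThat(jobs.get()).isEqualTo(1));
        }
    }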