import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
import org.oran.dmaapadapter.configuration.ApplicationConfig;
import org.oran.dmaapadapter.repository.InfoType;
-import org.oran.dmaapadapter.repository.Jobs;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
 * The class fetches incoming messages from DMAAP and forwards them to the
 * consumers that have a job for this InformationType.
*/
-public class DmaapTopicListener {
+public class DmaapTopicListener implements TopicListener {
private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(3);
private static final Logger logger = LoggerFactory.getLogger(DmaapTopicListener.class);
private final AsyncRestClient dmaapRestClient;
- protected final ApplicationConfig applicationConfig;
- protected final InfoType type;
- protected final Jobs jobs;
+ private final ApplicationConfig applicationConfig;
+ private final InfoType type;
private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
private Many<String> output;
private Disposable topicReceiverTask;
- public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) {
+ public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type) {
AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy("");
this.applicationConfig = applicationConfig;
this.type = type;
- this.jobs = jobs;
-
}
+ @Override
public Many<String> getOutput() {
return this.output;
}
+ @Override
public void start() {
stop();
this::onComplete); //
}
+ @Override
public void stop() {
if (topicReceiverTask != null) {
topicReceiverTask.dispose();
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2022 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oran.dmaapadapter.tasks;
+
+import reactor.core.publisher.Sinks.Many;
+
+public interface TopicListener {
+ public void start();
+
+ public void stop();
+
+ public Many<String> getOutput();
+}
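
For reference, a minimal sketch of how an implementation is expected to honour this contract; the InMemoryTopicListener name and its timer-based feed are hypothetical illustrations, not part of this change: start() recreates the shared sink, getOutput() exposes it to the JobDataConsumers, and stop() disposes the underlying subscription.

package org.oran.dmaapadapter.tasks;

import java.time.Duration;

import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
import reactor.core.publisher.Sinks.Many;

// Hypothetical example implementation; only illustrates the TopicListener lifecycle.
public class InMemoryTopicListener implements TopicListener {

    private Many<String> output;
    private Disposable receiverTask;

    @Override
    public void start() {
        stop();
        // One multicast sink per type, shared by all JobDataConsumers of that type.
        this.output = Sinks.many().multicast().onBackpressureBuffer();
        // Stand-in for a real DMAAP/Kafka receive loop: emit one message per second.
        this.receiverTask = Flux.interval(Duration.ofSeconds(1)) //
                .map(counter -> "message " + counter) //
                .subscribe(this.output::tryEmitNext);
    }

    @Override
    public void stop() {
        if (receiverTask != null) {
            receiverTask.dispose();
            receiverTask = null;
        }
    }

    @Override
    public Many<String> getOutput() {
        return this.output;
    }
}
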
public class TopicListeners {
private static final Logger logger = LoggerFactory.getLogger(TopicListeners.class);
- private final Map<String, KafkaTopicListener> kafkaTopicListeners = new HashMap<>(); // Key is typeId
- private final Map<String, DmaapTopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
+ private final Map<String, TopicListener> kafkaTopicListeners = new HashMap<>(); // Key is typeId
+ private final Map<String, TopicListener> dmaapTopicListeners = new HashMap<>(); // Key is typeId
@Getter
private final MultiMap<JobDataConsumer> kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId
kafkaTopicListeners.put(type.getId(), topicConsumer);
}
if (type.isDmaapTopicDefined()) {
- DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, jobs);
+ DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type);
dmaapTopicListeners.put(type.getId(), topicListener);
}
}
removeJob(job);
logger.debug("Job added {}", job.getId());
if (job.getType().isKafkaTopicDefined()) {
- KafkaTopicListener topicListener = kafkaTopicListeners.get(job.getType().getId());
- if (kafkaConsumers.get(job.getType().getId()).isEmpty()) {
- topicListener.start();
- }
- JobDataConsumer subscription = new JobDataConsumer(job);
- subscription.start(topicListener.getOutput().asFlux());
- kafkaConsumers.put(job.getType().getId(), job.getId(), subscription);
+ addJob(job, kafkaConsumers, kafkaTopicListeners);
}
if (job.getType().isDmaapTopicDefined()) {
- DmaapTopicListener topicListener = dmaapTopicListeners.get(job.getType().getId());
- if (dmaapConsumers.get(job.getType().getId()).isEmpty()) {
- topicListener.start();
- }
- JobDataConsumer subscription = new JobDataConsumer(job);
- subscription.start(topicListener.getOutput().asFlux());
- dmaapConsumers.put(job.getType().getId(), job.getId(), subscription);
+ addJob(job, dmaapConsumers, dmaapTopicListeners);
}
}
- public synchronized void removeJob(Job job) {
- JobDataConsumer consumer = kafkaConsumers.remove(job.getType().getId(), job.getId());
- if (consumer != null) {
- logger.debug("Kafka job removed {}", job.getId());
- consumer.stop();
+ private static void addJob(Job job, MultiMap<JobDataConsumer> consumers,
+ Map<String, TopicListener> topicListeners) {
+ TopicListener topicListener = topicListeners.get(job.getType().getId());
+ if (consumers.get(job.getType().getId()).isEmpty()) {
+ topicListener.start();
}
- consumer = this.dmaapConsumers.remove(job.getType().getId(), job.getId());
+ JobDataConsumer subscription = new JobDataConsumer(job);
+ subscription.start(topicListener.getOutput().asFlux());
+ consumers.put(job.getType().getId(), job.getId(), subscription);
+ }
+
+ public synchronized void removeJob(Job job) {
+ removeJob(job, kafkaConsumers);
+ removeJob(job, dmaapConsumers);
+ }
+
+ private static void removeJob(Job job, MultiMap<JobDataConsumer> consumers) {
+ JobDataConsumer consumer = consumers.remove(job.getType().getId(), job.getId());
if (consumer != null) {
- logger.debug("DMAAP job removed {}", job.getId());
+ logger.debug("Job removed {}", job.getId());
consumer.stop();
}
}
for (String typeId : this.kafkaConsumers.keySet()) {
for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) {
if (!consumer.isRunning()) {
- restartKafkaTopic(consumer);
+ restartTopicAndConsumers(this.kafkaTopicListeners, this.kafkaConsumers, consumer);
}
}
}
}
- private void restartKafkaTopic(JobDataConsumer consumer) {
+ private static void restartTopicAndConsumers(Map<String, TopicListener> topicListeners,
+ MultiMap<JobDataConsumer> consumers, JobDataConsumer consumer) {
InfoType type = consumer.getJob().getType();
- KafkaTopicListener topic = this.kafkaTopicListeners.get(type.getId());
+ TopicListener topic = topicListeners.get(type.getId());
topic.start();
- restartConsumersOfType(topic, type);
+ restartConsumersOfType(consumers, topic, type);
}
- private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.kafkaConsumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
+ private static void restartConsumersOfType(MultiMap<JobDataConsumer> consumers, TopicListener topic,
+ InfoType type) {
+ consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}
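
The refactoring above leans on MultiMap's two-level keying (typeId, then jobId). The project provides its own MultiMap class; the sketch below is only an assumed reading of that contract, reconstructed from the calls made in TopicListeners, not the actual implementation.

import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Assumed shape of MultiMap<T>, inferred from its usage in TopicListeners.
public class MultiMap<T> {

    private final Map<String, Map<String, T>> map = new HashMap<>();

    // put(typeId, jobId, consumer)
    public synchronized void put(String key1, String key2, T value) {
        map.computeIfAbsent(key1, k -> new HashMap<>()).put(key2, value);
    }

    // get(typeId) -> all consumers of that type; an empty result means the
    // TopicListener for the type can be started lazily when the first job arrives.
    public synchronized Collection<T> get(String key1) {
        Map<String, T> inner = map.get(key1);
        return inner == null ? Collections.emptyList() : inner.values();
    }

    // remove(typeId, jobId) -> the removed consumer, or null if the job was unknown
    public synchronized T remove(String key1, String key2) {
        Map<String, T> inner = map.get(key1);
        if (inner == null) {
            return null;
        }
        T removed = inner.remove(key2);
        if (inner.isEmpty()) {
            map.remove(key1);
        }
        return removed;
    }

    public synchronized Set<String> keySet() {
        return map.keySet();
    }
}
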