From: PatrikBuhr Date: Thu, 5 May 2022 09:20:20 +0000 (+0200) Subject: PM Filter X-Git-Tag: 1.1.0~5 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=fae84c040f66caaebc48505b6aa3e5c85649d95d;p=nonrtric%2Fplt%2Fdmaapadapter.git PM Filter Minor changes. Signed-off-by: PatrikBuhr Issue-ID: NONRTRIC-743 Change-Id: I9cab3d7bf8e4f9d776d186f1d623e7aa0f9efb2a --- diff --git a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java index 854cdc5..6b4f253 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/DmaapTopicListener.java @@ -28,7 +28,6 @@ import org.oran.dmaapadapter.clients.AsyncRestClient; import org.oran.dmaapadapter.clients.AsyncRestClientFactory; import org.oran.dmaapadapter.configuration.ApplicationConfig; import org.oran.dmaapadapter.repository.InfoType; -import org.oran.dmaapadapter.repository.Jobs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,31 +41,30 @@ import reactor.core.publisher.Sinks.Many; * The class fetches incoming requests from DMAAP and sends them further to the * consumers that has a job for this InformationType. */ -public class DmaapTopicListener { +public class DmaapTopicListener implements TopicListener { private static final Duration TIME_BETWEEN_DMAAP_RETRIES = Duration.ofSeconds(3); private static final Logger logger = LoggerFactory.getLogger(DmaapTopicListener.class); private final AsyncRestClient dmaapRestClient; - protected final ApplicationConfig applicationConfig; - protected final InfoType type; - protected final Jobs jobs; + private final ApplicationConfig applicationConfig; + private final InfoType type; private final com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create(); private Many output; private Disposable topicReceiverTask; - public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type, Jobs jobs) { + public DmaapTopicListener(ApplicationConfig applicationConfig, InfoType type) { AsyncRestClientFactory restclientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); this.dmaapRestClient = restclientFactory.createRestClientNoHttpProxy(""); this.applicationConfig = applicationConfig; this.type = type; - this.jobs = jobs; - } + @Override public Many getOutput() { return this.output; } + @Override public void start() { stop(); @@ -82,6 +80,7 @@ public class DmaapTopicListener { this::onComplete); // } + @Override public void stop() { if (topicReceiverTask != null) { topicReceiverTask.dispose(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java index f32c585..4b58013 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/KafkaTopicListener.java @@ -43,7 +43,7 @@ import reactor.kafka.receiver.ReceiverOptions; * to a multi cast sink, which several other streams can connect to. */ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally -public class KafkaTopicListener { +public class KafkaTopicListener implements TopicListener { private static final Logger logger = LoggerFactory.getLogger(KafkaTopicListener.class); private final ApplicationConfig applicationConfig; private final InfoType type; @@ -55,10 +55,12 @@ public class KafkaTopicListener { this.type = type; } + @Override public Many getOutput() { return this.output; } + @Override public void start() { stop(); final int CONSUMER_BACKPRESSURE_BUFFER_SIZE = 1024 * 10; @@ -72,6 +74,7 @@ public class KafkaTopicListener { () -> logger.warn("KafkaTopicReceiver stopped")); } + @Override public void stop() { if (topicReceiverTask != null) { topicReceiverTask.dispose(); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java index 24b53c9..f3b663b 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/ProducerRegstrationTask.java @@ -68,7 +68,7 @@ public class ProducerRegstrationTask { private static final String PRODUCER_ID = "DmaapGenericInfoProducer"; @Getter private boolean isRegisteredInIcs = false; - private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5; + private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10; public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) { AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig()); diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java new file mode 100644 index 0000000..503f113 --- /dev/null +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListener.java @@ -0,0 +1,31 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2022 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oran.dmaapadapter.tasks; + +import reactor.core.publisher.Sinks.Many; + +public interface TopicListener { + public void start(); + + public void stop(); + + public Many getOutput(); +} diff --git a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java index 1b8a740..5176867 100644 --- a/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java +++ b/src/main/java/org/oran/dmaapadapter/tasks/TopicListeners.java @@ -44,8 +44,8 @@ import org.springframework.stereotype.Component; public class TopicListeners { private static final Logger logger = LoggerFactory.getLogger(TopicListeners.class); - private final Map kafkaTopicListeners = new HashMap<>(); // Key is typeId - private final Map dmaapTopicListeners = new HashMap<>(); // Key is typeId + private final Map kafkaTopicListeners = new HashMap<>(); // Key is typeId + private final Map dmaapTopicListeners = new HashMap<>(); // Key is typeId @Getter private final MultiMap kafkaConsumers = new MultiMap<>(); // Key is typeId, jobId @@ -61,7 +61,7 @@ public class TopicListeners { kafkaTopicListeners.put(type.getId(), topicConsumer); } if (type.isDmaapTopicDefined()) { - DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type, jobs); + DmaapTopicListener topicListener = new DmaapTopicListener(appConfig, type); dmaapTopicListeners.put(type.getId(), topicListener); } } @@ -83,35 +83,34 @@ public class TopicListeners { removeJob(job); logger.debug("Job added {}", job.getId()); if (job.getType().isKafkaTopicDefined()) { - KafkaTopicListener topicListener = kafkaTopicListeners.get(job.getType().getId()); - if (kafkaConsumers.get(job.getType().getId()).isEmpty()) { - topicListener.start(); - } - JobDataConsumer subscription = new JobDataConsumer(job); - subscription.start(topicListener.getOutput().asFlux()); - kafkaConsumers.put(job.getType().getId(), job.getId(), subscription); + addJob(job, kafkaConsumers, kafkaTopicListeners); } if (job.getType().isDmaapTopicDefined()) { - DmaapTopicListener topicListener = dmaapTopicListeners.get(job.getType().getId()); - if (dmaapConsumers.get(job.getType().getId()).isEmpty()) { - topicListener.start(); - } - JobDataConsumer subscription = new JobDataConsumer(job); - subscription.start(topicListener.getOutput().asFlux()); - dmaapConsumers.put(job.getType().getId(), job.getId(), subscription); + addJob(job, dmaapConsumers, dmaapTopicListeners); } } - public synchronized void removeJob(Job job) { - JobDataConsumer consumer = kafkaConsumers.remove(job.getType().getId(), job.getId()); - if (consumer != null) { - logger.debug("Kafka job removed {}", job.getId()); - consumer.stop(); + private static void addJob(Job job, MultiMap consumers, + Map topicListeners) { + TopicListener topicListener = topicListeners.get(job.getType().getId()); + if (consumers.get(job.getType().getId()).isEmpty()) { + topicListener.start(); } - consumer = this.dmaapConsumers.remove(job.getType().getId(), job.getId()); + JobDataConsumer subscription = new JobDataConsumer(job); + subscription.start(topicListener.getOutput().asFlux()); + consumers.put(job.getType().getId(), job.getId(), subscription); + } + + public synchronized void removeJob(Job job) { + removeJob(job, kafkaConsumers); + removeJob(job, dmaapConsumers); + } + + private static void removeJob(Job job, MultiMap consumers) { + JobDataConsumer consumer = consumers.remove(job.getType().getId(), job.getId()); if (consumer != null) { - logger.debug("DMAAP job removed {}", job.getId()); + logger.debug("Job removed {}", job.getId()); consumer.stop(); } } @@ -121,20 +120,22 @@ public class TopicListeners { for (String typeId : this.kafkaConsumers.keySet()) { for (JobDataConsumer consumer : this.kafkaConsumers.get(typeId)) { if (!consumer.isRunning()) { - restartKafkaTopic(consumer); + restartTopicAndConsumers(this.kafkaTopicListeners, this.kafkaConsumers, consumer); } } } } - private void restartKafkaTopic(JobDataConsumer consumer) { + private static void restartTopicAndConsumers(Map topicListeners, + MultiMap consumers, JobDataConsumer consumer) { InfoType type = consumer.getJob().getType(); - KafkaTopicListener topic = this.kafkaTopicListeners.get(type.getId()); + TopicListener topic = topicListeners.get(type.getId()); topic.start(); - restartConsumersOfType(topic, type); + restartConsumersOfType(consumers, topic, type); } - private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) { - this.kafkaConsumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); + private static void restartConsumersOfType(MultiMap consumers, TopicListener topic, + InfoType type) { + consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux())); } }