# The HTTP proxy (if configured) will only be used for accessing NearRT RIC:s
http.proxy-host:
http.proxy-port: 0
- ecs-base-url: https://localhost:8434
+ ics-base-url: https://localhost:8434
# Location of the component configuration file. The file will only be used if the Consul database is not used;
# configuration from the Consul will override the file.
configuration-filepath: /opt/app/dmaap-adaptor-service/data/application_configuration.json
private int localServerHttpPort;
@Getter
- @Value("${app.ecs-base-url}")
- private String ecsBaseUrl;
+ @Value("${app.ics-base-url}")
+ private String icsBaseUrl;
@Getter
@Value("${app.dmaap-adapter-base-url}")
return null;
}
+ public T get(String key1, String key2) {
+ Map<String, T> innerMap = this.map.get(key1);
+ if (innerMap == null) {
+ return null;
+ }
+ return innerMap.get(key2);
+ }
+
public Collection<T> get(String key) {
Map<String, T> innerMap = this.map.get(key);
if (innerMap == null) {
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
-import reactor.core.publisher.Sinks.Many;
/**
* The class streams data from a multi cast sink and sends the data to the Job
this.job = job;
}
- public synchronized void start(Many<String> input) {
+ public synchronized void start(Flux<String> input) {
stop();
this.errorStats.resetKafkaErrors();
this.subscription = getMessagesFromKafka(input, job) //
public synchronized void stop() {
if (this.subscription != null) {
- subscription.dispose();
- subscription = null;
+ this.subscription.dispose();
+ this.subscription = null;
}
}
return this.subscription != null;
}
- private Flux<String> getMessagesFromKafka(Many<String> input, Job job) {
- Flux<String> result = input.asFlux() //
- .filter(job::isFilterMatch);
+ private Flux<String> getMessagesFromKafka(Flux<String> input, Job job) {
+ Flux<String> result = input.filter(job::isFilterMatch);
if (job.isBuffered()) {
result = result.map(this::quote) //
public void onJobRemoved(Job job) {
removeJob(job);
}
-
});
}
topicConsumer.start();
}
KafkaJobDataConsumer subscription = new KafkaJobDataConsumer(job);
- subscription.start(topicConsumer.getOutput());
+ subscription.start(topicConsumer.getOutput().asFlux());
consumers.put(job.getType().getId(), job.getId(), subscription);
}
}
}
@Scheduled(fixedRate = CONSUMER_SUPERVISION_INTERVAL_MS)
- public synchronized void restartNonRunningTasks() {
- this.consumers.keySet().forEach(typeId -> {
- this.consumers.get(typeId).forEach(consumer -> {
- if (!consumer.isRunning()) {
- restartTopic(consumer);
- }
- });
- });
+ public synchronized void restartNonRunningTopics() {
+ for (String typeId : this.consumers.keySet()) {
+ for (KafkaJobDataConsumer consumer : this.consumers.get(typeId)) {
+ restartTopic(consumer);
+ }
+ }
}
private void restartTopic(KafkaJobDataConsumer consumer) {
}
private void restartConsumersOfType(KafkaTopicListener topic, InfoType type) {
- this.consumers.get(type.getId()).forEach((consumer) -> {
- consumer.start(topic.getOutput());
- });
+ this.consumers.get(type.getId()).forEach(consumer -> consumer.start(topic.getOutput().asFlux()));
}
}
import reactor.core.publisher.Mono;
/**
- * Registers the types and this producer in ECS. This is done when needed.
+ * Registers the types and this producer in Innformation Coordinator Service.
+ * This is done when needed.
*/
@Component
@EnableScheduling
private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
@Getter
- private boolean isRegisteredInEcs = false;
+ private boolean isRegisteredInIcs = false;
private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
@Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
public void supervisionTask() {
checkRegistration() //
- .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInEcs) //
+ .filter(isRegistrationOk -> !isRegistrationOk || !this.isRegisteredInIcs) //
.flatMap(isRegisterred -> registerTypesAndProducer()) //
.subscribe( //
null, //
}
private void handleRegistrationCompleted() {
- isRegisteredInEcs = true;
+ isRegisteredInIcs = true;
}
private void handleRegistrationFailure(Throwable t) {
// Returns TRUE if registration is correct
private Mono<Boolean> checkRegistration() {
- final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ final String url = applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return restClient.get(url) //
.flatMap(this::isRegisterredInfoCorrect) //
.onErrorResume(t -> Mono.just(Boolean.FALSE));
private Mono<Boolean> isRegisterredInfoCorrect(String registerredInfoStr) {
ProducerRegistrationInfo registerredInfo = gson.fromJson(registerredInfoStr, ProducerRegistrationInfo.class);
if (isEqual(producerRegistrationInfo(), registerredInfo)) {
- logger.trace("Already registered in ECS");
+ logger.trace("Already registered in ICS");
return Mono.just(Boolean.TRUE);
} else {
return Mono.just(Boolean.FALSE);
}
private String registerTypeUrl(InfoType type) {
- return applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
+ return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
}
private Mono<String> registerTypesAndProducer() {
final int CONCURRENCY = 20;
final String producerUrl =
- applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
+ applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
return Flux.fromIterable(this.types.getAll()) //
.doOnNext(type -> logger.info("Registering type {}", type.getId())) //
import org.oran.dmaapadapter.repository.InfoTypes;
import org.oran.dmaapadapter.repository.Job;
import org.oran.dmaapadapter.repository.Jobs;
+import org.oran.dmaapadapter.tasks.KafkaJobDataConsumer;
+import org.oran.dmaapadapter.tasks.KafkaTopicConsumers;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
import org.springframework.test.context.junit.jupiter.SpringExtension;
import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
private ConsumerController consumerController;
@Autowired
- private EcsSimulatorController ecsSimulatorController;
+ private IcsSimulatorController icsSimulatorController;
+
+ @Autowired
+ KafkaTopicConsumers kafkaTopicConsumers;
private com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return thisProcessUrl();
}
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.ecsSimulatorController.testResults.reset();
+ this.icsSimulatorController.testResults.reset();
this.jobs.clear();
}
}
@Test
- void testWholeChain() throws Exception {
+ void testReceiveAndPostDataFromKafka() {
+ final String JOB_ID = "ID";
+ final String TYPE_ID = "KafkaInformationType";
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+
+ // Create a job
+ Job.Parameters param = new Job.Parameters("", new Job.BufferTimeout(123, 456), 1);
+ String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
+ ConsumerJobInfo kafkaJobInfo =
+ new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
+
+ this.icsSimulatorController.addJob(kafkaJobInfo, JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
+
+ KafkaJobDataConsumer kafkaConsumer = this.kafkaTopicConsumers.getConsumers().get(TYPE_ID, JOB_ID);
+
+ // Handle received data from Kafka, check that it has been posted to the
+ // consumer
+ kafkaConsumer.start(Flux.just("data"));
+
+ ConsumerController.TestResults consumer = this.consumerController.testResults;
+ await().untilAsserted(() -> assertThat(consumer.receivedBodies.size()).isEqualTo(1));
+ assertThat(consumer.receivedBodies.get(0)).isEqualTo("[\"data\"]");
+
+ // Test send an exception
+ kafkaConsumer.start(Flux.error(new NullPointerException()));
+
+ // Test regular restart of stopped
+ kafkaConsumer.stop();
+ this.kafkaTopicConsumers.restartNonRunningTopics();
+ await().untilAsserted(() -> assertThat(kafkaConsumer.isRunning()).isTrue());
+
+ // Delete the job
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
+ await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ }
+
+ @Test
+ void testReceiveAndPostDataFromDmaap() throws Exception {
final String JOB_ID = "ID";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create a job
- this.ecsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(), JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
// Return two messages from DMAAP and verify that these are sent to the owner of
assertThat(jobs).contains(JOB_ID);
// Delete the job
- this.ecsSimulatorController.deleteJob(JOB_ID, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
}
@Test
void testReRegister() throws Exception {
// Wait foir register types and producer
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Clear the registration, should trigger a re-register
- ecsSimulatorController.testResults.reset();
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ icsSimulatorController.testResults.reset();
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Just clear the registerred types, should trigger a re-register
- ecsSimulatorController.testResults.types.clear();
+ icsSimulatorController.testResults.types.clear();
await().untilAsserted(
- () -> assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
- }
-
- @Test
- void testCreateKafkaJob() {
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
-
- final String TYPE_ID = "KafkaInformationType";
-
- Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
- String targetUri = baseUrl() + ConsumerController.CONSUMER_TARGET_URL;
- ConsumerJobInfo jobInfo = new ConsumerJobInfo(TYPE_ID, jsonObject(gson.toJson(param)), "owner", targetUri, "");
-
- // Create a job
- this.ecsSimulatorController.addJob(jobInfo, "JOB_ID", restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(1));
-
- // Delete the job
- this.ecsSimulatorController.deleteJob("JOB_ID", restClient());
- await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
+ () -> assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(2));
}
private void testErrorCode(Mono<?> request, HttpStatus expStatus, String responseContains) {
@RestController("IcsSimulatorController")
@Tag(name = "Information Coordinator Service Simulator (exists only in test)")
-public class EcsSimulatorController {
+public class IcsSimulatorController {
private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private final static Gson gson = new GsonBuilder().create();
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return "https://localhost:8434";
}
}
private String ecsBaseUrl() {
- return applicationConfig.getEcsBaseUrl();
+ return applicationConfig.getIcsBaseUrl();
}
private String jobUrl(String jobId) {
@Test
void testCreateKafkaJob() {
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
final String TYPE_ID = "KafkaInformationType";
Job.Parameters param = new Job.Parameters("filter", new Job.BufferTimeout(123, 456), 1);
@Test
void testWholeChain() throws Exception {
- await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInEcs()).isTrue());
+ await().untilAsserted(() -> assertThat(producerRegstrationTask.isRegisteredInIcs()).isTrue());
createInformationJobInEcs(DMAAP_TYPE_ID, DMAAP_JOB_ID, ".*DmaapResponse.*");
private ConsumerController consumerController;
@Autowired
- private EcsSimulatorController ecsSimulatorController;
+ private IcsSimulatorController icsSimulatorController;
@Autowired
private KafkaTopicConsumers kafkaTopicConsumers;
static class TestApplicationConfig extends ApplicationConfig {
@Override
- public String getEcsBaseUrl() {
+ public String getIcsBaseUrl() {
return thisProcessUrl();
}
@AfterEach
void reset() {
this.consumerController.testResults.reset();
- this.ecsSimulatorController.testResults.reset();
+ this.icsSimulatorController.testResults.reset();
this.jobs.clear();
}
final String JOB_ID2 = "ID2";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs. One buffering and one with a filter
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 20), JOB_ID1,
restClient());
- this.ecsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo("^Message_1$", Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
verifiedReceivedByConsumer("Message_1", "[\"Message_1\", \"Message_2\", \"Message_3\"]");
// Delete the jobs
- this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());
final String JOB_ID2 = "ID2";
// Register producer, Register types
- await().untilAsserted(() -> assertThat(ecsSimulatorController.testResults.registrationInfo).isNotNull());
- assertThat(ecsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
+ await().untilAsserted(() -> assertThat(icsSimulatorController.testResults.registrationInfo).isNotNull());
+ assertThat(icsSimulatorController.testResults.registrationInfo.supportedTypeIds).hasSize(this.types.size());
// Create two jobs.
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ofMillis(400), 1000, 1), JOB_ID1,
restClient());
- this.ecsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
+ this.icsSimulatorController.addJob(consumerJobInfo(null, Duration.ZERO, 0, 1), JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isEqualTo(2));
await().untilAsserted(() -> assertThat(consumer.isRunning()).isFalse());
this.consumerController.testResults.reset();
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
- kafkaTopicConsumers.restartNonRunningTasks();
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient()); // Delete one job
+ kafkaTopicConsumers.restartNonRunningTopics();
Thread.sleep(1000); // Restarting the input seems to take some asynch time
dataToSend = Flux.just(senderRecord("Howdy\""));
verifiedReceivedByConsumer("[\"Howdy\\\"\"]");
// Delete the jobs
- this.ecsSimulatorController.deleteJob(JOB_ID1, restClient());
- this.ecsSimulatorController.deleteJob(JOB_ID2, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID1, restClient());
+ this.icsSimulatorController.deleteJob(JOB_ID2, restClient());
await().untilAsserted(() -> assertThat(this.jobs.size()).isZero());
await().untilAsserted(() -> assertThat(this.kafkaTopicConsumers.getConsumers().keySet()).isEmpty());