From: lapentafd Date: Wed, 25 Sep 2024 09:06:01 +0000 (+0100) Subject: New simple kafka consumer microservice X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=51b20fd9d3b27a8ee872bceff30be7cd1066d80e;p=nonrtric.git New simple kafka consumer microservice Using Spring kafka listener in a simple reactive way Issue-ID: NONRTRIC-1009 Change-Id: I5cba8ab6fd83c8c49b3c46403486f5c6b44073c8 Signed-off-by: lapentafd --- diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/Dockerfile b/sample-services/ics-simple-producer-consumer/kafka-consumer/Dockerfile new file mode 100644 index 00000000..a88aff05 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/Dockerfile @@ -0,0 +1,30 @@ +#================================================================================== +# Copyright (C) 2024: OpenInfra Foundation Europe +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# This source code is part of the near-RT RIC (RAN Intelligent Controller) +# platform project (RICP). +#================================================================================== +FROM openjdk:17-jdk-slim + +EXPOSE 9090 + +ARG SPRING_KAFKA_SERVER +ENV SPRING_KAFKA_SERVER=${SPRING_KAFKA_SERVER} + +WORKDIR /app + +COPY target/kafka-consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1.jar + +CMD ["java", "-jar", "consumer-0.0.1.jar"] diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/container-tag.yaml b/sample-services/ics-simple-producer-consumer/kafka-consumer/container-tag.yaml new file mode 100644 index 00000000..48290af7 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/container-tag.yaml @@ -0,0 +1,23 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= + +# The Jenkins job requires a tag to build the Docker image. +# By default this file is in the docker build directory, +# but the location can configured in the JJB template. +--- +tag: 0.0.1 \ No newline at end of file diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/pom.xml b/sample-services/ics-simple-producer-consumer/kafka-consumer/pom.xml new file mode 100644 index 00000000..d62b0e3a --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/pom.xml @@ -0,0 +1,128 @@ + + + 4.0.0 + + org.springframework.boot + spring-boot-starter-parent + 3.3.2 + + + com.demo + kafka-consumer + 0.0.1-SNAPSHOT + kafka-consumer + Demo project for Spring Boot and kafka consumer + + + 17 + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springframework.kafka + spring-kafka + + + io.projectreactor + reactor-core + + + io.projectreactor + reactor-test + test + + + org.springframework.boot + spring-boot-starter-test + test + + + org.projectlombok + lombok + provided + + + org.springframework.kafka + spring-kafka-test + test + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + io.fabric8 + docker-maven-plugin + 0.45.0 + false + + + generate-producer-image + package + + build + + + ${env.CONTAINER_PULL_REGISTRY} + + + + o-ran-sc/nonrtric-simple-icsconsumer:latest + + + try + ${project.basedir} + Dockerfile + + ${project.build.finalName}.jar + + + latest + + + + + + + + push-producer-image + + build + push + + + ${env.CONTAINER_PULL_REGISTRY} + ${env.CONTAINER_PUSH_REGISTRY} + + + + o-ran-sc/nonrtric-simple-icsconsumer:latest + + + ${project.basedir} + Dockerfile + + ${project.build.finalName}.jar + + + ${project.version} + latest + + + + + + + + + + + diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/KafkaConsumerApplication.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/KafkaConsumerApplication.java new file mode 100644 index 00000000..371a22ff --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/KafkaConsumerApplication.java @@ -0,0 +1,32 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class KafkaConsumerApplication { + + public static void main(String[] args) { + SpringApplication.run(KafkaConsumerApplication.class, args); + } + +} diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/configuration/KafkaConfiguration.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/configuration/KafkaConfiguration.java new file mode 100644 index 00000000..09389ad8 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/configuration/KafkaConfiguration.java @@ -0,0 +1,72 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer.configuration; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.kafka.annotation.EnableKafka; +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; +import org.springframework.kafka.core.ConsumerFactory; +import org.springframework.kafka.core.DefaultKafkaConsumerFactory; +import org.springframework.kafka.listener.DefaultErrorHandler; +import org.springframework.util.backoff.FixedBackOff; + +import lombok.Getter; + +import java.util.HashMap; +import java.util.Map; + +@Getter +@EnableKafka +@Configuration +@EnableConfigurationProperties +public class KafkaConfiguration { + + @Value("${spring.kafka.server}") + private String server; + + @Value("${spring.kafka.group}") + private String group; + + @Bean + public ConsumerFactory consumerFactory() { + Map config = new HashMap<>(); + + config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server); + config.put(ConsumerConfig.GROUP_ID_CONFIG, group); + config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + return new DefaultKafkaConsumerFactory<>(config); + } + + @Bean + public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { + ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory<>(); + factory.setConsumerFactory(consumerFactory()); + factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(2000L, 5))); // 2 seconds delay, 5 retry attempts + return factory; + } + +} diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/CallbacksController.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/CallbacksController.java new file mode 100644 index 00000000..ec0b2dbf --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/CallbacksController.java @@ -0,0 +1,58 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer.controllers; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +import reactor.core.publisher.Mono; + +@EnableConfigurationProperties +@RestController +@RequestMapping(produces = "application/json") +public class CallbacksController { + private final Logger logger = LoggerFactory.getLogger(CallbacksController.class); + + @Autowired + KafkaListenerController kafkaListenerController; + + @PostMapping("/info-type-status") + public Mono> getStatusNotification(@RequestBody String requestBody) { + logger.info(requestBody); + if (requestBody.contains("DEREGISTERED")) { + logger.info("De-register info type"); + return kafkaListenerController.stopKafkaListener() + .map(message -> new ResponseEntity<>(message, HttpStatus.OK)); + } else { + logger.info("Register info Job"); + return kafkaListenerController.startKafkaListener() + .map(message -> new ResponseEntity<>(message, HttpStatus.OK)); + } + } + +} diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/KafkaListenerController.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/KafkaListenerController.java new file mode 100644 index 00000000..1fd0bfe1 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/KafkaListenerController.java @@ -0,0 +1,71 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer.controllers; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import reactor.core.publisher.Mono; + +@RestController +@EnableConfigurationProperties +public class KafkaListenerController { + + @Autowired + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Value("${spring.kafka.id}") + private String id; + + @GetMapping("/start") + public Mono startKafkaListener() { + return Mono.defer(() -> { + MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); + if (listenerContainer != null && !listenerContainer.isRunning()) { + listenerContainer.start(); + return Mono.just("Kafka listener started"); + } else if (listenerContainer != null && listenerContainer.isRunning()) { + return Mono.just("Kafka listener is already running"); + } else { + return Mono.just("Kafka listener container not found"); + } + }); + } + + @GetMapping("/stop") + public Mono stopKafkaListener() { + return Mono.defer(() -> { + MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id); + if (listenerContainer != null && listenerContainer.isRunning()) { + listenerContainer.stop(); + return Mono.just("Kafka listener stopped"); + } else if (listenerContainer != null && !listenerContainer.isRunning()) { + return Mono.just("Kafka listener is not running"); + } else { + return Mono.just("Kafka listener container not found"); + } + }); + } +} diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/model/Job.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/model/Job.java new file mode 100644 index 00000000..d7c19cad --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/model/Job.java @@ -0,0 +1,29 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer.model; + +import lombok.Data; +import lombok.ToString; + +@Data +@ToString +public class Job { + private Object job; +} diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/service/KafkaConsumer.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/service/KafkaConsumer.java new file mode 100644 index 00000000..01cef9c7 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/service/KafkaConsumer.java @@ -0,0 +1,41 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer.service; + +import reactor.core.publisher.Mono; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.messaging.handler.annotation.Payload; +import org.springframework.stereotype.Service; + +@Service +@EnableConfigurationProperties +public class KafkaConsumer { + private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); + + @KafkaListener(id = "${spring.kafka.id}", topics = "${spring.kafka.topic}", autoStartup = "${spring.kafka.autostart}") + public Mono listen(@Payload String data) { + logger.info("Consumed: " + data); + return Mono.just(data); + } +} diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/resources/application.yaml b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/resources/application.yaml new file mode 100644 index 00000000..34e415c3 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/resources/application.yaml @@ -0,0 +1,30 @@ +# ========================LICENSE_START================================= +# O-RAN-SC +# +# Copyright (C) 2024: OpenInfra Foundation Europe +# ======================================================================== +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# ============LICENSE_END================================================= +server: + port: 9090 +spring: + application: + name: consumer + kafka: + autostart: false + server: ${SPRING_KAFKA_SERVER:localhost:9092} + topic: mytopic + group: group-1 + id: myListener + consumer: + value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer \ No newline at end of file diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaConsumerApplicationTests.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaConsumerApplicationTests.java new file mode 100644 index 00000000..f4cb07f6 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaConsumerApplicationTests.java @@ -0,0 +1,31 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest +class KafkaConsumerApplicationTests { + + @Test + void contextLoads() { + } +} diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaListenerControllerTest.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaListenerControllerTest.java new file mode 100644 index 00000000..b60f7cc3 --- /dev/null +++ b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaListenerControllerTest.java @@ -0,0 +1,127 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * + * Copyright (C) 2024: OpenInfra Foundation Europe + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ +package com.demo.kafkaconsumer; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import org.springframework.kafka.config.KafkaListenerEndpointRegistry; +import org.springframework.kafka.listener.MessageListenerContainer; + +import com.demo.kafkaconsumer.controllers.CallbacksController; +import com.demo.kafkaconsumer.controllers.KafkaListenerController; + +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.*; + +@SpringBootTest +public class KafkaListenerControllerTest { + + @Mock + private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry; + + @Mock + private MessageListenerContainer listenerContainer; + + @InjectMocks + private KafkaListenerController kafkaListenerController; + + @Autowired + private KafkaListenerController realKafkaListenerController; + + @Autowired + private CallbacksController callbacksController; + + private String id = "testid"; + + @BeforeEach + public void setUp() { + MockitoAnnotations.openMocks(this); + when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(listenerContainer); + } + + @AfterEach + public void tearDown() { + reset(kafkaListenerEndpointRegistry, listenerContainer); + } + + @Test + public void testStartKafkaListener() { + when(listenerContainer.isRunning()).thenReturn(true); + + Mono result = realKafkaListenerController.startKafkaListener(); + + StepVerifier.create(result) + .expectNext("Kafka listener is already running") + .verifyComplete(); + } + + @Test + public void testStartKafkaListenerContainerNotFound() { + when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(null); + + Mono result = kafkaListenerController.startKafkaListener(); + + StepVerifier.create(result) + .expectNext("Kafka listener container not found") + .verifyComplete(); + } + + @Test + public void testStopKafkaListenerContainerNotFound() { + when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(null); + + Mono result = kafkaListenerController.stopKafkaListener(); + + StepVerifier.create(result) + .expectNext("Kafka listener container not found") + .verifyComplete(); + } + + @Test + public void testGetStatusNotification_Deregistered() { + String requestBody = "DEREGISTERED"; + + ResponseEntity response = callbacksController.getStatusNotification(requestBody).block(); + + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals("Kafka listener stopped", response.getBody()); + } + + @Test + public void testGetStatusNotification_Registered() { + String requestBody = "REGISTERED"; + + ResponseEntity response = callbacksController.getStatusNotification(requestBody).block(); + + assertEquals(HttpStatus.OK, response.getStatusCode()); + assertEquals("Kafka listener started", response.getBody()); + } +}