--- /dev/null
+#==================================================================================
+# Copyright (C) 2024: OpenInfra Foundation Europe
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#==================================================================================
+FROM openjdk:17-jdk-slim
+
+EXPOSE 9090
+
+ARG SPRING_KAFKA_SERVER
+ENV SPRING_KAFKA_SERVER=${SPRING_KAFKA_SERVER}
+
+WORKDIR /app
+
+COPY target/kafka-consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1.jar
+
+CMD ["java", "-jar", "consumer-0.0.1.jar"]
--- /dev/null
+# ========================LICENSE_START=================================\r
+# O-RAN-SC\r
+#\r
+# Copyright (C) 2024: OpenInfra Foundation Europe\r
+# ========================================================================\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+# ============LICENSE_END=================================================\r
+\r
+# The Jenkins job requires a tag to build the Docker image.\r
+# By default this file is in the docker build directory,\r
+# but the location can configured in the JJB template.\r
+---\r
+tag: 0.0.1
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.3.2</version>
+ <relativePath/>
+ </parent>
+ <groupId>com.demo</groupId>
+ <artifactId>kafka-consumer</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>kafka-consumer</name>
+ <description>Demo project for Spring Boot and kafka consumer</description>
+
+ <properties>
+ <java.version>17</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.45.0</version>
+ <inherited>false</inherited>
+ <executions>
+ <execution>
+ <id>generate-producer-image</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-simple-icsconsumer:latest
+ </name>
+ <build>
+ <cleanup>try</cleanup>
+ <contextDir>${project.basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>latest</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ <execution>
+ <id>push-producer-image</id>
+ <goals>
+ <goal>build</goal>
+ <goal>push</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <pushRegistry>${env.CONTAINER_PUSH_REGISTRY}</pushRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-simple-icsconsumer:latest
+ </name>
+ <build>
+ <contextDir>${project.basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>${project.version}</tag>
+ <tag>latest</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaConsumerApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaConsumerApplication.class, args);
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer.configuration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Getter
+@EnableKafka
+@Configuration
+@EnableConfigurationProperties
+public class KafkaConfiguration {
+
+ @Value("${spring.kafka.server}")
+ private String server;
+
+ @Value("${spring.kafka.group}")
+ private String group;
+
+ @Bean
+ public ConsumerFactory<String, String> consumerFactory() {
+ Map<String, Object> config = new HashMap<>();
+
+ config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
+ config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
+ config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+ return new DefaultKafkaConsumerFactory<>(config);
+ }
+
+ @Bean
+ public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+ ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+ factory.setConsumerFactory(consumerFactory());
+ factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(2000L, 5))); // 2 seconds delay, 5 retry attempts
+ return factory;
+ }
+
+}
--- /dev/null
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaconsumer.controllers;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.boot.context.properties.EnableConfigurationProperties;\r
+import org.springframework.http.HttpStatus;\r
+import org.springframework.http.ResponseEntity;\r
+import org.springframework.web.bind.annotation.PostMapping;\r
+import org.springframework.web.bind.annotation.RequestBody;\r
+import org.springframework.web.bind.annotation.RequestMapping;\r
+import org.springframework.web.bind.annotation.RestController;\r
+\r
+import reactor.core.publisher.Mono;\r
+\r
+@EnableConfigurationProperties\r
+@RestController\r
+@RequestMapping(produces = "application/json")\r
+public class CallbacksController {\r
+ private final Logger logger = LoggerFactory.getLogger(CallbacksController.class);\r
+\r
+ @Autowired\r
+ KafkaListenerController kafkaListenerController;\r
+\r
+ @PostMapping("/info-type-status")\r
+ public Mono<ResponseEntity<String>> getStatusNotification(@RequestBody String requestBody) {\r
+ logger.info(requestBody);\r
+ if (requestBody.contains("DEREGISTERED")) {\r
+ logger.info("De-register info type");\r
+ return kafkaListenerController.stopKafkaListener()\r
+ .map(message -> new ResponseEntity<>(message, HttpStatus.OK));\r
+ } else {\r
+ logger.info("Register info Job");\r
+ return kafkaListenerController.startKafkaListener()\r
+ .map(message -> new ResponseEntity<>(message, HttpStatus.OK));\r
+ }\r
+ }\r
+\r
+}\r
--- /dev/null
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaconsumer.controllers;\r
+\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.beans.factory.annotation.Value;\r
+import org.springframework.boot.context.properties.EnableConfigurationProperties;\r
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;\r
+import org.springframework.kafka.listener.MessageListenerContainer;\r
+import org.springframework.web.bind.annotation.GetMapping;\r
+import org.springframework.web.bind.annotation.RestController;\r
+\r
+import reactor.core.publisher.Mono;\r
+\r
+@RestController\r
+@EnableConfigurationProperties\r
+public class KafkaListenerController {\r
+\r
+ @Autowired\r
+ private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;\r
+\r
+ @Value("${spring.kafka.id}")\r
+ private String id;\r
+\r
+ @GetMapping("/start")\r
+ public Mono<String> startKafkaListener() {\r
+ return Mono.defer(() -> {\r
+ MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);\r
+ if (listenerContainer != null && !listenerContainer.isRunning()) {\r
+ listenerContainer.start();\r
+ return Mono.just("Kafka listener started");\r
+ } else if (listenerContainer != null && listenerContainer.isRunning()) {\r
+ return Mono.just("Kafka listener is already running");\r
+ } else {\r
+ return Mono.just("Kafka listener container not found");\r
+ }\r
+ });\r
+ }\r
+\r
+ @GetMapping("/stop")\r
+ public Mono<String> stopKafkaListener() {\r
+ return Mono.defer(() -> {\r
+ MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);\r
+ if (listenerContainer != null && listenerContainer.isRunning()) {\r
+ listenerContainer.stop();\r
+ return Mono.just("Kafka listener stopped");\r
+ } else if (listenerContainer != null && !listenerContainer.isRunning()) {\r
+ return Mono.just("Kafka listener is not running");\r
+ } else {\r
+ return Mono.just("Kafka listener container not found");\r
+ }\r
+ });\r
+ }\r
+}\r
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer.model;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class Job {
+ private Object job;
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer.service;
+
+import reactor.core.publisher.Mono;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Service;
+
+@Service
+@EnableConfigurationProperties
+public class KafkaConsumer {
+ private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
+
+ @KafkaListener(id = "${spring.kafka.id}", topics = "${spring.kafka.topic}", autoStartup = "${spring.kafka.autostart}")
+ public Mono<String> listen(@Payload String data) {
+ logger.info("Consumed: " + data);
+ return Mono.just(data);
+ }
+}
--- /dev/null
+# ========================LICENSE_START=================================\r
+# O-RAN-SC\r
+#\r
+# Copyright (C) 2024: OpenInfra Foundation Europe\r
+# ========================================================================\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+# ============LICENSE_END=================================================\r
+server:\r
+ port: 9090\r
+spring:\r
+ application:\r
+ name: consumer\r
+ kafka:\r
+ autostart: false\r
+ server: ${SPRING_KAFKA_SERVER:localhost:9092}\r
+ topic: mytopic\r
+ group: group-1\r
+ id: myListener\r
+ consumer:\r
+ value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
\ No newline at end of file
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class KafkaConsumerApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+}
--- /dev/null
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaconsumer;\r
+\r
+import org.junit.jupiter.api.AfterEach;\r
+import org.junit.jupiter.api.BeforeEach;\r
+import org.junit.jupiter.api.Test;\r
+import org.mockito.InjectMocks;\r
+import org.mockito.Mock;\r
+import org.mockito.MockitoAnnotations;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.boot.test.context.SpringBootTest;\r
+import org.springframework.http.HttpStatus;\r
+import org.springframework.http.ResponseEntity;\r
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;\r
+import org.springframework.kafka.listener.MessageListenerContainer;\r
+\r
+import com.demo.kafkaconsumer.controllers.CallbacksController;\r
+import com.demo.kafkaconsumer.controllers.KafkaListenerController;\r
+\r
+import reactor.core.publisher.Mono;\r
+import reactor.test.StepVerifier;\r
+\r
+import static org.junit.jupiter.api.Assertions.assertEquals;\r
+import static org.mockito.Mockito.*;\r
+\r
+@SpringBootTest\r
+public class KafkaListenerControllerTest {\r
+\r
+ @Mock\r
+ private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;\r
+\r
+ @Mock\r
+ private MessageListenerContainer listenerContainer;\r
+\r
+ @InjectMocks\r
+ private KafkaListenerController kafkaListenerController;\r
+\r
+ @Autowired\r
+ private KafkaListenerController realKafkaListenerController;\r
+\r
+ @Autowired\r
+ private CallbacksController callbacksController;\r
+\r
+ private String id = "testid";\r
+\r
+ @BeforeEach\r
+ public void setUp() {\r
+ MockitoAnnotations.openMocks(this);\r
+ when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(listenerContainer);\r
+ }\r
+\r
+ @AfterEach\r
+ public void tearDown() {\r
+ reset(kafkaListenerEndpointRegistry, listenerContainer);\r
+ }\r
+\r
+ @Test\r
+ public void testStartKafkaListener() {\r
+ when(listenerContainer.isRunning()).thenReturn(true);\r
+\r
+ Mono<String> result = realKafkaListenerController.startKafkaListener();\r
+\r
+ StepVerifier.create(result)\r
+ .expectNext("Kafka listener is already running")\r
+ .verifyComplete();\r
+ }\r
+\r
+ @Test\r
+ public void testStartKafkaListenerContainerNotFound() {\r
+ when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(null);\r
+\r
+ Mono<String> result = kafkaListenerController.startKafkaListener();\r
+\r
+ StepVerifier.create(result)\r
+ .expectNext("Kafka listener container not found")\r
+ .verifyComplete();\r
+ }\r
+\r
+ @Test\r
+ public void testStopKafkaListenerContainerNotFound() {\r
+ when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(null);\r
+\r
+ Mono<String> result = kafkaListenerController.stopKafkaListener();\r
+\r
+ StepVerifier.create(result)\r
+ .expectNext("Kafka listener container not found")\r
+ .verifyComplete();\r
+ }\r
+\r
+ @Test\r
+ public void testGetStatusNotification_Deregistered() {\r
+ String requestBody = "DEREGISTERED";\r
+\r
+ ResponseEntity<String> response = callbacksController.getStatusNotification(requestBody).block();\r
+\r
+ assertEquals(HttpStatus.OK, response.getStatusCode());\r
+ assertEquals("Kafka listener stopped", response.getBody());\r
+ }\r
+\r
+ @Test\r
+ public void testGetStatusNotification_Registered() {\r
+ String requestBody = "REGISTERED";\r
+\r
+ ResponseEntity<String> response = callbacksController.getStatusNotification(requestBody).block();\r
+\r
+ assertEquals(HttpStatus.OK, response.getStatusCode());\r
+ assertEquals("Kafka listener started", response.getBody());\r
+ }\r
+}\r