New simple kafka consumer microservice 62/13562/2
authorlapentafd <francesco.lapenta@est.tech>
Wed, 25 Sep 2024 09:06:01 +0000 (10:06 +0100)
committerFrancesco Davide Lapenta <francesco.lapenta@est.tech>
Thu, 10 Oct 2024 10:22:13 +0000 (10:22 +0000)
Using Spring kafka listener in a simple reactive way

Issue-ID: NONRTRIC-1009
Change-Id: I5cba8ab6fd83c8c49b3c46403486f5c6b44073c8
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
12 files changed:
sample-services/ics-simple-producer-consumer/kafka-consumer/Dockerfile [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/container-tag.yaml [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/pom.xml [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/KafkaConsumerApplication.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/configuration/KafkaConfiguration.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/CallbacksController.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/KafkaListenerController.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/model/Job.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/service/KafkaConsumer.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/resources/application.yaml [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaConsumerApplicationTests.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaListenerControllerTest.java [new file with mode: 0644]

diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/Dockerfile b/sample-services/ics-simple-producer-consumer/kafka-consumer/Dockerfile
new file mode 100644 (file)
index 0000000..a88aff0
--- /dev/null
@@ -0,0 +1,30 @@
+#==================================================================================
+#   Copyright (C) 2024: OpenInfra Foundation Europe
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+#   platform project (RICP).
+#==================================================================================
+FROM openjdk:17-jdk-slim
+
+EXPOSE 9090
+
+ARG SPRING_KAFKA_SERVER
+ENV SPRING_KAFKA_SERVER=${SPRING_KAFKA_SERVER}
+
+WORKDIR /app
+
+COPY target/kafka-consumer-0.0.1-SNAPSHOT.jar /app/consumer-0.0.1.jar
+
+CMD ["java", "-jar", "consumer-0.0.1.jar"]
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/container-tag.yaml b/sample-services/ics-simple-producer-consumer/kafka-consumer/container-tag.yaml
new file mode 100644 (file)
index 0000000..48290af
--- /dev/null
@@ -0,0 +1,23 @@
+#  ========================LICENSE_START=================================\r
+#  O-RAN-SC\r
+#\r
+#  Copyright (C) 2024: OpenInfra Foundation Europe\r
+#  ========================================================================\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#       http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+#  ============LICENSE_END=================================================\r
+\r
+# The Jenkins job requires a tag to build the Docker image.\r
+# By default this file is in the docker build directory,\r
+# but the location can configured in the JJB template.\r
+---\r
+tag: 0.0.1
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/pom.xml b/sample-services/ics-simple-producer-consumer/kafka-consumer/pom.xml
new file mode 100644 (file)
index 0000000..d62b0e3
--- /dev/null
@@ -0,0 +1,128 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>3.3.2</version>
+        <relativePath/>
+    </parent>
+    <groupId>com.demo</groupId>
+    <artifactId>kafka-consumer</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>kafka-consumer</name>
+    <description>Demo project for Spring Boot and kafka consumer</description>
+
+    <properties>
+        <java.version>17</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>io.fabric8</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+                <version>0.45.0</version>
+                <inherited>false</inherited>
+                <executions>
+                    <execution>
+                        <id>generate-producer-image</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                        <configuration>
+                            <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+                            <images>
+                                <image>
+                                    <name>
+                                        o-ran-sc/nonrtric-simple-icsconsumer:latest
+                                    </name>
+                                    <build>
+                                        <cleanup>try</cleanup>
+                                        <contextDir>${project.basedir}</contextDir>
+                                        <dockerFile>Dockerfile</dockerFile>
+                                        <args>
+                                            <JAR>${project.build.finalName}.jar</JAR>
+                                        </args>
+                                        <tags>
+                                            <tag>latest</tag>
+                                        </tags>
+                                    </build>
+                                </image>
+                            </images>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>push-producer-image</id>
+                        <goals>
+                            <goal>build</goal>
+                            <goal>push</goal>
+                        </goals>
+                        <configuration>
+                            <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+                            <pushRegistry>${env.CONTAINER_PUSH_REGISTRY}</pushRegistry>
+                            <images>
+                                <image>
+                                    <name>
+                                        o-ran-sc/nonrtric-simple-icsconsumer:latest
+                                    </name>
+                                    <build>
+                                        <contextDir>${project.basedir}</contextDir>
+                                        <dockerFile>Dockerfile</dockerFile>
+                                        <args>
+                                            <JAR>${project.build.finalName}.jar</JAR>
+                                        </args>
+                                        <tags>
+                                            <tag>${project.version}</tag>
+                                            <tag>latest</tag>
+                                        </tags>
+                                    </build>
+                                </image>
+                            </images>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/KafkaConsumerApplication.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/KafkaConsumerApplication.java
new file mode 100644 (file)
index 0000000..371a22f
--- /dev/null
@@ -0,0 +1,32 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaConsumerApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(KafkaConsumerApplication.class, args);
+    }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/configuration/KafkaConfiguration.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/configuration/KafkaConfiguration.java
new file mode 100644 (file)
index 0000000..09389ad
--- /dev/null
@@ -0,0 +1,72 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer.configuration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.listener.DefaultErrorHandler;
+import org.springframework.util.backoff.FixedBackOff;
+
+import lombok.Getter;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Getter
+@EnableKafka
+@Configuration
+@EnableConfigurationProperties
+public class KafkaConfiguration {
+
+    @Value("${spring.kafka.server}")
+    private String server;
+
+    @Value("${spring.kafka.group}")
+    private String group;
+
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> config = new HashMap<>();
+
+        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
+        config.put(ConsumerConfig.GROUP_ID_CONFIG, group);
+        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+
+        return new DefaultKafkaConsumerFactory<>(config);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(2000L, 5))); // 2 seconds delay, 5 retry attempts
+        return factory;
+    }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/CallbacksController.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/CallbacksController.java
new file mode 100644 (file)
index 0000000..ec0b2db
--- /dev/null
@@ -0,0 +1,58 @@
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaconsumer.controllers;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.boot.context.properties.EnableConfigurationProperties;\r
+import org.springframework.http.HttpStatus;\r
+import org.springframework.http.ResponseEntity;\r
+import org.springframework.web.bind.annotation.PostMapping;\r
+import org.springframework.web.bind.annotation.RequestBody;\r
+import org.springframework.web.bind.annotation.RequestMapping;\r
+import org.springframework.web.bind.annotation.RestController;\r
+\r
+import reactor.core.publisher.Mono;\r
+\r
+@EnableConfigurationProperties\r
+@RestController\r
+@RequestMapping(produces = "application/json")\r
+public class CallbacksController {\r
+    private final Logger logger = LoggerFactory.getLogger(CallbacksController.class);\r
+\r
+    @Autowired\r
+    KafkaListenerController kafkaListenerController;\r
+\r
+    @PostMapping("/info-type-status")\r
+    public Mono<ResponseEntity<String>> getStatusNotification(@RequestBody String requestBody) {\r
+        logger.info(requestBody);\r
+        if (requestBody.contains("DEREGISTERED")) {\r
+            logger.info("De-register info type");\r
+            return kafkaListenerController.stopKafkaListener()\r
+                .map(message -> new ResponseEntity<>(message, HttpStatus.OK));\r
+        } else {\r
+            logger.info("Register info Job");\r
+            return kafkaListenerController.startKafkaListener()\r
+                .map(message -> new ResponseEntity<>(message, HttpStatus.OK));\r
+        }\r
+    }\r
+\r
+}\r
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/KafkaListenerController.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/controllers/KafkaListenerController.java
new file mode 100644 (file)
index 0000000..1fd0bfe
--- /dev/null
@@ -0,0 +1,71 @@
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaconsumer.controllers;\r
+\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.beans.factory.annotation.Value;\r
+import org.springframework.boot.context.properties.EnableConfigurationProperties;\r
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;\r
+import org.springframework.kafka.listener.MessageListenerContainer;\r
+import org.springframework.web.bind.annotation.GetMapping;\r
+import org.springframework.web.bind.annotation.RestController;\r
+\r
+import reactor.core.publisher.Mono;\r
+\r
+@RestController\r
+@EnableConfigurationProperties\r
+public class KafkaListenerController {\r
+\r
+    @Autowired\r
+    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;\r
+\r
+    @Value("${spring.kafka.id}")\r
+    private String id;\r
+\r
+    @GetMapping("/start")\r
+    public Mono<String> startKafkaListener() {\r
+        return Mono.defer(() -> {\r
+            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);\r
+            if (listenerContainer != null && !listenerContainer.isRunning()) {\r
+                listenerContainer.start();\r
+                return Mono.just("Kafka listener started");\r
+            } else if (listenerContainer != null && listenerContainer.isRunning()) {\r
+                return Mono.just("Kafka listener is already running");\r
+            } else {\r
+                return Mono.just("Kafka listener container not found");\r
+            }\r
+        });\r
+    }\r
+\r
+    @GetMapping("/stop")\r
+    public Mono<String> stopKafkaListener() {\r
+        return Mono.defer(() -> {\r
+            MessageListenerContainer listenerContainer = kafkaListenerEndpointRegistry.getListenerContainer(id);\r
+            if (listenerContainer != null && listenerContainer.isRunning()) {\r
+                listenerContainer.stop();\r
+                return Mono.just("Kafka listener stopped");\r
+            } else if (listenerContainer != null && !listenerContainer.isRunning()) {\r
+                return Mono.just("Kafka listener is not running");\r
+            } else {\r
+                return Mono.just("Kafka listener container not found");\r
+            }\r
+        });\r
+    }\r
+}\r
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/model/Job.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/model/Job.java
new file mode 100644 (file)
index 0000000..d7c19ca
--- /dev/null
@@ -0,0 +1,29 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer.model;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class Job {
+    private Object job;
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/service/KafkaConsumer.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/java/com/demo/kafkaconsumer/service/KafkaConsumer.java
new file mode 100644 (file)
index 0000000..01cef9c
--- /dev/null
@@ -0,0 +1,41 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer.service;
+
+import reactor.core.publisher.Mono;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Service;
+
+@Service
+@EnableConfigurationProperties
+public class KafkaConsumer {
+    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);
+
+    @KafkaListener(id = "${spring.kafka.id}", topics = "${spring.kafka.topic}", autoStartup = "${spring.kafka.autostart}")
+    public Mono<String> listen(@Payload String data) {
+        logger.info("Consumed: " + data);
+        return Mono.just(data);
+    }
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/resources/application.yaml b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/main/resources/application.yaml
new file mode 100644 (file)
index 0000000..34e415c
--- /dev/null
@@ -0,0 +1,30 @@
+#  ========================LICENSE_START=================================\r
+#  O-RAN-SC\r
+#\r
+#  Copyright (C) 2024: OpenInfra Foundation Europe\r
+#  ========================================================================\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#       http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+#  ============LICENSE_END=================================================\r
+server:\r
+  port: 9090\r
+spring:\r
+  application:\r
+    name: consumer\r
+  kafka:\r
+    autostart: false\r
+    server: ${SPRING_KAFKA_SERVER:localhost:9092}\r
+    topic: mytopic\r
+    group: group-1\r
+    id: myListener\r
+    consumer:\r
+      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaConsumerApplicationTests.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaConsumerApplicationTests.java
new file mode 100644 (file)
index 0000000..f4cb07f
--- /dev/null
@@ -0,0 +1,31 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaconsumer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class KafkaConsumerApplicationTests {
+
+    @Test
+    void contextLoads() {
+    }
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaListenerControllerTest.java b/sample-services/ics-simple-producer-consumer/kafka-consumer/src/test/java/com/demo/kafkaconsumer/KafkaListenerControllerTest.java
new file mode 100644 (file)
index 0000000..b60f7cc
--- /dev/null
@@ -0,0 +1,127 @@
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaconsumer;\r
+\r
+import org.junit.jupiter.api.AfterEach;\r
+import org.junit.jupiter.api.BeforeEach;\r
+import org.junit.jupiter.api.Test;\r
+import org.mockito.InjectMocks;\r
+import org.mockito.Mock;\r
+import org.mockito.MockitoAnnotations;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.boot.test.context.SpringBootTest;\r
+import org.springframework.http.HttpStatus;\r
+import org.springframework.http.ResponseEntity;\r
+import org.springframework.kafka.config.KafkaListenerEndpointRegistry;\r
+import org.springframework.kafka.listener.MessageListenerContainer;\r
+\r
+import com.demo.kafkaconsumer.controllers.CallbacksController;\r
+import com.demo.kafkaconsumer.controllers.KafkaListenerController;\r
+\r
+import reactor.core.publisher.Mono;\r
+import reactor.test.StepVerifier;\r
+\r
+import static org.junit.jupiter.api.Assertions.assertEquals;\r
+import static org.mockito.Mockito.*;\r
+\r
+@SpringBootTest\r
+public class KafkaListenerControllerTest {\r
+\r
+    @Mock\r
+    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;\r
+\r
+    @Mock\r
+    private MessageListenerContainer listenerContainer;\r
+\r
+    @InjectMocks\r
+    private KafkaListenerController kafkaListenerController;\r
+\r
+    @Autowired\r
+    private KafkaListenerController realKafkaListenerController;\r
+\r
+    @Autowired\r
+    private CallbacksController callbacksController;\r
+\r
+    private String id = "testid";\r
+\r
+    @BeforeEach\r
+    public void setUp() {\r
+        MockitoAnnotations.openMocks(this);\r
+        when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(listenerContainer);\r
+    }\r
+\r
+    @AfterEach\r
+    public void tearDown() {\r
+        reset(kafkaListenerEndpointRegistry, listenerContainer);\r
+    }\r
+\r
+    @Test\r
+    public void testStartKafkaListener() {\r
+        when(listenerContainer.isRunning()).thenReturn(true);\r
+\r
+        Mono<String> result = realKafkaListenerController.startKafkaListener();\r
+\r
+        StepVerifier.create(result)\r
+                .expectNext("Kafka listener is already running")\r
+                .verifyComplete();\r
+    }\r
+\r
+    @Test\r
+    public void testStartKafkaListenerContainerNotFound() {\r
+        when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(null);\r
+\r
+        Mono<String> result = kafkaListenerController.startKafkaListener();\r
+\r
+        StepVerifier.create(result)\r
+                .expectNext("Kafka listener container not found")\r
+                .verifyComplete();\r
+    }\r
+\r
+    @Test\r
+    public void testStopKafkaListenerContainerNotFound() {\r
+        when(kafkaListenerEndpointRegistry.getListenerContainer(id)).thenReturn(null);\r
+\r
+        Mono<String> result = kafkaListenerController.stopKafkaListener();\r
+\r
+        StepVerifier.create(result)\r
+                .expectNext("Kafka listener container not found")\r
+                .verifyComplete();\r
+    }\r
+\r
+    @Test\r
+    public void testGetStatusNotification_Deregistered() {\r
+        String requestBody = "DEREGISTERED";\r
+\r
+        ResponseEntity<String> response = callbacksController.getStatusNotification(requestBody).block();\r
+\r
+        assertEquals(HttpStatus.OK, response.getStatusCode());\r
+        assertEquals("Kafka listener stopped", response.getBody());\r
+    }\r
+\r
+    @Test\r
+    public void testGetStatusNotification_Registered() {\r
+        String requestBody = "REGISTERED";\r
+\r
+        ResponseEntity<String> response = callbacksController.getStatusNotification(requestBody).block();\r
+\r
+        assertEquals(HttpStatus.OK, response.getStatusCode());\r
+        assertEquals("Kafka listener started", response.getBody());\r
+    }\r
+}\r