New simple kafka producer microservice 07/13507/2
authorlapentafd <francesco.lapenta@est.tech>
Wed, 25 Sep 2024 09:01:59 +0000 (10:01 +0100)
committerFrancesco Davide Lapenta <francesco.lapenta@est.tech>
Thu, 10 Oct 2024 10:20:36 +0000 (10:20 +0000)
Using Spring kafka template in a simple reactive way

Issue-ID: NONRTRIC-1009
Change-Id: I5af9dfeb546531e173785b63c52c52240bbd938a
Signed-off-by: lapentafd <francesco.lapenta@est.tech>
14 files changed:
sample-services/ics-simple-producer-consumer/kafka-producer/.gitignore [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/Dockerfile [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/container-tag.yaml [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/pom.xml [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/KafkaProducerApplication.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/config/KafkaConfiguration.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/CallbacksController.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/KafkaController.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/model/Job.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/service/JobService.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/main/resources/application.yaml [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/CallbacksControllerTest.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerApplicationTests.java [new file with mode: 0644]
sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerControllerTest.java [new file with mode: 0644]

diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/.gitignore b/sample-services/ics-simple-producer-consumer/kafka-producer/.gitignore
new file mode 100644 (file)
index 0000000..549e00a
--- /dev/null
@@ -0,0 +1,33 @@
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/Dockerfile b/sample-services/ics-simple-producer-consumer/kafka-producer/Dockerfile
new file mode 100644 (file)
index 0000000..c36376d
--- /dev/null
@@ -0,0 +1,30 @@
+#==================================================================================
+#   Copyright (C) 2024: OpenInfra Foundation Europe
+#
+#   Licensed under the Apache License, Version 2.0 (the "License");
+#   you may not use this file except in compliance with the License.
+#   You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+#   Unless required by applicable law or agreed to in writing, software
+#   distributed under the License is distributed on an "AS IS" BASIS,
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#   See the License for the specific language governing permissions and
+#   limitations under the License.
+#
+#   This source code is part of the near-RT RIC (RAN Intelligent Controller)
+#   platform project (RICP).
+#==================================================================================
+FROM openjdk:17-jdk-slim
+
+EXPOSE 8080
+
+ARG SPRING_KAFKA_SERVER
+ENV SPRING_KAFKA_SERVER=${SPRING_KAFKA_SERVER}
+
+WORKDIR /app
+
+COPY target/kafka-producer-0.0.1-SNAPSHOT.jar /app/producer-0.0.1.jar
+
+CMD ["java", "-jar", "producer-0.0.1.jar"]
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/container-tag.yaml b/sample-services/ics-simple-producer-consumer/kafka-producer/container-tag.yaml
new file mode 100644 (file)
index 0000000..48290af
--- /dev/null
@@ -0,0 +1,23 @@
+#  ========================LICENSE_START=================================\r
+#  O-RAN-SC\r
+#\r
+#  Copyright (C) 2024: OpenInfra Foundation Europe\r
+#  ========================================================================\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#       http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+#  ============LICENSE_END=================================================\r
+\r
+# The Jenkins job requires a tag to build the Docker image.\r
+# By default this file is in the docker build directory,\r
+# but the location can configured in the JJB template.\r
+---\r
+tag: 0.0.1
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/pom.xml b/sample-services/ics-simple-producer-consumer/kafka-producer/pom.xml
new file mode 100644 (file)
index 0000000..008484e
--- /dev/null
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>3.3.2</version>
+        <relativePath/>
+    </parent>
+    <groupId>com.demo</groupId>
+    <artifactId>kafka-producer</artifactId>
+    <version>0.0.1-SNAPSHOT</version>
+    <name>kafka-producer</name>
+    <description>Demo project for Spring Boot and Kafka producer</description>
+
+    <properties>
+        <java.version>17</java.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-junit-jupiter</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>io.projectreactor</groupId>
+            <artifactId>reactor-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-actuator</artifactId>
+        </dependency>
+    </dependencies>
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+            <plugin>
+                <groupId>io.fabric8</groupId>
+                <artifactId>docker-maven-plugin</artifactId>
+                <version>0.45.0</version>
+                <inherited>false</inherited>
+                <executions>
+                    <execution>
+                        <id>generate-producer-image</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>build</goal>
+                        </goals>
+                        <configuration>
+                            <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+                            <images>
+                                <image>
+                                    <name>
+                                        o-ran-sc/nonrtric-simple-icsproducer:latest
+                                    </name>
+                                    <build>
+                                        <cleanup>try</cleanup>
+                                        <contextDir>${project.basedir}</contextDir>
+                                        <dockerFile>Dockerfile</dockerFile>
+                                        <args>
+                                            <JAR>${project.build.finalName}.jar</JAR>
+                                        </args>
+                                        <tags>
+                                            <tag>latest</tag>
+                                        </tags>
+                                    </build>
+                                </image>
+                            </images>
+                        </configuration>
+                    </execution>
+                    <execution>
+                        <id>push-producer-image</id>
+                        <goals>
+                            <goal>build</goal>
+                            <goal>push</goal>
+                        </goals>
+                        <configuration>
+                            <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+                            <pushRegistry>${env.CONTAINER_PUSH_REGISTRY}</pushRegistry>
+                            <images>
+                                <image>
+                                    <name>
+                                        o-ran-sc/nonrtric-simple-icsproducer:latest
+                                    </name>
+                                    <build>
+                                        <contextDir>${project.basedir}</contextDir>
+                                        <dockerFile>Dockerfile</dockerFile>
+                                        <args>
+                                            <JAR>${project.build.finalName}.jar</JAR>
+                                        </args>
+                                        <tags>
+                                            <tag>${project.version}</tag>
+                                            <tag>latest</tag>
+                                        </tags>
+                                    </build>
+                                </image>
+                            </images>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/KafkaProducerApplication.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/KafkaProducerApplication.java
new file mode 100644 (file)
index 0000000..4de2dc4
--- /dev/null
@@ -0,0 +1,32 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaProducerApplication {
+
+    public static void main(String[] args) {
+        SpringApplication.run(KafkaProducerApplication.class, args);
+    }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/config/KafkaConfiguration.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/config/KafkaConfiguration.java
new file mode 100644 (file)
index 0000000..309bce3
--- /dev/null
@@ -0,0 +1,62 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.config;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableConfigurationProperties
+public class KafkaConfiguration {
+
+    @Value("${spring.kafka.server}")
+    private String server;
+
+    @Bean
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return props;
+    }
+
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        return new DefaultKafkaProducerFactory<>(producerConfigs());
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<String, String>(producerFactory());
+    }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/CallbacksController.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/CallbacksController.java
new file mode 100644 (file)
index 0000000..6495b96
--- /dev/null
@@ -0,0 +1,74 @@
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaproducer.controller;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.boot.context.properties.EnableConfigurationProperties;\r
+import org.springframework.http.HttpEntity;\r
+import org.springframework.http.HttpHeaders;\r
+import org.springframework.http.HttpStatus;\r
+import org.springframework.http.MediaType;\r
+import org.springframework.http.ResponseEntity;\r
+import org.springframework.web.bind.annotation.DeleteMapping;\r
+import org.springframework.web.bind.annotation.GetMapping;\r
+import org.springframework.web.bind.annotation.PostMapping;\r
+import org.springframework.web.bind.annotation.RequestBody;\r
+import org.springframework.web.bind.annotation.RequestMapping;\r
+import org.springframework.web.bind.annotation.RestController;\r
+import org.springframework.web.client.RestTemplate;\r
+\r
+@EnableConfigurationProperties\r
+@RestController\r
+@RequestMapping(produces = "application/json")\r
+public class CallbacksController {\r
+    private final Logger logger = LoggerFactory.getLogger(CallbacksController.class);\r
+\r
+    private final RestTemplate restTemplate = new RestTemplate();\r
+\r
+    @Autowired\r
+    KafkaController kafkaController;\r
+\r
+    @GetMapping("/health-check") // defined in ICS ProducerSupervisionCallbackUrl\r
+    public ResponseEntity<String> getHealthCheck() {\r
+        logger.info("Post Info Type Status");\r
+        return new ResponseEntity<>("Ok", HttpStatus.OK);\r
+    }\r
+\r
+    @PostMapping("/info-job") // defined in ICS JobCallbackUrl\r
+    public ResponseEntity<String> startJob(@RequestBody String requestBody) { // defined in ICS CallbackBody\r
+        logger.info("Start Job");\r
+        kafkaController.postMessageMono(requestBody);\r
+        return new ResponseEntity<>("Ok", HttpStatus.OK);\r
+    }\r
+\r
+    @DeleteMapping("/info-job/{jobID}") // defined in ICS JobCallbackUrl\r
+    public ResponseEntity<String> stopJob() {\r
+        logger.info("Stop Job");\r
+        // Call the shutdown endpoint\r
+        HttpHeaders headers = new HttpHeaders();\r
+        headers.setContentType(MediaType.APPLICATION_JSON);\r
+        HttpEntity<String> entity = new HttpEntity<String>(null, headers);\r
+        String shutdownUrl = "http://localhost:8080" + "/actuator/shutdown";\r
+        return restTemplate.postForEntity(shutdownUrl, entity, String.class);\r
+        //return new ResponseEntity<>("Ok", HttpStatus.OK);\r
+    }\r
+}
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/KafkaController.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/controller/KafkaController.java
new file mode 100644 (file)
index 0000000..723592c
--- /dev/null
@@ -0,0 +1,56 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.controller;
+
+import java.lang.invoke.MethodHandles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import reactor.core.publisher.Mono;
+
+@EnableConfigurationProperties
+@RestController
+@RequestMapping()
+public class KafkaController {
+    private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+    @Value("${spring.kafka.topic}")
+    private String topic;
+
+    @Autowired
+    private KafkaTemplate<String,String> kafkaTemplate;
+
+    @GetMapping("/publish/{name}")
+    public Mono<String> postMessageMono(@PathVariable("name") final String name) {
+        return Mono.fromFuture(kafkaTemplate.send(topic, name))
+                .doOnSuccess(result -> logger.info("Published: " + name))
+                .thenReturn("Message Published Successfully")
+                .doOnError(err -> logger.error("Unable to Publish: " + name));
+    }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/model/Job.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/model/Job.java
new file mode 100644 (file)
index 0000000..d640be0
--- /dev/null
@@ -0,0 +1,29 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.model;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class Job {
+    private Object job;
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/service/JobService.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/java/com/demo/kafkaproducer/service/JobService.java
new file mode 100644 (file)
index 0000000..80f69da
--- /dev/null
@@ -0,0 +1,36 @@
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ *      http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaproducer.service;\r
+\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+\r
+import org.springframework.stereotype.Service;\r
+import com.demo.kafkaproducer.model.Job;\r
+\r
+import lombok.Data;\r
+\r
+@Data\r
+@Service\r
+public class JobService {\r
+\r
+    public List<Job> jobList = new ArrayList<>();\r
+\r
+}\r
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/resources/application.yaml b/sample-services/ics-simple-producer-consumer/kafka-producer/src/main/resources/application.yaml
new file mode 100644 (file)
index 0000000..5bc0e85
--- /dev/null
@@ -0,0 +1,39 @@
+#  ========================LICENSE_START=================================\r
+#  O-RAN-SC\r
+#\r
+#  Copyright (C) 2024: OpenInfra Foundation Europe\r
+#  ========================================================================\r
+#  Licensed under the Apache License, Version 2.0 (the "License");\r
+#  you may not use this file except in compliance with the License.\r
+#  You may obtain a copy of the License at\r
+#\r
+#       http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+#  Unless required by applicable law or agreed to in writing, software\r
+#  distributed under the License is distributed on an "AS IS" BASIS,\r
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+#  See the License for the specific language governing permissions and\r
+#  limitations under the License.\r
+#  ============LICENSE_END=================================================\r
+server:\r
+  port: 8080\r
+spring:\r
+  application:\r
+    name: producer\r
+  kafka:\r
+    server: ${SPRING_KAFKA_SERVER:localhost:9092}\r
+    topic: mytopic\r
+    group: group-1\r
+    producer:\r
+      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer\r
+management:\r
+  endpoints:\r
+    web:\r
+      exposure:\r
+        include: "info, health, shutdown"\r
+  endpoint:\r
+    shutdown:\r
+      enabled: true\r
+endpoints:\r
+  shutdown:\r
+    enabled: true
\ No newline at end of file
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/CallbacksControllerTest.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/CallbacksControllerTest.java
new file mode 100644 (file)
index 0000000..94a6ce9
--- /dev/null
@@ -0,0 +1,92 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+import com.demo.kafkaproducer.controller.CallbacksController;
+import com.demo.kafkaproducer.controller.KafkaController;
+
+import reactor.core.publisher.Mono;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SpringBootTest
+public class CallbacksControllerTest {
+
+    @Mock
+    private RestTemplate restTemplate;
+
+    @Mock
+    private KafkaController kafkaController;
+
+
+    @InjectMocks
+    private CallbacksController callbacksController;
+
+    @BeforeEach
+    public void setup() {
+        MockitoAnnotations.openMocks(this);
+    }
+
+    @Test
+    public void testGetHealthCheck() {
+        ResponseEntity<String> response = callbacksController.getHealthCheck();
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+        assertThat(response.getBody()).isEqualTo("Ok");
+    }
+
+    @Test
+    public void testStartJob() {
+        String requestBody = "testJob";
+        when(kafkaController.postMessageMono(requestBody)).thenReturn(Mono.just("Message Published Successfully"));
+
+        ResponseEntity<String> response = callbacksController.startJob(requestBody);
+
+        assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+        assertThat(response.getBody()).isEqualTo("Ok");
+        verify(kafkaController, times(1)).postMessageMono(requestBody);
+    }
+
+    @Test
+    public void testActuatorStopJob() {
+        String shutdownUrl = "http://localhost:8080/actuator/shutdown";
+        HttpHeaders headers = new HttpHeaders();
+        headers.setContentType(MediaType.APPLICATION_JSON);
+        HttpEntity<String> entity = new HttpEntity<>(null, headers);
+
+        ResponseEntity<String> restTemplateResponse = new ResponseEntity<>("Shutdown successful", HttpStatus.OK);
+        when(restTemplate.postForEntity(shutdownUrl, entity, String.class)).thenReturn(restTemplateResponse);
+
+    }
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerApplicationTests.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerApplicationTests.java
new file mode 100644 (file)
index 0000000..97b2775
--- /dev/null
@@ -0,0 +1,32 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class KafkaProducerApplicationTests {
+
+    @Test
+    void contextLoads() {
+    }
+
+}
diff --git a/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerControllerTest.java b/sample-services/ics-simple-producer-consumer/kafka-producer/src/test/java/com/demo/kafkaproducer/KafkaProducerControllerTest.java
new file mode 100644 (file)
index 0000000..82201a1
--- /dev/null
@@ -0,0 +1,97 @@
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import com.demo.kafkaproducer.controller.KafkaController;
+
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+public class KafkaProducerControllerTest {
+
+    @Mock
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @InjectMocks
+    private KafkaController kafkaController;
+
+    private String topic = "test-topic";
+
+    @BeforeEach
+    public void setup() {
+        MockitoAnnotations.openMocks(this);
+        // Set the value of the topic field (since @Value won't be processed in unit tests)
+        ReflectionTestUtils.setField(kafkaController, "topic", topic);
+    }
+
+    @Test
+    public void testPostMessageMono_Success() {
+        // Arrange
+        String name = "testMessage";
+        CompletableFuture<SendResult<String, String>> future = CompletableFuture.completedFuture(null);
+
+        when(kafkaTemplate.send(topic, name)).thenReturn(future);
+
+        // Act
+        Mono<String> result = kafkaController.postMessageMono(name);
+
+        // Assert
+        StepVerifier.create(result)
+                .expectNext("Message Published Successfully")
+                .verifyComplete();
+
+        verify(kafkaTemplate, times(1)).send(topic, name);
+    }
+
+    @Test
+    public void testPostMessageMono_Failure() {
+        // Arrange
+        String name = "testMessage";
+        CompletableFuture<SendResult<String, String>> future = CompletableFuture.failedFuture(new RuntimeException("Kafka error"));
+
+        when(kafkaTemplate.send(topic, name)).thenReturn(future);
+
+        // Act
+        Mono<String> result = kafkaController.postMessageMono(name);
+
+        // Assert
+        StepVerifier.create(result)
+                .expectError(RuntimeException.class)
+                .verify();
+
+        verify(kafkaTemplate, times(1)).send(topic, name);
+    }
+
+}