--- /dev/null
+HELP.md
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### STS ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### IntelliJ IDEA ###
+.idea
+*.iws
+*.iml
+*.ipr
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
--- /dev/null
+#==================================================================================
+# Copyright (C) 2024: OpenInfra Foundation Europe
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# This source code is part of the near-RT RIC (RAN Intelligent Controller)
+# platform project (RICP).
+#==================================================================================
+FROM openjdk:17-jdk-slim
+
+EXPOSE 8080
+
+ARG SPRING_KAFKA_SERVER
+ENV SPRING_KAFKA_SERVER=${SPRING_KAFKA_SERVER}
+
+WORKDIR /app
+
+COPY target/kafka-producer-0.0.1-SNAPSHOT.jar /app/producer-0.0.1.jar
+
+CMD ["java", "-jar", "producer-0.0.1.jar"]
--- /dev/null
+# ========================LICENSE_START=================================\r
+# O-RAN-SC\r
+#\r
+# Copyright (C) 2024: OpenInfra Foundation Europe\r
+# ========================================================================\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+# ============LICENSE_END=================================================\r
+\r
+# The Jenkins job requires a tag to build the Docker image.\r
+# By default this file is in the docker build directory,\r
+# but the location can configured in the JJB template.\r
+---\r
+tag: 0.0.1
\ No newline at end of file
--- /dev/null
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-parent</artifactId>
+ <version>3.3.2</version>
+ <relativePath/>
+ </parent>
+ <groupId>com.demo</groupId>
+ <artifactId>kafka-producer</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <name>kafka-producer</name>
+ <description>Demo project for Spring Boot and Kafka producer</description>
+
+ <properties>
+ <java.version>17</java.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-web</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-junit-jupiter</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.kafka</groupId>
+ <artifactId>spring-kafka-test</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.projectreactor</groupId>
+ <artifactId>reactor-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-starter-actuator</artifactId>
+ </dependency>
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.springframework.boot</groupId>
+ <artifactId>spring-boot-maven-plugin</artifactId>
+ </plugin>
+ <plugin>
+ <groupId>io.fabric8</groupId>
+ <artifactId>docker-maven-plugin</artifactId>
+ <version>0.45.0</version>
+ <inherited>false</inherited>
+ <executions>
+ <execution>
+ <id>generate-producer-image</id>
+ <phase>package</phase>
+ <goals>
+ <goal>build</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-simple-icsproducer:latest
+ </name>
+ <build>
+ <cleanup>try</cleanup>
+ <contextDir>${project.basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>latest</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ <execution>
+ <id>push-producer-image</id>
+ <goals>
+ <goal>build</goal>
+ <goal>push</goal>
+ </goals>
+ <configuration>
+ <pullRegistry>${env.CONTAINER_PULL_REGISTRY}</pullRegistry>
+ <pushRegistry>${env.CONTAINER_PUSH_REGISTRY}</pushRegistry>
+ <images>
+ <image>
+ <name>
+ o-ran-sc/nonrtric-simple-icsproducer:latest
+ </name>
+ <build>
+ <contextDir>${project.basedir}</contextDir>
+ <dockerFile>Dockerfile</dockerFile>
+ <args>
+ <JAR>${project.build.finalName}.jar</JAR>
+ </args>
+ <tags>
+ <tag>${project.version}</tag>
+ <tag>latest</tag>
+ </tags>
+ </build>
+ </image>
+ </images>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class KafkaProducerApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(KafkaProducerApplication.class, args);
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.config;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+@EnableConfigurationProperties
+public class KafkaConfiguration {
+
+ @Value("${spring.kafka.server}")
+ private String server;
+
+ @Bean
+ public Map<String, Object> producerConfigs() {
+ Map<String, Object> props = new HashMap<>();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, server);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ return props;
+ }
+
+ @Bean
+ public ProducerFactory<String, String> producerFactory() {
+ return new DefaultKafkaProducerFactory<>(producerConfigs());
+ }
+
+ @Bean
+ public KafkaTemplate<String, String> kafkaTemplate() {
+ return new KafkaTemplate<String, String>(producerFactory());
+ }
+
+}
--- /dev/null
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaproducer.controller;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.boot.context.properties.EnableConfigurationProperties;\r
+import org.springframework.http.HttpEntity;\r
+import org.springframework.http.HttpHeaders;\r
+import org.springframework.http.HttpStatus;\r
+import org.springframework.http.MediaType;\r
+import org.springframework.http.ResponseEntity;\r
+import org.springframework.web.bind.annotation.DeleteMapping;\r
+import org.springframework.web.bind.annotation.GetMapping;\r
+import org.springframework.web.bind.annotation.PostMapping;\r
+import org.springframework.web.bind.annotation.RequestBody;\r
+import org.springframework.web.bind.annotation.RequestMapping;\r
+import org.springframework.web.bind.annotation.RestController;\r
+import org.springframework.web.client.RestTemplate;\r
+\r
+@EnableConfigurationProperties\r
+@RestController\r
+@RequestMapping(produces = "application/json")\r
+public class CallbacksController {\r
+ private final Logger logger = LoggerFactory.getLogger(CallbacksController.class);\r
+\r
+ private final RestTemplate restTemplate = new RestTemplate();\r
+\r
+ @Autowired\r
+ KafkaController kafkaController;\r
+\r
+ @GetMapping("/health-check") // defined in ICS ProducerSupervisionCallbackUrl\r
+ public ResponseEntity<String> getHealthCheck() {\r
+ logger.info("Post Info Type Status");\r
+ return new ResponseEntity<>("Ok", HttpStatus.OK);\r
+ }\r
+\r
+ @PostMapping("/info-job") // defined in ICS JobCallbackUrl\r
+ public ResponseEntity<String> startJob(@RequestBody String requestBody) { // defined in ICS CallbackBody\r
+ logger.info("Start Job");\r
+ kafkaController.postMessageMono(requestBody);\r
+ return new ResponseEntity<>("Ok", HttpStatus.OK);\r
+ }\r
+\r
+ @DeleteMapping("/info-job/{jobID}") // defined in ICS JobCallbackUrl\r
+ public ResponseEntity<String> stopJob() {\r
+ logger.info("Stop Job");\r
+ // Call the shutdown endpoint\r
+ HttpHeaders headers = new HttpHeaders();\r
+ headers.setContentType(MediaType.APPLICATION_JSON);\r
+ HttpEntity<String> entity = new HttpEntity<String>(null, headers);\r
+ String shutdownUrl = "http://localhost:8080" + "/actuator/shutdown";\r
+ return restTemplate.postForEntity(shutdownUrl, entity, String.class);\r
+ //return new ResponseEntity<>("Ok", HttpStatus.OK);\r
+ }\r
+}
\ No newline at end of file
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.controller;
+
+import java.lang.invoke.MethodHandles;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import reactor.core.publisher.Mono;
+
+@EnableConfigurationProperties
+@RestController
+@RequestMapping()
+public class KafkaController {
+ private final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
+
+ @Value("${spring.kafka.topic}")
+ private String topic;
+
+ @Autowired
+ private KafkaTemplate<String,String> kafkaTemplate;
+
+ @GetMapping("/publish/{name}")
+ public Mono<String> postMessageMono(@PathVariable("name") final String name) {
+ return Mono.fromFuture(kafkaTemplate.send(topic, name))
+ .doOnSuccess(result -> logger.info("Published: " + name))
+ .thenReturn("Message Published Successfully")
+ .doOnError(err -> logger.error("Unable to Publish: " + name));
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer.model;
+
+import lombok.Data;
+import lombok.ToString;
+
+@Data
+@ToString
+public class Job {
+ private Object job;
+}
--- /dev/null
+/*-\r
+ * ========================LICENSE_START=================================\r
+ * O-RAN-SC\r
+ *\r
+ * Copyright (C) 2024: OpenInfra Foundation Europe\r
+ *\r
+ * Licensed under the Apache License, Version 2.0 (the "License");\r
+ * you may not use this file except in compliance with the License.\r
+ * You may obtain a copy of the License at\r
+ *\r
+ * http://www.apache.org/licenses/LICENSE-2.0\r
+ *\r
+ * Unless required by applicable law or agreed to in writing, software\r
+ * distributed under the License is distributed on an "AS IS" BASIS,\r
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+ * See the License for the specific language governing permissions and\r
+ * limitations under the License.\r
+ * ========================LICENSE_END===================================\r
+ */\r
+package com.demo.kafkaproducer.service;\r
+\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+\r
+import org.springframework.stereotype.Service;\r
+import com.demo.kafkaproducer.model.Job;\r
+\r
+import lombok.Data;\r
+\r
+@Data\r
+@Service\r
+public class JobService {\r
+\r
+ public List<Job> jobList = new ArrayList<>();\r
+\r
+}\r
--- /dev/null
+# ========================LICENSE_START=================================\r
+# O-RAN-SC\r
+#\r
+# Copyright (C) 2024: OpenInfra Foundation Europe\r
+# ========================================================================\r
+# Licensed under the Apache License, Version 2.0 (the "License");\r
+# you may not use this file except in compliance with the License.\r
+# You may obtain a copy of the License at\r
+#\r
+# http://www.apache.org/licenses/LICENSE-2.0\r
+#\r
+# Unless required by applicable law or agreed to in writing, software\r
+# distributed under the License is distributed on an "AS IS" BASIS,\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r
+# See the License for the specific language governing permissions and\r
+# limitations under the License.\r
+# ============LICENSE_END=================================================\r
+server:\r
+ port: 8080\r
+spring:\r
+ application:\r
+ name: producer\r
+ kafka:\r
+ server: ${SPRING_KAFKA_SERVER:localhost:9092}\r
+ topic: mytopic\r
+ group: group-1\r
+ producer:\r
+ value-serializer: org.springframework.kafka.support.serializer.JsonSerializer\r
+management:\r
+ endpoints:\r
+ web:\r
+ exposure:\r
+ include: "info, health, shutdown"\r
+ endpoint:\r
+ shutdown:\r
+ enabled: true\r
+endpoints:\r
+ shutdown:\r
+ enabled: true
\ No newline at end of file
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import static org.mockito.Mockito.*;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.client.RestTemplate;
+
+import com.demo.kafkaproducer.controller.CallbacksController;
+import com.demo.kafkaproducer.controller.KafkaController;
+
+import reactor.core.publisher.Mono;
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SpringBootTest
+public class CallbacksControllerTest {
+
+ @Mock
+ private RestTemplate restTemplate;
+
+ @Mock
+ private KafkaController kafkaController;
+
+
+ @InjectMocks
+ private CallbacksController callbacksController;
+
+ @BeforeEach
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test
+ public void testGetHealthCheck() {
+ ResponseEntity<String> response = callbacksController.getHealthCheck();
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("Ok");
+ }
+
+ @Test
+ public void testStartJob() {
+ String requestBody = "testJob";
+ when(kafkaController.postMessageMono(requestBody)).thenReturn(Mono.just("Message Published Successfully"));
+
+ ResponseEntity<String> response = callbacksController.startJob(requestBody);
+
+ assertThat(response.getStatusCode()).isEqualTo(HttpStatus.OK);
+ assertThat(response.getBody()).isEqualTo("Ok");
+ verify(kafkaController, times(1)).postMessageMono(requestBody);
+ }
+
+ @Test
+ public void testActuatorStopJob() {
+ String shutdownUrl = "http://localhost:8080/actuator/shutdown";
+ HttpHeaders headers = new HttpHeaders();
+ headers.setContentType(MediaType.APPLICATION_JSON);
+ HttpEntity<String> entity = new HttpEntity<>(null, headers);
+
+ ResponseEntity<String> restTemplateResponse = new ResponseEntity<>("Shutdown successful", HttpStatus.OK);
+ when(restTemplate.postForEntity(shutdownUrl, entity, String.class)).thenReturn(restTemplateResponse);
+
+ }
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+class KafkaProducerApplicationTests {
+
+ @Test
+ void contextLoads() {
+ }
+
+}
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ *
+ * Copyright (C) 2024: OpenInfra Foundation Europe
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+package com.demo.kafkaproducer;
+
+import static org.mockito.Mockito.*;
+
+import java.util.concurrent.CompletableFuture;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.test.util.ReflectionTestUtils;
+
+import com.demo.kafkaproducer.controller.KafkaController;
+
+import reactor.core.publisher.Mono;
+import reactor.test.StepVerifier;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+public class KafkaProducerControllerTest {
+
+ @Mock
+ private KafkaTemplate<String, String> kafkaTemplate;
+
+ @InjectMocks
+ private KafkaController kafkaController;
+
+ private String topic = "test-topic";
+
+ @BeforeEach
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ // Set the value of the topic field (since @Value won't be processed in unit tests)
+ ReflectionTestUtils.setField(kafkaController, "topic", topic);
+ }
+
+ @Test
+ public void testPostMessageMono_Success() {
+ // Arrange
+ String name = "testMessage";
+ CompletableFuture<SendResult<String, String>> future = CompletableFuture.completedFuture(null);
+
+ when(kafkaTemplate.send(topic, name)).thenReturn(future);
+
+ // Act
+ Mono<String> result = kafkaController.postMessageMono(name);
+
+ // Assert
+ StepVerifier.create(result)
+ .expectNext("Message Published Successfully")
+ .verifyComplete();
+
+ verify(kafkaTemplate, times(1)).send(topic, name);
+ }
+
+ @Test
+ public void testPostMessageMono_Failure() {
+ // Arrange
+ String name = "testMessage";
+ CompletableFuture<SendResult<String, String>> future = CompletableFuture.failedFuture(new RuntimeException("Kafka error"));
+
+ when(kafkaTemplate.send(topic, name)).thenReturn(future);
+
+ // Act
+ Mono<String> result = kafkaController.postMessageMono(name);
+
+ // Assert
+ StepVerifier.create(result)
+ .expectError(RuntimeException.class)
+ .verify();
+
+ verify(kafkaTemplate, times(1)).send(topic, name);
+ }
+
+}