2 * ========================LICENSE_START=================================
5 * Copyright (C) 2024: OpenInfra Foundation Europe
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package com.demo.producer;
23 import com.demo.producer.producer.SimpleProducer;
24 import org.apache.kafka.clients.producer.KafkaProducer;
26 import com.demo.producer.messages.KafkaMessageHandler;
27 import org.apache.kafka.clients.producer.ProducerRecord;
28 import org.junit.jupiter.api.AfterEach;
29 import org.junit.jupiter.api.BeforeEach;
30 import org.junit.jupiter.api.Test;
31 import org.mockito.InjectMocks;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.springframework.beans.factory.annotation.Autowired;
36 import static org.mockito.ArgumentMatchers.any;
37 import static org.mockito.Mockito.*;
/**
 * Tests for {@link SimpleProducer}: the test methods drive {@code simpleProducer} and then
 * use Mockito {@code verify} on {@code kafkaProducer} to check the send/close calls made.
 */
39 class SimpleProducerTest {
// Value passed to simpleProducer.setTIME(...) in testRunAlways.
41 private final int wait = 1000;
// Topic name passed to both run(...) and runAlways(...).
42 private final String topicName = "testTopic";
// Producer whose send()/close() invocations are verified.
// NOTE(review): presumably annotated @Mock (annotation not visible here) -- confirm.
45 private KafkaProducer<String, String> kafkaProducer;
// Object under test.
// NOTE(review): presumably annotated @InjectMocks so it receives kafkaProducer -- confirm.
49 private SimpleProducer simpleProducer;
// Handle returned by MockitoAnnotations.openMocks(this).
51 private AutoCloseable closable;
// Initialises the Mockito-annotated fields of this test instance and keeps the returned handle.
55 closable = MockitoAnnotations.openMocks(this);
// NOTE(review): body not visible here; presumably releases `closable` after each test -- confirm.
59 public void close() throws Exception {
// Checks that run(...) with numberOfMessages = 10 results in exactly ten send(...) calls
// on kafkaProducer followed by exactly one close().
64 @SuppressWarnings("unchecked") //sending only Strings
65 void testRun() throws Exception {
66 int numberOfMessages = 10;
// The handler is a plain mock; this test makes no assertions about it.
67 KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
69 simpleProducer.run(topicName, numberOfMessages, callback);
// One send(...) per requested message, each matched against any ProducerRecord.
71 verify(kafkaProducer, times(numberOfMessages)).send(any(ProducerRecord.class));
// The producer must have been closed exactly once.
72 verify(kafkaProducer, times(1)).close();
// Checks that runAlways(...) produces at least one send(...) and exactly one close() on
// kafkaProducer when shutdown() is triggered from inside the stubbed send(...).
76 @SuppressWarnings("unchecked") //sending only Strings
77 void testRunAlways() throws Exception {
78 KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
// Hands the 1000 stored in `wait` to the producer.
// NOTE(review): presumably a delay in milliseconds -- confirm in SimpleProducer.
79 simpleProducer.setTIME(wait);
80 // Stub send(...) so that every invocation calls simpleProducer.shutdown(); presumably this ends the runAlways() loop from its first send
81 doAnswer(invocation -> {
82 simpleProducer.shutdown();
84 }).when(kafkaProducer).send(any(ProducerRecord.class));
86 // Invoking runAlways() in a separate thread so its loop does not block the test thread
87 Thread thread = new Thread(() -> {
89 simpleProducer.runAlways(topicName, callback);
90 } catch (Exception e) {
95 // Let the thread execute for some time (e.g., 1 second)
98 // Interrupting the thread to stop the infinite loop
// send(...) must have happened at least once before the loop stopped.
101 verify(kafkaProducer, atLeastOnce()).send(any(ProducerRecord.class));
// The producer must have been closed exactly once.
102 verify(kafkaProducer, times(1)).close();