ICS sample producer and consumer
[nonrtric.git] / sample-services / ics-producer-consumer / producer / src / test / java / com / demo / producer / SimpleProducerTest.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  *
5  * Copyright (C) 2024: OpenInfra Foundation Europe
6  *
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package com.demo.producer;
22
23 import com.demo.producer.producer.SimpleProducer;
24 import org.apache.kafka.clients.producer.KafkaProducer;
25
26 import com.demo.producer.messages.KafkaMessageHandler;
27 import org.apache.kafka.clients.producer.ProducerRecord;
28 import org.junit.jupiter.api.AfterEach;
29 import org.junit.jupiter.api.BeforeEach;
30 import org.junit.jupiter.api.Test;
31 import org.mockito.InjectMocks;
32 import org.mockito.Mock;
33 import org.mockito.MockitoAnnotations;
34 import org.springframework.beans.factory.annotation.Autowired;
35
36 import static org.mockito.ArgumentMatchers.any;
37 import static org.mockito.Mockito.*;
38
39 class SimpleProducerTest {
40
41     private final int wait = 1000;
42     private final String topicName = "testTopic";
43
44     @Mock
45     private KafkaProducer<String, String> kafkaProducer;
46
47     @InjectMocks
48     @Autowired
49     private SimpleProducer simpleProducer;
50
51     private AutoCloseable closable;
52
53     @BeforeEach
54     void setUp() {
55         closable = MockitoAnnotations.openMocks(this);
56     }
57
58     @AfterEach
59     public void close() throws Exception {
60         closable.close();
61     }
62
63     @Test
64     @SuppressWarnings("unchecked") //sending only Strings
65     void testRun() throws Exception {
66         int numberOfMessages = 10;
67         KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
68
69         simpleProducer.run(topicName, numberOfMessages, callback);
70
71         verify(kafkaProducer, times(numberOfMessages)).send(any(ProducerRecord.class));
72         verify(kafkaProducer, times(1)).close();
73     }
74
75     @Test
76     @SuppressWarnings("unchecked") //sending only Strings
77     void testRunAlways() throws Exception {
78         KafkaMessageHandler callback = mock(KafkaMessageHandler.class);
79         simpleProducer.setTIME(wait);
80         // Mocking behavior to break out of the loop after a few iterations
81         doAnswer(invocation -> {
82             simpleProducer.shutdown();
83             return null;
84         }).when(kafkaProducer).send(any(ProducerRecord.class));
85
86         // Invoking runAlways() in a separate thread to avoid an infinite loop
87         Thread thread = new Thread(() -> {
88             try {
89                 simpleProducer.runAlways(topicName, callback);
90             } catch (Exception e) {
91             }
92         });
93         thread.start();
94
95         // Let the thread execute for some time (e.g., 1 second)
96         Thread.sleep(wait);
97
98         // Interrupting the thread to stop the infinite loop
99         thread.interrupt();
100
101         verify(kafkaProducer, atLeastOnce()).send(any(ProducerRecord.class));
102         verify(kafkaProducer, times(1)).close();
103     }
104
105 }