+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient, times(1)).get(URL);
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient, times(1)).send(anyString());
+ verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void successfulPut() throws IOException {
+ doReturn(Mono.just("OK")).when(agentClient).put(anyString(), anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient, times(1)).put(URL, payloadAsString());
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient, times(1)).send(anyString());
+ verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void successfulPost() throws IOException {
+ doReturn(Mono.just("OK")).when(agentClient).post(anyString(), anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient, times(1)).post(URL, payloadAsString());
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient, times(1)).send(anyString());
+ verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void errorCase() throws IOException {
+ doReturn(Mono.error(new Exception("Refused"))).when(agentClient).put(anyString(), any());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();