+ public void testMessageParsing() {
+ String message = dmaapInputMessage(Operation.DELETE);
+ System.out.println(message);
+ DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
+ assertTrue(parsedMessage != null);
+ assertFalse(parsedMessage.payload().isPresent());
+
+ message = dmaapInputMessage(Operation.PUT);
+ System.out.println(message);
+ parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
+ assertTrue(parsedMessage != null);
+ assertTrue(parsedMessage.payload().isPresent());
+ }
+
+ @Test
+ public void successfulDelete() throws IOException {
+ doReturn(Mono.just("OK")).when(agentClient).delete(anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ String message = dmaapInputMessage(Operation.DELETE);
+
+ StepVerifier //
+ .create(testedObject.createTask(message)) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient, times(1)).delete(URL);
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient, times(1)).send(anyString());
+ verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void successfulGet() throws IOException {
+ doReturn(Mono.just("OK")).when(agentClient).get(anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient, times(1)).get(URL);
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient, times(1)).send(anyString());
+ verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void successfulPut() throws IOException {
+ doReturn(Mono.just("OK")).when(agentClient).put(anyString(), anyString());
+ doReturn(1).when(dmaapClient).send(anyString());
+ doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
+
+ verify(agentClient, times(1)).put(URL, payloadAsString());
+ verifyNoMoreInteractions(agentClient);
+
+ verify(dmaapClient, times(1)).send(anyString());
+ verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void successfulPost() throws IOException {
+ doReturn(Mono.just("OK")).when(agentClient).post(anyString(), anyString());