import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
+import org.oransc.policyagent.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
return this.invokePolicyAgent(dmaapRequestMessage) //
- .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
+ .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) //
.flatMap(response -> sendDmaapResponse(response, dmaapRequestMessage, HttpStatus.OK));
} catch (Exception e) {
}
}
- private Mono<String> handleAgentCallError(Throwable t, DmaapRequestMessage dmaapRequestMessage) {
+ private Mono<String> handleAgentCallError(Throwable t, String origianalMessage,
+ DmaapRequestMessage dmaapRequestMessage) {
logger.debug("Agent call failed: {}", t.getMessage());
- return sendDmaapResponse(t.toString(), dmaapRequestMessage, HttpStatus.NOT_FOUND) //
- .flatMap(notUsed -> Mono.empty());
+ if (t instanceof ServiceException) {
+ String errorMessage = prepareBadOperationErrorMessage(t, origianalMessage);
+ return sendDmaapResponse(errorMessage, dmaapRequestMessage, HttpStatus.NOT_FOUND) //
+ .flatMap(notUsed -> Mono.empty());
+ } else {
+ return sendDmaapResponse(t.toString(), dmaapRequestMessage, HttpStatus.NOT_FOUND) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+ }
+
+ private String prepareBadOperationErrorMessage(Throwable t, String origianalMessage) {
+ String badOperation = origianalMessage.substring(origianalMessage.indexOf("operation\":\"") + 12,
+ origianalMessage.indexOf(",\"url\":"));
+ String errorMessage = t.getMessage().replace("null", badOperation);
+ return errorMessage;
}
private Mono<String> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ if (operation == null) {
+ return Mono.error(new ServiceException("Not implemented operation: " + operation));
+ }
Mono<String> result = null;
String uri = dmaapRequestMessage.url();
- if (operation == Operation.DELETE) {
- result = agentClient.delete(uri);
- } else if (operation == Operation.GET) {
- result = agentClient.get(uri);
- } else if (operation == Operation.PUT) {
- result = agentClient.put(uri, payload(dmaapRequestMessage));
- } else if (operation == Operation.POST) {
- result = agentClient.post(uri, payload(dmaapRequestMessage));
- } else {
- return Mono.error(new Exception("Not implemented operation: " + operation));
+ switch (operation) {
+ case DELETE:
+ result = agentClient.delete(uri);
+ break;
+ case GET:
+ result = agentClient.get(uri);
+ break;
+ case PUT:
+ result = agentClient.put(uri, payload(dmaapRequestMessage));
+ break;
+ case POST:
+ result = agentClient.post(uri, payload(dmaapRequestMessage));
+ break;
+ default:
+ // Nothing, can never get here.
}
return result;
}
}
private Mono<String> handleResponseCallError(Throwable t) {
- logger.debug("Failed to respond: {}", t.getMessage());
+ logger.debug("Failed to send respons to DMaaP: {}", t.getMessage());
return Mono.empty();
}
package org.oransc.policyagent.dmaap;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonObject;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.onap.dmaap.mr.client.response.MRPublisherResponse;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
import org.oransc.policyagent.repository.ImmutablePolicyType;
import org.oransc.policyagent.repository.PolicyType;
+import org.oransc.policyagent.utils.LoggingUtils;
+import org.springframework.http.HttpStatus;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
assertTrue(parsedMessage.payload().isPresent());
}
+ @Test
+ public void unparseableMessage_thenWarning() {
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
+
+ testedObject.handleDmaapMsg("bad message");
+
+ assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
+ assertThat(logAppender.list.toString().contains("handleDmaapMsg failure ")).isTrue();
+ }
+
@Test
public void successfulDelete() throws IOException {
doReturn(Mono.just("OK")).when(agentClient).delete(anyString());
.expectNext("OK") //
.verifyComplete(); //
- verify(agentClient, times(1)).delete(URL);
+ verify(agentClient).delete(URL);
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient, times(1)).send(anyString());
- verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verify(dmaapClient).send(anyString());
+ verify(dmaapClient).sendBatchWithResponse();
verifyNoMoreInteractions(dmaapClient);
}
.expectNext("OK") //
.verifyComplete(); //
- verify(agentClient, times(1)).get(URL);
+ verify(agentClient).get(URL);
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient, times(1)).send(anyString());
- verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verify(dmaapClient).send(anyString());
+ verify(dmaapClient).sendBatchWithResponse();
verifyNoMoreInteractions(dmaapClient);
}
.expectNext("OK") //
.verifyComplete(); //
- verify(agentClient, times(1)).put(URL, payloadAsString());
+ verify(agentClient).put(URL, payloadAsString());
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient, times(1)).send(anyString());
- verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verify(dmaapClient).send(anyString());
+ verify(dmaapClient).sendBatchWithResponse();
verifyNoMoreInteractions(dmaapClient);
}
.expectNext("OK") //
.verifyComplete(); //
- verify(agentClient, times(1)).post(URL, payloadAsString());
+ verify(agentClient).post(URL, payloadAsString());
verifyNoMoreInteractions(agentClient);
- verify(dmaapClient, times(1)).send(anyString());
- verify(dmaapClient, times(1)).sendBatchWithResponse();
+ verify(dmaapClient).send(anyString());
+ verify(dmaapClient).sendBatchWithResponse();
verifyNoMoreInteractions(dmaapClient);
}
@Test
- public void errorCase() throws IOException {
- doReturn(Mono.error(new Exception("Refused"))).when(agentClient).put(anyString(), any());
+ public void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException {
+ String errorCause = "Refused";
+ doReturn(Mono.error(new Exception(errorCause))).when(agentClient).put(anyString(), any());
doReturn(1).when(dmaapClient).send(anyString());
doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse();
+
StepVerifier //
.create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
.expectSubscription() //
.verifyComplete(); //
- verify(agentClient, times(1)).put(anyString(), anyString());
+ verify(agentClient).put(anyString(), anyString());
verifyNoMoreInteractions(agentClient);
- // Error response
- verify(dmaapClient, times(1)).send(anyString());
- verify(dmaapClient, times(1)).sendBatchWithResponse();
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).send(captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage.contains(HttpStatus.NOT_FOUND + "\",\"message\":\"java.lang.Exception: " + errorCause))
+ .isTrue();
+
+ verify(dmaapClient).sendBatchWithResponse();
+ verifyNoMoreInteractions(dmaapClient);
+ }
+
+ @Test
+ public void unsupportedOperationInMessage_thenNotFoundResponseWithNotImplementedOperation() throws Exception {
+ String message = dmaapInputMessage(Operation.PUT).toString();
+ String badOperation = "BAD";
+ message = message.replace(Operation.PUT.toString(), badOperation);
+
+ testedObject.handleDmaapMsg(message);
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).send(captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage
+ .contains(HttpStatus.NOT_FOUND + "\",\"message\":\"Not implemented operation: " + badOperation)).isTrue();
+
+ verify(dmaapClient).sendBatchWithResponse();
verifyNoMoreInteractions(dmaapClient);
}
+ @Test
+ public void putWithoutPayload_thenNotFoundResponseWithWarning() throws Exception {
+ String message = dmaapInputMessage(Operation.PUT).toString();
+ message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
+
+ final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(DmaapMessageHandler.class);
+
+ testedObject.handleDmaapMsg(message);
+
+ assertThat(logAppender.list.get(0).getLevel()).isEqualTo(Level.WARN);
+ assertThat(logAppender.list.toString().contains("Expected payload in message from DMAAP: ")).isTrue();
+ }
}