public class DmaapMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
private static Gson gson = new GsonBuilder() //
- .create(); //
+ .create(); //
private final AsyncRestClient dmaapClient;
private final AsyncRestClient agentClient;
try {
DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
return this.invokePolicyAgent(dmaapRequestMessage) //
- .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
- .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage,
- response.getStatusCode()));
+ .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
+ .flatMap(
+ response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
} catch (Exception e) {
String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
}
private Mono<ResponseEntity<String>> handleAgentCallError(Throwable error,
- DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage dmaapRequestMessage) {
logger.debug("Agent call failed: {}", error.getMessage());
HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;
String errorMessage = error.getMessage();
logger.warn("Unexpected exception ", error);
}
return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
- .flatMap(notUsed -> Mono.empty());
+ .flatMap(notUsed -> Mono.empty());
}
private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
}
private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
- HttpStatus status) {
+ HttpStatus status) {
return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
- .flatMap(this::sendToDmaap) //
- .onErrorResume(this::handleResponseCallError);
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
}
private Mono<String> sendToDmaap(String body) {
}
private Mono<String> createDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response,
- HttpStatus status) {
+ HttpStatus status) {
DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() //
- .status(status.toString()) //
- .message(response == null ? "" : response) //
- .type("response") //
- .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) //
- .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) //
- .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) //
- .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) //
- .build();
+ .status(status.toString()) //
+ .message(response == null ? "" : response) //
+ .type("response") //
+ .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) //
+ .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) //
+ .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) //
+ .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) //
+ .build();
String str = gson.toJson(dmaapResponseMessage);
return Mono.just(str);
for (Ric ric : this.rics.getRics()) {
ric.getLock().lockBlocking(LockType.EXCLUSIVE);
ric.getLock().unlockBlocking();
- assertThat(ric.getLock().getLockCounter()).isEqualTo(0);
+ assertThat(ric.getLock().getLockCounter()).isZero();
assertThat(ric.getState()).isEqualTo(Ric.RicState.AVAILABLE);
}
}
// This tests also validation of trusted certs restClient(true)
rsp = restClient(true).get(url).block();
- assertThat(rsp).contains("ric2");
- assertThat(rsp).doesNotContain("ric1");
- assertThat(rsp).contains("AVAILABLE");
+ assertThat(rsp).contains("ric2") //
+ .doesNotContain("ric1") //
+ .contains("AVAILABLE");
// All RICs
rsp = restClient().get("/rics").block();
- assertThat(rsp).contains("ric2");
- assertThat(rsp).contains("ric1");
+ assertThat(rsp).contains("ric2") //
+ .contains("ric1");
// Non existing policy type
url = "/rics?policyType=XXXX";
assertThat(ricPolicy.json()).isEqualTo(policy.json());
// Both types should be in the agent storage after the synch
- assertThat(ric1.getSupportedPolicyTypes().size()).isEqualTo(2);
- assertThat(ric2.getSupportedPolicyTypes().size()).isEqualTo(2);
+ assertThat(ric1.getSupportedPolicyTypes()).hasSize(2);
+ assertThat(ric2.getSupportedPolicyTypes()).hasSize(2);
}
@Test
assertThat(policy.id()).isEqualTo(policyInstanceId);
assertThat(policy.ownerServiceName()).isEqualTo(serviceName);
assertThat(policy.ric().name()).isEqualTo("ric1");
- assertThat(policy.isTransient()).isEqualTo(true);
+ assertThat(policy.isTransient()).isTrue();
// Put a non transient policy
url = putPolicyUrl(serviceName, ricName, policyTypeName, policyInstanceId);
restClient().put(url, policyBody).block();
policy = policies.getPolicy(policyInstanceId);
- assertThat(policy.isTransient()).isEqualTo(false);
+ assertThat(policy.isTransient()).isFalse();
url = "/policies";
String rsp = restClient().get(url).block();
@Test
/**
- * Test that HttpStatus and body from failing REST call to A1 is passed on to
- * the caller.
+ * Test that HttpStatus and body from failing REST call to A1 is passed on to the caller.
*
* @throws ServiceException
*/
String rsp = restClient().get("/policies").block();
List<PolicyInfo> info = parseList(rsp, PolicyInfo.class);
- assertThat(info.size()).isEqualTo(1);
+ assertThat(info).hasSize(1);
PolicyInfo policyInfo = info.get(0);
assertThat(policyInfo.id).isEqualTo("id1");
assertThat(policyInfo.type).isEmpty();
ResponseEntity<String> entity = restClient().deleteForEntity(url).block();
assertThat(entity.getStatusCode()).isEqualTo(HttpStatus.NO_CONTENT);
- assertThat(policies.size()).isEqualTo(0);
+ assertThat(policies.size()).isZero();
// Delete a non existing policy
testErrorCode(restClient().get(url), HttpStatus.NOT_FOUND);
String url = "/policy_schemas";
String rsp = this.restClient().get(url).block();
- assertThat(rsp).contains("type1");
- assertThat(rsp).contains("[{\"title\":\"type2\"}");
+ assertThat(rsp).contains("type1") //
+ .contains("[{\"title\":\"type2\"}");
List<String> info = parseSchemas(rsp);
- assertThat(info.size()).isEqualTo(2);
+ assertThat(info).hasSize(2);
url = "/policy_schemas?ric=ric1";
rsp = restClient().get(url).block();
assertThat(rsp).contains("type1");
info = parseSchemas(rsp);
- assertThat(info.size()).isEqualTo(1);
+ assertThat(info).hasSize(1);
// Get schema for non existing RIC
url = "/policy_schemas?ric=ric1XXX";
String url = "/policy_schema?id=type1";
String rsp = restClient().get(url).block();
logger.info(rsp);
- assertThat(rsp).contains("type1");
- assertThat(rsp).contains("title");
+ assertThat(rsp).contains("type1") //
+ .contains("title");
// Get non existing schema
url = "/policy_schema?id=type1XX";
String rsp = restClient().get(url).block();
logger.info(rsp);
List<PolicyInfo> info = parseList(rsp, PolicyInfo.class);
- assertThat(info).size().isEqualTo(1);
+ assertThat(info).hasSize(1);
PolicyInfo policyInfo = info.get(0);
assert (policyInfo.validate());
assertThat(policyInfo.id).isEqualTo("id1");
String url = "/policies?type=type1";
String rsp = restClient().get(url).block();
logger.info(rsp);
- assertThat(rsp).contains("id1");
- assertThat(rsp).contains("id2");
- assertThat(rsp.contains("id3")).isFalse();
+ assertThat(rsp).contains("id1") //
+ .contains("id2") //
+ .doesNotContain("id3");
url = "/policies?type=type1&service=service2";
rsp = restClient().get(url).block();
logger.info(rsp);
- assertThat(rsp.contains("id1")).isFalse();
- assertThat(rsp).contains("id2");
- assertThat(rsp.contains("id3")).isFalse();
+ assertThat(rsp).doesNotContain("id1") //
+ .contains("id2") //
+ .doesNotContain("id3");
// Test get policies for non existing type
url = "/policies?type=type1XXX";
String url = "/policy_ids?type=type1";
String rsp = restClient().get(url).block();
logger.info(rsp);
- assertThat(rsp).contains("id1");
- assertThat(rsp).contains("id2");
- assertThat(rsp.contains("id3")).isFalse();
+ assertThat(rsp).contains("id1") //
+ .contains("id2") //
+ .doesNotContain("id3");
url = "/policy_ids?type=type1&service=service1&ric=ric1";
rsp = restClient().get(url).block();
String url = "/services?name=name";
String rsp = restClient().get(url).block();
List<ServiceStatus> info = parseList(rsp, ServiceStatus.class);
- assertThat(info.size()).isEqualTo(1);
+ assertThat(info).hasSize(1);
ServiceStatus status = info.iterator().next();
assertThat(status.keepAliveIntervalSeconds).isEqualTo(0);
assertThat(status.serviceName).isEqualTo(serviceName);
when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
+ LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
messageConsumerUnderTest.start().join();
assertThat(logAppender.list.get(0).getFormattedMessage())
- .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
+ .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
}
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
String message = "{\"apiVersion\":\"1.0\"," //
- + "\"operation\":\"GET\"," //
- + "\"correlationId\":\"1592341013115594000\"," //
- + "\"originatorId\":\"849e6c6b420\"," //
- + "\"payload\":{}," //
- + "\"requestId\":\"23343221\", " //
- + "\"target\":\"policy-agent\"," //
- + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
- + "\"type\":\"request\"," //
- + "\"url\":\"/rics\"}";
+ + "\"operation\":\"GET\"," //
+ + "\"correlationId\":\"1592341013115594000\"," //
+ + "\"originatorId\":\"849e6c6b420\"," //
+ + "\"payload\":{}," //
+ + "\"requestId\":\"23343221\", " //
+ + "\"target\":\"policy-agent\"," //
+ + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
+ + "\"type\":\"request\"," //
+ + "\"url\":\"/rics\"}";
String messages = "[" + message + "]";
doReturn(false, true).when(messageConsumerUnderTest).isStopped();
import com.google.gson.JsonObject;
import java.io.IOException;
+import java.nio.charset.Charset;
import java.util.Optional;
import org.junit.jupiter.api.BeforeEach;
import org.oransc.policyagent.utils.LoggingUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;
private final AsyncRestClient agentClient = mock(AsyncRestClient.class);
private DmaapMessageHandler testedObject;
private static Gson gson = new GsonBuilder() //
- .create(); //
+ .create(); //
@BeforeEach
private void setUp() throws Exception {
DmaapRequestMessage dmaapRequestMessage(Operation operation) {
Optional<JsonObject> payload =
- ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
- : Optional.empty());
+ ((operation == Operation.PUT || operation == Operation.POST) ? Optional.of(payloadAsJson())
+ : Optional.empty());
return ImmutableDmaapRequestMessage.builder() //
- .apiVersion("apiVersion") //
- .correlationId("correlationId") //
- .operation(operation) //
- .originatorId("originatorId") //
- .payload(payload) //
- .requestId("requestId") //
- .target("target") //
- .timestamp("timestamp") //
- .url(URL) //
- .build();
+ .apiVersion("apiVersion") //
+ .correlationId("correlationId") //
+ .operation(operation) //
+ .originatorId("originatorId") //
+ .payload(payload) //
+ .requestId("requestId") //
+ .target("target") //
+ .timestamp("timestamp") //
+ .url(URL) //
+ .build();
}
private String dmaapInputMessage(Operation operation) {
@Test
void unparseableMessage_thenWarning() {
final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
+ LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
String msg = "bad message";
testedObject.handleDmaapMsg(msg);
assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
- "handleDmaapMsg failure org.oransc.policyagent.exceptions.ServiceException: Received unparsable "
- + "message from DMAAP: \"" + msg + "\", reason: ");
+ "handleDmaapMsg failure org.oransc.policyagent.exceptions.ServiceException: Received unparsable "
+ + "message from DMAAP: \"" + msg + "\", reason: ");
}
@Test
String message = dmaapInputMessage(Operation.DELETE);
StepVerifier //
- .create(testedObject.createTask(message)) //
- .expectSubscription() //
- .expectNext("OK") //
- .verifyComplete(); //
+ .create(testedObject.createTask(message)) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
verify(agentClient).deleteForEntity(URL);
verifyNoMoreInteractions(agentClient);
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
- .expectSubscription() //
- .expectNext("OK") //
- .verifyComplete(); //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
verify(agentClient).getForEntity(URL);
verifyNoMoreInteractions(agentClient);
verifyNoMoreInteractions(dmaapClient);
}
+ @Test
+ void exceptionFromAgentWhenGet_thenPostError() throws IOException {
+ String errorBody = "Unavailable";
+ WebClientResponseException webClientResponseException = new WebClientResponseException(
+ HttpStatus.SERVICE_UNAVAILABLE.value(), "", (HttpHeaders) null, errorBody.getBytes(), (Charset) null);
+ doReturn(Mono.error(webClientResponseException)).when(agentClient).getForEntity(anyString());
+ doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
+
+ StepVerifier //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) //
+ .expectSubscription() //
+ .verifyComplete(); //
+
+ ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
+ verify(dmaapClient).post(anyString(), captor.capture());
+ String actualMessage = captor.getValue();
+ assertThat(actualMessage).contains(HttpStatus.SERVICE_UNAVAILABLE.toString()) //
+ .contains(errorBody);
+ }
+
@Test
void successfulPut() throws IOException {
doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString());
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
- .expectSubscription() //
- .expectNext("OK") //
- .verifyComplete(); //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
verify(agentClient).putForEntity(URL, payloadAsString());
verifyNoMoreInteractions(agentClient);
doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString());
StepVerifier //
- .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
- .expectSubscription() //
- .expectNext("OK") //
- .verifyComplete(); //
+ .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) //
+ .expectSubscription() //
+ .expectNext("OK") //
+ .verifyComplete(); //
verify(agentClient).postForEntity(URL, payloadAsString());
verifyNoMoreInteractions(agentClient);
verify(dmaapClient).post(anyString(), captor.capture());
String actualMessage = captor.getValue();
assertThat(actualMessage).as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY)
- .contains(HttpStatus.BAD_GATEWAY.toString());
+ .contains(HttpStatus.BAD_GATEWAY.toString());
verifyNoMoreInteractions(dmaapClient);
}
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(dmaapClient).post(anyString(), captor.capture());
String actualMessage = captor.getValue();
- assertThat(actualMessage).contains("Not implemented operation");
- assertThat(actualMessage).contains("BAD_REQUEST");
+ assertThat(actualMessage).contains("Not implemented operation") //
+ .contains("BAD_REQUEST");
}
@Test
message = message.replace(",\"payload\":{\"name\":\"name\",\"schema\":\"schema\"}", "");
final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
+ LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
testedObject.handleDmaapMsg(message);
assertThat(logAppender.list.get(0).getFormattedMessage())
- .startsWith("Expected payload in message from DMAAP: ");
+ .startsWith("Expected payload in message from DMAAP: ");
}
}
lock.lockBlocking(LockType.SHARED);
lock.unlockBlocking();
- assertThat(lock.getLockCounter()).isEqualTo(0);
+ assertThat(lock.getLockCounter()).isZero();
}
@Test
.expectNext(lock) //
.verifyComplete();
- assertThat(lock.getLockCounter()).isEqualTo(0);
+ assertThat(lock.getLockCounter()).isZero();
}
// Then
verify(refreshTaskUnderTest).loadConfigurationFromFile();
- assertThat(appConfig.getRicConfigs().size()).isEqualTo(0);
+ assertThat(appConfig.getRicConfigs()).hasSize(0);
await().until(() -> logAppender.list.size() > 0);
assertThat(logAppender.list.get(0).getFormattedMessage())
.thenCancel() //
.verify();
- assertThat(appConfig.getRicConfigs().size()).isEqualTo(2);
+ assertThat(appConfig.getRicConfigs()).hasSize(2);
assertThat(appConfig.getRic(RIC_1_NAME).baseUrl()).isEqualTo(newBaseUrl);
String ric2Name = "ric2";
assertThat(appConfig.getRic(ric2Name)).isNotNull();
assertThat(rics.get(RIC_1_NAME).getConfig().baseUrl()).isEqualTo(newBaseUrl);
assertThat(rics.get(ric2Name)).isNotNull();
- assertThat(policies.size()).isEqualTo(0);
+ assertThat(policies.size()).isZero();
}
@Test
for (Ric ric : this.rics.getRics()) {
ric.getLock().lockBlocking(LockType.EXCLUSIVE);
ric.getLock().unlockBlocking();
- assertThat(ric.getLock().getLockCounter()).isEqualTo(0);
+ assertThat(ric.getLock().getLockCounter()).isZero();
}
}
assertThat(policyTypes.size()).isEqualTo(1);
assertThat(policies.size()).isEqualTo(1);
assertThat(RIC_1.getState()).isEqualTo(RicState.SYNCHRONIZING);
- assertThat(RIC_1.getSupportedPolicyTypeNames().size()).isEqualTo(1);
+ assertThat(RIC_1.getSupportedPolicyTypeNames()).hasSize(1);
}
@Test
verifyNoMoreInteractions(restClientMock);
assertThat(policyTypes.size()).isEqualTo(1);
- assertThat(policies.size()).isEqualTo(0);
+ assertThat(policies.size()).isZero();
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
assertThat(policyTypes.size()).isEqualTo(1);
assertThat(policyTypes.getType(POLICY_TYPE_1_NAME).schema()).isEqualTo(typeSchema);
- assertThat(policies.size()).isEqualTo(0);
+ assertThat(policies.size()).isZero();
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
verify(a1ClientMock).putPolicy(POLICY_1);
verifyNoMoreInteractions(a1ClientMock);
- assertThat(policyTypes.size()).isEqualTo(0);
+ assertThat(policyTypes.size()).isZero();
assertThat(policies.size()).isEqualTo(1); // The transient policy shall be deleted
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
verify(a1ClientMock, times(2)).deleteAllPolicies();
verifyNoMoreInteractions(a1ClientMock);
- assertThat(policyTypes.size()).isEqualTo(0);
- assertThat(policies.size()).isEqualTo(0);
+ assertThat(policyTypes.size()).isZero();
+ assertThat(policies.size()).isZero();
assertThat(RIC_1.getState()).isEqualTo(RicState.AVAILABLE);
}
verify(a1ClientMock, times(2)).deleteAllPolicies();
verifyNoMoreInteractions(a1ClientMock);
- assertThat(policyTypes.size()).isEqualTo(0);
- assertThat(policies.size()).isEqualTo(0);
+ assertThat(policyTypes.size()).isZero();
+ assertThat(policies.size()).isZero();
assertThat(RIC_1.getState()).isEqualTo(RicState.UNAVAILABLE);
}
serviceSupervisionUnderTest.checkAllServices().blockLast();
- assertThat(policies.size()).isEqualTo(0);
- assertThat(services.size()).isEqualTo(0);
+ assertThat(policies.size()).isZero();
+ assertThat(services.size()).isZero();
verify(a1ClientMock).deletePolicy(policy);
verifyNoMoreInteractions(a1ClientMock);
serviceSupervisionUnderTest.checkAllServices().blockLast();
- assertThat(policies.size()).isEqualTo(0);
- assertThat(services.size()).isEqualTo(0);
+ assertThat(policies.size()).isZero();
+ assertThat(services.size()).isZero();
ILoggingEvent loggingEvent = logAppender.list.get(0);
assertThat(loggingEvent.getLevel()).isEqualTo(WARN);