return result;
}
- protected Iterable<String> fetchAllMessages() throws ServiceException, IOException {
+ protected Iterable<String> fetchAllMessages() throws ServiceException {
String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl();
AsyncRestClient consumer = getMessageRouterConsumer();
ResponseEntity<String> response = consumer.getForEntity(topicUrl).block();
getDmaapMessageHandler().handleDmaapMsg(msg);
}
- protected DmaapMessageHandler getDmaapMessageHandler() throws IOException {
+ protected DmaapMessageHandler getDmaapMessageHandler() {
if (this.dmaapMessageHandler == null) {
String agentBaseUrl = "http://localhost:" + this.localServerHttpPort;
AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
/**
* The class handles incoming requests from DMAAP.
* <p>
- * That means: invoke a REST call towards this services and to send back a
- * response though DMAAP
+ * That means: invoke a REST call towards this services and to send back a response though DMAAP
*/
public class DmaapMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
private static Gson gson = new GsonBuilder() //
- .create(); //
+ .create(); //
private final AsyncRestClient dmaapClient;
private final AsyncRestClient agentClient;
try {
DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
return this.invokePolicyAgent(dmaapRequestMessage) //
- .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) //
- .flatMap(
- response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
+ .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
+ .flatMap(response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage,
+ response.getStatusCode()));
} catch (Exception e) {
String errorMsg = "Received unparsable message from DMAAP: \"" + msg + "\", reason: " + e.getMessage();
return Mono.error(new ServiceException(errorMsg)); // Cannot make any response
}
}
- private Mono<ResponseEntity<String>> handleAgentCallError(Throwable t, String originalMessage,
- DmaapRequestMessage dmaapRequestMessage) {
- logger.debug("Agent call failed: {}", t.getMessage());
+ private Mono<ResponseEntity<String>> handleAgentCallError(Throwable error,
+ DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Agent call failed: {}", error.getMessage());
HttpStatus status = HttpStatus.INTERNAL_SERVER_ERROR;
- String errorMessage = t.getMessage();
- if (t instanceof WebClientResponseException) {
- WebClientResponseException exception = (WebClientResponseException) t;
+ String errorMessage = error.getMessage();
+ if (error instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) error;
status = exception.getStatusCode();
errorMessage = exception.getResponseBodyAsString();
- } else if (t instanceof ServiceException) {
+ } else if (error instanceof ServiceException) {
status = HttpStatus.BAD_REQUEST;
- errorMessage = prepareBadOperationErrorMessage(t, originalMessage);
- } else if (!(t instanceof WebClientException)) {
- logger.warn("Unexpected exception ", t);
+ errorMessage = error.getMessage();
+ } else if (!(error instanceof WebClientException)) {
+ logger.warn("Unexpected exception ", error);
}
return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
- .flatMap(notUsed -> Mono.empty());
- }
-
- private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
- return t.getMessage();
+ .flatMap(notUsed -> Mono.empty());
}
private Mono<ResponseEntity<String>> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
}
private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
- HttpStatus status) {
+ HttpStatus status) {
return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
- .flatMap(this::sendToDmaap) //
- .onErrorResume(this::handleResponseCallError);
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
}
private Mono<String> sendToDmaap(String body) {
}
private Mono<String> createDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response,
- HttpStatus status) {
+ HttpStatus status) {
DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() //
- .status(status.toString()) //
- .message(response == null ? "" : response) //
- .type("response") //
- .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) //
- .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) //
- .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) //
- .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) //
- .build();
+ .status(status.toString()) //
+ .message(response == null ? "" : response) //
+ .type("response") //
+ .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) //
+ .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) //
+ .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) //
+ .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) //
+ .build();
String str = gson.toJson(dmaapResponseMessage);
return Mono.just(str);
url = "/policies";
String rsp = restClient().get(url).block();
- assertThat(rsp.contains(policyInstanceId)).as("Response contains policy instance ID.").isTrue();
+ assertThat(rsp).contains(policyInstanceId).as("Response contains policy instance ID.");
url = "/policy?id=" + policyInstanceId;
rsp = restClient().get(url).block();
assertThat(info.size()).isEqualTo(1);
PolicyInfo policyInfo = info.get(0);
assertThat(policyInfo.id).isEqualTo("id1");
- assertThat(policyInfo.type).isEqualTo("");
+ assertThat(policyInfo.type).isEmpty();
}
@Test
// GET (all)
url = "/services";
rsp = restClient().get(url).block();
- assertThat(rsp.contains(serviceName)).as("Response contains service name").isTrue();
+ assertThat(rsp).contains(serviceName).as("Response contains service name");
logger.info(rsp);
// Keep alive
when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response);
final ListAppender<ILoggingEvent> logAppender =
- LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
+ LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN);
messageConsumerUnderTest.start().join();
assertThat(logAppender.list.get(0).getFormattedMessage())
- .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
+ .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error");
verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES);
}
setUpMrConfig();
messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock));
- String message =
- "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}";
+ String message = "{\"apiVersion\":\"1.0\"," //
+ + "\"operation\":\"GET\"," //
+ + "\"correlationId\":\"1592341013115594000\"," //
+ + "\"originatorId\":\"849e6c6b420\"," //
+ + "\"payload\":{}," //
+ + "\"requestId\":\"23343221\", " //
+ + "\"target\":\"policy-agent\"," //
+ + "\"timestamp\":\"2020-06-16 20:56:53.115665\"," //
+ + "\"type\":\"request\"," //
+ + "\"url\":\"/rics\"}";
String messages = "[" + message + "]";
doReturn(false, true).when(messageConsumerUnderTest).isStopped();
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(messageHandlerMock).handleDmaapMsg(captor.capture());
String messageAfterJsonParsing = captor.getValue();
- assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue();
+ assertThat(messageAfterJsonParsing).contains("apiVersion");
verifyNoMoreInteractions(messageHandlerMock);
}
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(dmaapClient).post(anyString(), captor.capture());
String actualMessage = captor.getValue();
- assertThat(actualMessage.contains(HttpStatus.BAD_GATEWAY.toString()))
- .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY) //
- .isTrue();
+ assertThat(actualMessage).contains(HttpStatus.BAD_GATEWAY.toString())
+ .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY);
verifyNoMoreInteractions(dmaapClient);
}
ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
verify(dmaapClient).post(anyString(), captor.capture());
String actualMessage = captor.getValue();
- assertThat(actualMessage.contains("Not implemented operation")).isTrue();
- assertThat(actualMessage.contains("BAD_REQUEST")).isTrue();
+ assertThat(actualMessage).contains("Not implemented operation");
+ assertThat(actualMessage).contains("BAD_REQUEST");
}
@Test
package org.o_ran_sc.nonrtric.sdnc_a1.northbound.provider;
-import com.google.common.util.concurrent.CheckedFuture;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.o_ran_sc.nonrtric.sdnc_a1.northbound.restadapter.RestAdapter;
import org.o_ran_sc.nonrtric.sdnc_a1.northbound.restadapter.RestAdapterImpl;
import org.opendaylight.controller.md.sal.binding.api.DataBroker;
import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService;
-import org.opendaylight.controller.md.sal.binding.api.WriteTransaction;
import org.opendaylight.controller.sal.binding.api.RpcProviderRegistry;
import org.opendaylight.yang.gen.v1.org.o_ran_sc.nonrtric.sdnc_a1.northbound.a1.adapter.rev200122.A1ADAPTERAPIService;
import org.opendaylight.yang.gen.v1.org.o_ran_sc.nonrtric.sdnc_a1.northbound.a1.adapter.rev200122.DeleteA1PolicyInput;