+ private Mono<ResponseEntity<String>> okResponse() {
+ ResponseEntity<String> entity = new ResponseEntity<>("OK", HttpStatus.OK);
+ return Mono.just(entity);
+ }
+
+ private Mono<ResponseEntity<String>> notOkResponse() {
+ ResponseEntity<String> entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY);
+ return Mono.just(entity);
+ }
+
+ @Test
+ void testMessageParsing() {
+ String message = dmaapInputMessage(Operation.DELETE);
+ logger.info(message);
+ DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
+ assertNotNull(parsedMessage);
+ assertFalse(parsedMessage.payload().isPresent());
+
+ message = dmaapInputMessage(Operation.PUT);
+ logger.info(message);
+ parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
+ assertNotNull(parsedMessage);
+ assertTrue(parsedMessage.payload().isPresent());
+ }
+
+ @Test
+ void unparseableMessage_thenWarning() {
+ final ListAppender<ILoggingEvent> logAppender =
+ LoggingUtils.getLogListAppender(DmaapMessageHandler.class, WARN);
+
+ String msg = "bad message";
+ testedObject.handleDmaapMsg(msg);
+
+ assertThat(logAppender.list.get(0).getFormattedMessage()).startsWith(
+ "handleDmaapMsg failure org.oransc.policyagent.exceptions.ServiceException: Received unparsable "
+ + "message from DMAAP: \"" + msg + "\", reason: ");
+ }
+