import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
+/**
+ * The class fetched incoming requests from DMAAP on regular intervals. Each
+ * received request is proceesed by DmaapMessageHandler.
+ */
@Component
public class DmaapMessageConsumer implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
- @SuppressWarnings("squid:S00116") // To avoid warning about DMAAP abbreviation.
- final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
+ private static final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
private final ApplicationConfig applicationConfig;
}
}
} catch (Exception e) {
- logger.warn("{}: cannot fetch because of ", this, e.getMessage(), e);
+ logger.warn("{}: cannot fetch because of {}", this, e.getMessage());
sleep(TIME_BETWEEN_DMAAP_POLLS);
}
}
private Iterable<String> fetchAllMessages() throws ServiceException, IOException {
Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
- MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+ MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties);
MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
if (response == null || !"200".equals(response.getResponseCode())) {
- throw new ServiceException("DMaaP NULL response received");
+ String errorMessage = "DMaaP NULL response received";
+ if (response != null) {
+ errorMessage = "Error respons " + response.getResponseCode() + " " + response.getResponseMessage()
+ + " from DMaaP.";
+ }
+ throw new ServiceException(errorMessage);
} else {
logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
return response.getActualMessages();
private void processMsg(String msg) throws IOException {
logger.debug("Message Reveived from DMAAP : {}", msg);
- createDmaapMessageHandler().handleDmaapMsg(msg);
+ getDmaapMessageHandler().handleDmaapMsg(msg);
}
- private DmaapMessageHandler createDmaapMessageHandler() throws IOException {
+ private DmaapMessageHandler getDmaapMessageHandler() throws IOException {
String agentBaseUrl = "http://localhost:" + this.localServerPort;
- AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
+ AsyncRestClient agentClient = createRestClient(agentBaseUrl);
Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
- MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+ MRBatchingPublisher producer = getMessageRouterPublisher(dmaapPublisherProperties);
- return new DmaapMessageHandler(producer, agentClient);
+ return createDmaapMessageHandler(agentClient, producer);
}
- private boolean sleep(Duration duration) {
+ boolean sleep(Duration duration) {
try {
Thread.sleep(duration.toMillis());
return true;
return false;
}
}
+
+ MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException {
+ return MRClientFactory.createConsumer(dmaapConsumerProperties);
+ }
+
+ DmaapMessageHandler createDmaapMessageHandler(AsyncRestClient agentClient, MRBatchingPublisher producer) {
+ return new DmaapMessageHandler(producer, agentClient);
+ }
+
+ AsyncRestClient createRestClient(String agentBaseUrl) {
+ return new AsyncRestClient(agentBaseUrl);
+ }
+
+ MRBatchingPublisher getMessageRouterPublisher(Properties dmaapPublisherProperties) throws IOException {
+ return MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+ }
}