import java.io.IOException;
import java.util.Properties;
+import javax.annotation.PostConstruct;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
private boolean alive = false;
+ private final ApplicationConfig applicationConfig;
protected MRConsumer consumer;
private MRConsumerResponse response = null;
+ @Autowired
+ private DmaapMessageHandler dmaapMessageHandler;
@Autowired
public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
- Properties dmaapConsumerConfig = applicationConfig.getDmaapConsumerConfig();
- init(dmaapConsumerConfig);
+ this.applicationConfig = applicationConfig;
}
- @Scheduled(fixedRate = 1000 * 60)
+ @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
@Override
public void run() {
- while (this.alive) {
+ /*
+ * if (!alive) { init(); }
+ */
+ if (this.alive) {
try {
Iterable<String> dmaapMsgs = fetchAllMessages();
+ logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
for (String msg : dmaapMsgs) {
processMsg(msg);
}
return response.getActualMessages();
}
+ @PostConstruct
@Override
- public void init(Properties properties) {
- // Initialize the DMAAP with the properties
- // Do we need to do any validation of properties before calling the factory?
- Properties prop = new Properties();
- prop.setProperty("ServiceName", "localhost:6845/events");
- prop.setProperty("topic", "A1-P");
- prop.setProperty("host", "localhost:6845");
- prop.setProperty("contenttype", "application/json");
- prop.setProperty("username", "admin");
- prop.setProperty("password", "admin");
- prop.setProperty("group", "users");
- prop.setProperty("id", "policy-agent");
- prop.setProperty("TransportType", "HTTPNOAUTH");
- prop.setProperty("timeout", "15000");
- prop.setProperty("limit", "1000");
+ public void init() {
+ Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
+ Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+ // No need to start if there is no configuration.
+ if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
+ || dmaapPublisherProperties.size() == 0) {
+ logger.error("DMaaP properties Failed to Load");
+ return;
+ }
try {
- consumer = MRClientFactory.createConsumer(prop);
+ logger.debug("Creating DMAAP Client");
+ consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
this.alive = true;
} catch (IOException e) {
logger.error("Exception occurred while creating Dmaap Consumer", e);
@Override
public void processMsg(String msg) throws Exception {
- System.out.println("sysout" + msg);
+ logger.debug("Message Reveived from DMAAP : {}", msg);
// Call the concurrent Task executor to handle the incoming request
- // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
- // through REST CLIENT
- // Call the Controller with the extracted payload
+ dmaapMessageHandler.handleDmaapMsg(msg);
}
@Override