return get(obj, memberName).getAsJsonArray();
}
- private Properties parseDmaapConfig(JsonObject consumerCfg) throws ServiceException {
- Set<Entry<String, JsonElement>> topics = consumerCfg.entrySet();
- if (topics.size() != 1) {
- throw new ServiceException("Invalid configuration, number of topic must be one, config: " + topics);
+ private Properties parseDmaapConfig(JsonObject streamCfg) throws ServiceException {
+ Set<Entry<String, JsonElement>> streamConfigEntries = streamCfg.entrySet();
+ if (streamConfigEntries.size() != 1) {
+ throw new ServiceException(
+ "Invalid configuration. Number of streams must be one, config: " + streamConfigEntries);
}
- JsonObject topic = topics.iterator().next().getValue().getAsJsonObject();
- JsonObject dmaapInfo = get(topic, "dmaap_info").getAsJsonObject();
+ JsonObject streamConfigEntry = streamConfigEntries.iterator().next().getValue().getAsJsonObject();
+ JsonObject dmaapInfo = get(streamConfigEntry, "dmaap_info").getAsJsonObject();
String topicUrl = getAsString(dmaapInfo, "topic_url");
Properties dmaapProps = new Properties();
return response.getActualMessages();
}
- // Properties are not loaded in first atempt. Need to fix this and then uncomment the post construct annotation
- // @PostConstruct
@Override
public void init() {
Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
}
try {
logger.debug("Creating DMAAP Client");
- System.out.println("dmaapConsumerProperties--->"+dmaapConsumerProperties.getProperty("topic"));
- System.out.println("dmaapPublisherProperties--->"+dmaapPublisherProperties.getProperty("topic"));
+ logger.debug("dmaapConsumerProperties---> {}", dmaapConsumerProperties.getProperty("topic"));
+ logger.debug("dmaapPublisherProperties---> {}", dmaapPublisherProperties.getProperty("topic"));
consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
this.alive = true;
} catch (IOException e) {
@Autowired
private PolicyController policyController;
private AsyncRestClient restClient;
+ @Autowired
private ApplicationConfig applicationConfig;
private String topic = "";
- @Autowired
- public DmaapMessageHandler(ApplicationConfig applicationConfig) {
- this.applicationConfig = applicationConfig;
- }
-
// The publish properties is corrupted. It contains the subscribe property values.
@Async("threadPoolTaskExecutor")
public void handleDmaapMsg(String msg) {
+ logger.debug("Message ---------->{}", msg);
init();
DmaapRequestMessage dmaapRequestMessage = null;
Optional<String> dmaapResponse = null;
* "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
* "payload": "{\"ricName\":\"ric1\"}" }
*
- * --------------------------------------------------------------------------------------------------------------
+ * -------------------------------------------------------------------------------------------------------------
* Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
* timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
* -------------------------------------------------------------------------------------------------------------
return Optional.empty();
}
- // @PostConstruct
- // The application properties value is always NULL for the first time
- // Need to fix this
public void init() {
logger.debug("Reading DMAAP Publisher bus details from Application Config");
Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
String host = (String) dmaapPublisherConfig.get("ServiceName");
topic = dmaapPublisherConfig.getProperty("topic");
- System.out.println("\"Read the topic ---------->" + topic);
logger.debug("Read the topic & Service Name - {} , {}", host, topic);
this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
@Autowired
private Services services;
- // Only for unittesting
+ // Only for unit testing
StartupService(ApplicationConfig appConfig, RefreshConfigTask refreshTask, Rics rics, PolicyTypes policyTypes,
A1ClientFactory a1ClientFactory, Policies policies, Services services) {
this.applicationConfig = appConfig;