import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
-
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
import java.util.Vector;
-
import javax.validation.constraints.NotNull;
-
import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants;
import org.oransc.policyagent.exceptions.ServiceException;
import org.springframework.http.MediaType;
String urlPath = url.getPath();
DmaapUrlPath path = parseDmaapUrlPath(urlPath);
- dmaapProps.put("ServiceName", url.getHost());
+ dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events");
dmaapProps.put("topic", path.dmaapTopicName);
- dmaapProps.put("host", url.getHost());
+ dmaapProps.put("host", url.getHost()+":"+url.getPort());
dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString());
dmaapProps.put("userName", userName);
dmaapProps.put("password", passwd);
dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString());
dmaapProps.put("timeout", 15000);
dmaapProps.put("limit", 1000);
- dmaapProps.put("port", url.getPort());
} catch (MalformedURLException e) {
throw new ServiceException("Could not parse the URL", e);
}
throw new ServiceException("The path has incorrect syntax: " + urlPath);
}
- final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P
+ final String dmaapTopicName = tokens[2]; // /events/A1-P
String consumerGroup = ""; // users
String consumerId = ""; // sdnc1
if (tokens.length == 5) {
import java.io.IOException;
import java.util.Properties;
-
+import javax.annotation.PostConstruct;
import org.onap.dmaap.mr.client.MRClientFactory;
import org.onap.dmaap.mr.client.MRConsumer;
import org.onap.dmaap.mr.client.response.MRConsumerResponse;
private final ApplicationConfig applicationConfig;
protected MRConsumer consumer;
private MRConsumerResponse response = null;
+ @Autowired
+ private DmaapMessageHandler dmaapMessageHandler;
@Autowired
public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
this.applicationConfig = applicationConfig;
}
- @Scheduled(fixedRate = 1000 * 60)
+ @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000)
@Override
public void run() {
- if (!alive) {
- init();
- }
+ /*
+ * if (!alive) { init(); }
+ */
if (this.alive) {
try {
Iterable<String> dmaapMsgs = fetchAllMessages();
+ logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
for (String msg : dmaapMsgs) {
processMsg(msg);
}
logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
if (!"200".equals(response.getResponseCode())) {
logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(),
- response.getResponseMessage());
+ response.getResponseMessage());
}
}
return response.getActualMessages();
}
+ @PostConstruct
@Override
public void init() {
Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig();
Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
// No need to start if there is no configuration.
if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0
- || dmaapPublisherProperties.size() == 0) {
+ || dmaapPublisherProperties.size() == 0) {
+ logger.error("DMaaP properties Failed to Load");
return;
}
- // Do we need to do any validation of properties before calling the factory?
try {
+ logger.debug("Creating DMAAP Client");
consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
this.alive = true;
} catch (IOException e) {
@Override
public void processMsg(String msg) throws Exception {
- System.out.println("sysout" + msg);
+ logger.debug("Message Reveived from DMAAP : {}", msg);
// Call the concurrent Task executor to handle the incoming request
- // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP
- // through REST CLIENT
- // Call the Controller with the extracted payload
+ dmaapMessageHandler.handleDmaapMsg(msg);
}
@Override
--- /dev/null
+/*-
+ * ========================LICENSE_START=================================
+ * O-RAN-SC
+ * %%
+ * Copyright (C) 2019 Nordix Foundation
+ * %%
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ * ========================LICENSE_END===================================
+ */
+
+package org.oransc.policyagent.dmaap;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.util.Properties;
+import org.apache.commons.lang3.StringUtils;
+import org.json.JSONException;
+import org.json.JSONObject;
+import org.json.JSONTokener;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.controllers.PolicyController;
+import org.oransc.policyagent.model.DmaapMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.ResponseEntity;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapMessageHandler {
+
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+
+ @Autowired
+ private ObjectMapper mapper;
+ @Autowired
+ private PolicyController policyController;
+ private AsyncRestClient restClient;
+ private ApplicationConfig applicationConfig;
+ private String topic = "";
+
+ @Autowired
+ public DmaapMessageHandler(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+ }
+
+ // The publish properties is corrupted. It contains the subscribe property values.
+ @Async("threadPoolTaskExecutor")
+ public void handleDmaapMsg(String msg) {
+ init();
+ DmaapMessage dmaapMessage = null;
+ ResponseEntity<String> response = null;
+ // Process the message
+ /**
+ * Sample Request Message from DMAAP { "type": "request", "correlationId":
+ * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
+ * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
+ * "payload": "{\"ric\":\"ric1\"}" }
+ * --------------------------------------------------------------------------------------------------------------
+ * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+ * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+ * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+ * -------------------------------------------------------------------------------------------------------------
+ * Sample Response Message to DMAAP { "type": "response", "correlation-id":
+ * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
+ * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
+ */
+ try {
+ dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
+ // Post the accepted message to the DMAAP bus
+ logger.debug("DMAAP Message- {}", dmaapMessage);
+ logger.debug("Post Accepted Message to Client");
+ restClient
+ .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+ dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
+ .block(); //
+ // Call the Controller
+ logger.debug("Invoke the Policy Agent Controller");
+ response = invokeController(dmaapMessage);
+ // Post the Response message to the DMAAP bus
+ logger.debug("DMAAP Response Message to Client- {}", response);
+ restClient
+ .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
+ dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
+ .block(); //
+ } catch (IOException e) {
+ logger.error("Exception occured during message processing", e);
+ }
+ }
+
+ private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+ String formattedString = "";
+ String ricName;
+ String instance;
+ logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+ try {
+ formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+ logger.debug("Removed the Escape charater in payload- {}", formattedString);
+ } catch (JSONException e) {
+ logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+ }
+ JSONObject jsonObject = new JSONObject(formattedString);
+ switch (dmaapMessage.getOperation()) {
+ case "getPolicySchemas":
+ ricName = (String) jsonObject.get("ricName");
+ logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
+ return policyController.getPolicySchemas(ricName);
+ case "getPolicySchema":
+ String policyTypeId = (String) jsonObject.get("id");
+ logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
+ System.out.println("policyTypeId" + policyTypeId);
+ return policyController.getPolicySchema(policyTypeId);
+ case "getPolicyTypes":
+ ricName = (String) jsonObject.get("ricName");
+ logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
+ return policyController.getPolicyTypes(ricName);
+ case "getPolicy":
+ instance = (String) jsonObject.get("instance");
+ logger.debug("Received the request for getPolicy with Instance- {}", instance);
+ return policyController.getPolicy(instance);
+ case "deletePolicy":
+ instance = (String) jsonObject.get("instance");
+ logger.debug("Received the request for deletePolicy with Instance- {}", instance);
+ return null;// policyController.deletePolicy(deleteInstance);
+ case "putPolicy":
+ String type = (String) jsonObject.get("type");
+ String putPolicyInstance = (String) jsonObject.get("instance");
+ String putPolicyRic = (String) jsonObject.get("ric");
+ String service = (String) jsonObject.get("service");
+ String jsonBody = (String) jsonObject.get("jsonBody");
+ return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+ case "getPolicies":
+ String getPolicyType = (String) jsonObject.get("type");
+ String getPolicyRic = (String) jsonObject.get("ric");
+ String getPolicyService = (String) jsonObject.get("service");
+ return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+ default:
+ break;
+ }
+ return null;
+ }
+
+ private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
+ String message) {
+ System.out.println("buildResponse ");
+ return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
+ .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
+ .put("message", message).toString();
+ }
+
+ // @PostConstruct
+ // The application properties value is always NULL for the first time
+ // Need to fix this
+ public void init() {
+ logger.debug("Reading DMAAP Publisher bus details from Application Config");
+ Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
+ String host = (String) dmaapPublisherConfig.get("ServiceName");
+ topic = dmaapPublisherConfig.getProperty("topic");
+ logger.debug("Read the topic & Service Name - {} , {}", host, topic);
+ this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
+
+ }
+}