package org.oransc.policyagent.dmaap;
+import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
+import java.util.Optional;
import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
import org.json.JSONException;
import org.json.JSONObject;
import org.json.JSONTokener;
import org.oransc.policyagent.clients.AsyncRestClient;
import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.controllers.PolicyController;
-import org.oransc.policyagent.model.DmaapMessage;
+import org.oransc.policyagent.model.DmaapRequestMessage;
+import org.oransc.policyagent.model.DmaapResponseMessage;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+ private boolean initialize = false;
@Autowired
private ObjectMapper mapper;
@Autowired
private PolicyController policyController;
private AsyncRestClient restClient;
+ @Autowired
private ApplicationConfig applicationConfig;
private String topic = "";
- @Autowired
- public DmaapMessageHandler(ApplicationConfig applicationConfig) {
- this.applicationConfig = applicationConfig;
- }
-
// The publish properties is corrupted. It contains the subscribe property values.
@Async("threadPoolTaskExecutor")
public void handleDmaapMsg(String msg) {
- init();
- DmaapMessage dmaapMessage = null;
- ResponseEntity<String> response = null;
+ if (!initialize) {
+ init();
+ }
+ DmaapRequestMessage dmaapRequestMessage = null;
+ Optional<String> dmaapResponse = null;
// Process the message
/**
* Sample Request Message from DMAAP { "type": "request", "correlationId":
* "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
* "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
- * "payload": "{\"ric\":\"ric1\"}" }
- * --------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP { "type": "response", "correlation-id":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
- * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
+ * "payload": "{\"ricName\":\"ric1\"}" }
+ *
+ * -------------------------------------------------------------------------------------------------------------
+ * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
+ * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
* -------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP { "type": "response", "correlation-id":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
- * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
*/
try {
- dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
- // Post the accepted message to the DMAAP bus
- logger.debug("DMAAP Message- {}", dmaapMessage);
- logger.debug("Post Accepted Message to Client");
- restClient
- .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
- dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
- .block(); //
+ dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
// Call the Controller
logger.debug("Invoke the Policy Agent Controller");
- response = invokeController(dmaapMessage);
+ dmaapResponse = invokeController(dmaapRequestMessage);
// Post the Response message to the DMAAP bus
- logger.debug("DMAAP Response Message to Client- {}", response);
- restClient
- .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
- dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
- .block(); //
+ logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
+ if (dmaapResponse.isPresent()) {
+ restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
+ }
} catch (IOException e) {
logger.error("Exception occured during message processing", e);
}
}
- private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
+ private Optional<String> invokeController(DmaapRequestMessage dmaapRequestMessage) {
String formattedString = "";
String ricName;
String instance;
- logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
+ String jsonBody;
+ logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
try {
- formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
+ formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
logger.debug("Removed the Escape charater in payload- {}", formattedString);
} catch (JSONException e) {
- logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+ logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
}
JSONObject jsonObject = new JSONObject(formattedString);
- switch (dmaapMessage.getOperation()) {
+ switch (dmaapRequestMessage.getOperation()) {
case "getPolicySchemas":
ricName = (String) jsonObject.get("ricName");
logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
- return policyController.getPolicySchemas(ricName);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
case "getPolicySchema":
String policyTypeId = (String) jsonObject.get("id");
logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
- System.out.println("policyTypeId" + policyTypeId);
- return policyController.getPolicySchema(policyTypeId);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
case "getPolicyTypes":
ricName = (String) jsonObject.get("ricName");
logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
- return policyController.getPolicyTypes(ricName);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
case "getPolicy":
instance = (String) jsonObject.get("instance");
logger.debug("Received the request for getPolicy with Instance- {}", instance);
- return policyController.getPolicy(instance);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
case "deletePolicy":
instance = (String) jsonObject.get("instance");
logger.debug("Received the request for deletePolicy with Instance- {}", instance);
- return null;// policyController.deletePolicy(deleteInstance);
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
case "putPolicy":
String type = (String) jsonObject.get("type");
String putPolicyInstance = (String) jsonObject.get("instance");
String putPolicyRic = (String) jsonObject.get("ric");
String service = (String) jsonObject.get("service");
- String jsonBody = (String) jsonObject.get("jsonBody");
- return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
+ jsonBody = (String) jsonObject.get("jsonBody");
+ return getDmaapResponseMessage(dmaapRequestMessage,
+ policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
case "getPolicies":
String getPolicyType = (String) jsonObject.get("type");
+ instance = (String) jsonObject.get("instance");
String getPolicyRic = (String) jsonObject.get("ric");
String getPolicyService = (String) jsonObject.get("service");
- return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
+ jsonBody = (String) jsonObject.get("jsonBody");
+ return getDmaapResponseMessage(dmaapRequestMessage, policyController
+ .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
default:
break;
}
- return null;
+ return Optional.empty();
}
- private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
- String message) {
- System.out.println("buildResponse ");
- return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
- .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
- .put("message", message).toString();
+ private Optional<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
+ ResponseEntity<?> policySchemas) {
+ DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
+ .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
+ .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
+ .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
+ .build();
+ try {
+ return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
+ } catch (JsonProcessingException e) {
+ logger.error("Exception occured during getDmaapResponseMessage", e);
+ }
+ return Optional.empty();
}
- // @PostConstruct
- // The application properties value is always NULL for the first time
- // Need to fix this
public void init() {
logger.debug("Reading DMAAP Publisher bus details from Application Config");
Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
topic = dmaapPublisherConfig.getProperty("topic");
logger.debug("Read the topic & Service Name - {} , {}", host, topic);
this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
+ initialize = true;
}
}