/*- * ========================LICENSE_START================================= * O-RAN-SC * %% * Copyright (C) 2019 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * ========================LICENSE_END=================================== */ package org.oransc.policyagent.dmaap; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; import java.util.Properties; import org.apache.commons.lang3.StringUtils; import org.json.JSONException; import org.json.JSONObject; import org.json.JSONTokener; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.controllers.PolicyController; import org.oransc.policyagent.model.DmaapMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.ResponseEntity; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; @Component public class DmaapMessageHandler { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); @Autowired private ObjectMapper mapper; @Autowired private PolicyController policyController; private AsyncRestClient restClient; private ApplicationConfig applicationConfig; private String topic = ""; @Autowired public DmaapMessageHandler(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; } // The publish properties is corrupted. It contains the subscribe property values. @Async("threadPoolTaskExecutor") public void handleDmaapMsg(String msg) { init(); DmaapMessage dmaapMessage = null; ResponseEntity response = null; // Process the message /** * Sample Request Message from DMAAP { "type": "request", "correlationId": * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z", * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", * "payload": "{\"ric\":\"ric1\"}" } * -------------------------------------------------------------------------------------------------------------- * Sample Response Message to DMAAP { "type": "response", "correlation-id": * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" } * ------------------------------------------------------------------------------------------------------------- * Sample Response Message to DMAAP { "type": "response", "correlation-id": * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" } */ try { dmaapMessage = mapper.readValue(msg, DmaapMessage.class); // Post the accepted message to the DMAAP bus logger.debug("DMAAP Message- {}", dmaapMessage); logger.debug("Post Accepted Message to Client"); restClient .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY)) .block(); // // Call the Controller logger.debug("Invoke the Policy Agent Controller"); response = invokeController(dmaapMessage); // Post the Response message to the DMAAP bus logger.debug("DMAAP Response Message to Client- {}", response); restClient .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody())) .block(); // } catch (IOException e) { logger.error("Exception occured during message processing", e); } } private ResponseEntity invokeController(DmaapMessage dmaapMessage) { String formattedString = ""; String ricName; String instance; logger.debug("Payload from the Message - {}", dmaapMessage.getPayload()); try { formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString(); logger.debug("Removed the Escape charater in payload- {}", formattedString); } catch (JSONException e) { logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload()); } JSONObject jsonObject = new JSONObject(formattedString); switch (dmaapMessage.getOperation()) { case "getPolicySchemas": ricName = (String) jsonObject.get("ricName"); logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName); return policyController.getPolicySchemas(ricName); case "getPolicySchema": String policyTypeId = (String) jsonObject.get("id"); logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId); System.out.println("policyTypeId" + policyTypeId); return policyController.getPolicySchema(policyTypeId); case "getPolicyTypes": ricName = (String) jsonObject.get("ricName"); logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName); return policyController.getPolicyTypes(ricName); case "getPolicy": instance = (String) jsonObject.get("instance"); logger.debug("Received the request for getPolicy with Instance- {}", instance); return policyController.getPolicy(instance); case "deletePolicy": instance = (String) jsonObject.get("instance"); logger.debug("Received the request for deletePolicy with Instance- {}", instance); return null;// policyController.deletePolicy(deleteInstance); case "putPolicy": String type = (String) jsonObject.get("type"); String putPolicyInstance = (String) jsonObject.get("instance"); String putPolicyRic = (String) jsonObject.get("ric"); String service = (String) jsonObject.get("service"); String jsonBody = (String) jsonObject.get("jsonBody"); return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody); case "getPolicies": String getPolicyType = (String) jsonObject.get("type"); String getPolicyRic = (String) jsonObject.get("ric"); String getPolicyService = (String) jsonObject.get("service"); return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService); default: break; } return null; } private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status, String message) { System.out.println("buildResponse "); return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "") .put("originatorId", originatorId).put("requestId", requestId).put("status", status) .put("message", message).toString(); } // @PostConstruct // The application properties value is always NULL for the first time // Need to fix this public void init() { logger.debug("Reading DMAAP Publisher bus details from Application Config"); Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); String host = (String) dmaapPublisherConfig.get("ServiceName"); topic = dmaapPublisherConfig.getProperty("topic"); logger.debug("Read the topic & Service Name - {} , {}", host, topic); this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config } }