2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.dmaap;
23 import com.fasterxml.jackson.databind.ObjectMapper;
24 import java.io.IOException;
25 import java.util.Properties;
26 import org.apache.commons.lang3.StringUtils;
27 import org.json.JSONException;
28 import org.json.JSONObject;
29 import org.json.JSONTokener;
30 import org.oransc.policyagent.clients.AsyncRestClient;
31 import org.oransc.policyagent.configuration.ApplicationConfig;
32 import org.oransc.policyagent.controllers.PolicyController;
33 import org.oransc.policyagent.model.DmaapMessage;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.http.ResponseEntity;
38 import org.springframework.scheduling.annotation.Async;
39 import org.springframework.stereotype.Component;
42 public class DmaapMessageHandler {
// Logger shared by all instances of this class.
44 private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
// Jackson mapper used to deserialize the incoming message text into a DmaapMessage.
// NOTE(review): no assignment is visible here — presumably injected; confirm.
47 private ObjectMapper mapper;
// Controller whose methods are called for the operation named in a message.
// NOTE(review): no assignment is visible here — presumably injected; confirm.
49 private PolicyController policyController;
// REST client; created with the base URL "http://<ServiceName>/" once the publisher properties are read.
50 private AsyncRestClient restClient;
// Application configuration passed to the constructor; source of the DMaaP publisher properties.
51 private ApplicationConfig applicationConfig;
// Value of the "topic" publisher property; stays empty until that property has been read.
52 private String topic = "";
/**
 * Creates the handler and stores the given application configuration.
 *
 * @param applicationConfig configuration that is later queried for the DMaaP publisher properties
 */
55 public DmaapMessageHandler(ApplicationConfig applicationConfig) {
56 this.applicationConfig = applicationConfig;
59 // The publish properties are corrupted: they contain the subscribe property values.
/**
 * Processes one raw message received from DMaaP.
 *
 * <p>The message text is deserialized into a {@code DmaapMessage}. A response with status
 * {@code "ACCEPTED"} and an empty message is built and passed to {@code post} together with
 * {@code "A1-POLICY-AGENT-WRITE"}. The requested operation is then run through
 * {@code invokeController}, and a second response with status {@code "SUCCESS"} and the
 * controller's response body as its message is posted the same way.
 *
 * <p>An {@code IOException} raised during processing is logged and not rethrown.
 *
 * <p>NOTE(review): {@code invokeController} returns {@code null} on some of its branches; in
 * that case {@code response.getBody()} below throws a {@code NullPointerException}, which the
 * {@code IOException} handler does not cover — confirm and add a guard.
 *
 * <p>NOTE(review): the second response always carries {@code "SUCCESS"}; the status of the
 * controller response is not inspected — confirm this is intended.
 *
 * @param msg the message text as received; it must be JSON that maps onto {@code DmaapMessage}
 */
60 @Async("threadPoolTaskExecutor")
61 public void handleDmaapMsg(String msg) {
63 DmaapMessage dmaapMessage = null;
64 ResponseEntity<String> response = null;
65 // Process the message
67 * Sample Request Message from DMAAP { "type": "request", "correlationId":
68 * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
69 * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
70 * "payload": "{\"ric\":\"ric1\"}" }
71 * --------------------------------------------------------------------------------------------------------------
72 * Sample Response Message to DMAAP { "type": "response", "correlation-id":
73 * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
74 * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
75 * -------------------------------------------------------------------------------------------------------------
76 * Sample Response Message to DMAAP { "type": "response", "correlation-id":
77 * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
78 * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
// Deserialize the raw text; a mapping failure surfaces as the IOException handled at the end.
81 dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
82 // Post the accepted message to the DMAAP bus
83 logger.debug("DMAAP Message- {}", dmaapMessage);
84 logger.debug("Post Accepted Message to Client");
// Acknowledge receipt before the operation is executed; the ids are copied from the request.
86 .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
87 dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
89 // Call the Controller
90 logger.debug("Invoke the Policy Agent Controller");
91 response = invokeController(dmaapMessage);
92 // Post the Response message to the DMAAP bus
93 logger.debug("DMAAP Response Message to Client- {}", response);
// The controller's response body becomes the "message" field of the final response.
95 .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
96 dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
98 } catch (IOException e) {
// Logged with the exception so the stack trace is kept; nothing is posted back on this path.
99 logger.error("Exception occured during message processing", e);
/**
 * Runs the operation named in a DMaaP message against the policy controller.
 *
 * <p>The message payload is first passed through {@code JSONTokener} to obtain its unescaped
 * text, which is then parsed into a {@code JSONObject}. The operation name selects the branch;
 * each branch reads its arguments from that object and calls the matching
 * {@code PolicyController} method.
 *
 * <p>NOTE(review): {@code JSONObject.get} is used for every argument and each value is cast to
 * {@code String}; per the org.json documentation a missing key raises {@code JSONException}, and
 * a non-string value would fail the cast — confirm that callers always send string fields.
 *
 * @param dmaapMessage the deserialized request; its operation and payload are read
 * @return the controller's response, or {@code null} on the branches where the controller call
 *         is commented out
 */
103 private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
104 String formattedString = "";
107 logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
// nextValue() parses the payload once; toString() of that value is the text that gets parsed below.
109 formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
110 logger.debug("Removed the Escape charater in payload- {}", formattedString);
111 } catch (JSONException e) {
// NOTE(review): the exception itself is not passed to the logger, so its stack trace is lost.
// NOTE(review): execution appears to continue with the initial empty string, which
// new JSONObject(...) below would reject — confirm and return early instead.
112 logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
114 JSONObject jsonObject = new JSONObject(formattedString);
// Dispatch on the operation name carried in the message.
115 switch (dmaapMessage.getOperation()) {
116 case "getPolicySchemas":
117 ricName = (String) jsonObject.get("ricName");
118 logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
119 return policyController.getPolicySchemas(ricName);
120 case "getPolicySchema":
121 String policyTypeId = (String) jsonObject.get("id");
122 logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
// NOTE(review): leftover console output; the debug log line above already records the id.
123 System.out.println("policyTypeId" + policyTypeId);
124 return policyController.getPolicySchema(policyTypeId);
125 case "getPolicyTypes":
126 ricName = (String) jsonObject.get("ricName");
127 logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
128 return policyController.getPolicyTypes(ricName);
// Fetch a single policy identified by the "instance" payload field.
130 instance = (String) jsonObject.get("instance");
131 logger.debug("Received the request for getPolicy with Instance- {}", instance);
132 return policyController.getPolicy(instance);
// Delete request: only logged; the controller call is commented out, so null is returned.
134 instance = (String) jsonObject.get("instance");
135 logger.debug("Received the request for deletePolicy with Instance- {}", instance);
136 return null;// policyController.deletePolicy(deleteInstance);
// Put request: the arguments are read but unused; the controller call is commented out, so null is returned.
138 String type = (String) jsonObject.get("type");
139 String putPolicyInstance = (String) jsonObject.get("instance");
140 String putPolicyRic = (String) jsonObject.get("ric");
141 String service = (String) jsonObject.get("service");
142 String jsonBody = (String) jsonObject.get("jsonBody");
143 return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
// List policies using the "type", "ric" and "service" payload fields as arguments.
145 String getPolicyType = (String) jsonObject.get("type");
146 String getPolicyRic = (String) jsonObject.get("ric");
147 String getPolicyService = (String) jsonObject.get("service");
148 return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
155 private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
157 System.out.println("buildResponse ");
158 return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
159 .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
160 .put("message", message).toString();
164 // The application properties value is always NULL the first time
167 logger.debug("Reading DMAAP Publisher bus details from Application Config");
168 Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
169 String host = (String) dmaapPublisherConfig.get("ServiceName");
170 topic = dmaapPublisherConfig.getProperty("topic");
171 logger.debug("Read the topic & Service Name - {} , {}", host, topic);
172 this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config