2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.dmaap;
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.ObjectMapper;
25 import java.io.IOException;
26 import java.util.Optional;
27 import java.util.Properties;
28 import org.json.JSONException;
29 import org.json.JSONObject;
30 import org.json.JSONTokener;
31 import org.oransc.policyagent.clients.AsyncRestClient;
32 import org.oransc.policyagent.configuration.ApplicationConfig;
33 import org.oransc.policyagent.controllers.PolicyController;
34 import org.oransc.policyagent.model.DmaapRequestMessage;
35 import org.oransc.policyagent.model.DmaapResponseMessage;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import org.springframework.beans.factory.annotation.Autowired;
39 import org.springframework.http.ResponseEntity;
40 import org.springframework.scheduling.annotation.Async;
41 import org.springframework.stereotype.Component;
44 public class DmaapMessageHandler {
// Class-wide SLF4J logger for this handler.
46 private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
// Jackson mapper: used to parse incoming request messages and to serialize outgoing responses.
49 private ObjectMapper mapper;
// Controller that the individual DMaaP operations are delegated to.
51 private PolicyController policyController;
// REST client used to post responses; built from the publisher "ServiceName" property.
52 private AsyncRestClient restClient;
// Source of the DMaaP publisher properties.
54 private ApplicationConfig applicationConfig;
// Value of the publisher "topic" property. NOTE(review): in the code visible here it is only
// logged; the post call uses a literal topic name instead - confirm.
55 private String topic = "";
// Entry point for one raw DMaaP message: deserializes it into a DmaapRequestMessage,
// passes it to invokeController and, when a response is produced, posts that response
// to the "A1-POLICY-AGENT-WRITE" topic.
// Annotated to run asynchronously on the "threadPoolTaskExecutor" executor.
// An IOException raised during processing is logged and not rethrown.
//
// @param msg the message text as received from DMaaP (JSON, see the sample below)
57 // The publish properties are corrupted: they contain the subscribe property values.
58 @Async("threadPoolTaskExecutor")
59 public void handleDmaapMsg(String msg) {
60 logger.debug("Message ---------->{}", msg);
62 DmaapRequestMessage dmaapRequestMessage = null;
// NOTE(review): an Optional initialised to null defeats its purpose; it is reassigned
// from invokeController before it is read.
63 Optional<String> dmaapResponse = null;
64 // Process the message
66 * Sample Request Message from DMAAP { "type": "request", "correlationId":
67 * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
68 * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
69 * "payload": "{\"ricName\":\"ric1\"}" }
71 * -------------------------------------------------------------------------------------------------------------
72 * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
73 * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
74 * -------------------------------------------------------------------------------------------------------------
77 dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
78 // Call the Controller
79 logger.debug("Invoke the Policy Agent Controller");
80 dmaapResponse = invokeController(dmaapRequestMessage);
81 // Post the Response message to the DMAAP bus
82 logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
83 if (dmaapResponse.isPresent()) {
// The topic name is a literal rather than the configured topic field.
// NOTE(review): possibly a workaround for the corrupted publish properties - confirm.
// block() is called on the value returned by post(), so the post is awaited here.
84 restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
// Only IOException is handled. NOTE(review): runtime exceptions from invokeController
// (e.g. org.json.JSONException on a malformed payload) are not caught here - confirm intended.
86 } catch (IOException e) {
87 logger.error("Exception occured during message processing", e);
// Unescapes the request payload, parses it as a JSON object and dispatches on the
// request's operation name to the matching PolicyController call.
//
// @param dmaapRequestMessage the parsed request; its payload and operation are read here
// @return the serialized response built by getDmaapResponseMessage, or Optional.empty()
//         when no branch returns a response
91 private Optional<String> invokeController(DmaapRequestMessage dmaapRequestMessage) {
92 String formattedString = "";
96 logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
// The payload is an escaped JSON string (see the sample request); reading it through
// JSONTokener.nextValue() yields the unescaped text.
98 formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
99 logger.debug("Removed the Escape charater in payload- {}", formattedString);
// On failure only the payload is logged and formattedString keeps its initial "" value.
// NOTE(review): the exception itself is not passed to the logger.
100 } catch (JSONException e) {
101 logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
// NOTE(review): new JSONObject("") throws JSONException, so a payload that failed the
// formatting step above appears to fail again here, uncaught - confirm.
103 JSONObject jsonObject = new JSONObject(formattedString);
// Each branch reads its arguments from the payload object with an unchecked (String) cast.
// NOTE(review): a missing key or a non-string value would raise a runtime exception.
104 switch (dmaapRequestMessage.getOperation()) {
105 case "getPolicySchemas":
106 ricName = (String) jsonObject.get("ricName");
107 logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
108 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
109 case "getPolicySchema":
110 String policyTypeId = (String) jsonObject.get("id");
111 logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
112 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
113 case "getPolicyTypes":
114 ricName = (String) jsonObject.get("ricName");
115 logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
116 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
// Reads "instance" from the payload and delegates to policyController.getPolicy.
118 instance = (String) jsonObject.get("instance");
119 logger.debug("Received the request for getPolicy with Instance- {}", instance);
120 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
// Reads "instance" and delegates to policyController.deletePolicy; block() is applied to
// the controller's return value before the response is built.
122 instance = (String) jsonObject.get("instance");
123 logger.debug("Received the request for deletePolicy with Instance- {}", instance);
124 return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
// Reads type, instance, ric, service and jsonBody and delegates to policyController.putPolicy.
126 String type = (String) jsonObject.get("type");
127 String putPolicyInstance = (String) jsonObject.get("instance");
128 String putPolicyRic = (String) jsonObject.get("ric");
129 String service = (String) jsonObject.get("service");
130 jsonBody = (String) jsonObject.get("jsonBody");
131 return getDmaapResponseMessage(dmaapRequestMessage,
132 policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
// NOTE(review): this group reads the same five payload keys as the one above and also
// calls putPolicy, although its locals are named getPolicy* - confirm this is intended.
134 String getPolicyType = (String) jsonObject.get("type");
135 instance = (String) jsonObject.get("instance");
136 String getPolicyRic = (String) jsonObject.get("ric");
137 String getPolicyService = (String) jsonObject.get("service");
138 jsonBody = (String) jsonObject.get("jsonBody");
139 return getDmaapResponseMessage(dmaapRequestMessage, policyController
140 .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
// Fallback result when none of the branches above returned a response.
144 return Optional.empty();
147 private Optional<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
148 ResponseEntity<?> policySchemas) {
149 DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
150 .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
151 .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
152 .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
155 return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
156 } catch (JsonProcessingException e) {
157 logger.error("Exception occured during getDmaapResponseMessage", e);
159 return Optional.empty();
// Reads the DMaaP publisher properties from the application config and builds the
// REST client from them.
163 logger.debug("Reading DMAAP Publisher bus details from Application Config");
164 Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
// "ServiceName" supplies the host part of the client's base URL.
165 String host = (String) dmaapPublisherConfig.get("ServiceName");
166 topic = dmaapPublisherConfig.getProperty("topic");
// NOTE(review): the message text says "topic & Service Name" but the arguments are
// passed in the order host, topic.
167 logger.debug("Read the topic & Service Name - {} , {}", host, topic);
// NOTE(review): host is concatenated without a null check; a missing "ServiceName"
// property would produce the base URL "http://null/".
168 this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config