713d483417e878049c3a67958ad6ff61fd96a47f
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / dmaap / DmaapMessageHandler.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.dmaap;
22
23 import com.fasterxml.jackson.core.JsonProcessingException;
24 import com.fasterxml.jackson.databind.ObjectMapper;
25 import java.io.IOException;
26 import java.util.Optional;
27 import java.util.Properties;
28 import org.json.JSONException;
29 import org.json.JSONObject;
30 import org.json.JSONTokener;
31 import org.oransc.policyagent.clients.AsyncRestClient;
32 import org.oransc.policyagent.configuration.ApplicationConfig;
33 import org.oransc.policyagent.controllers.PolicyController;
34 import org.oransc.policyagent.model.DmaapRequestMessage;
35 import org.oransc.policyagent.model.DmaapResponseMessage;
36 import org.slf4j.Logger;
37 import org.slf4j.LoggerFactory;
38 import org.springframework.beans.factory.annotation.Autowired;
39 import org.springframework.http.ResponseEntity;
40 import org.springframework.scheduling.annotation.Async;
41 import org.springframework.stereotype.Component;
42
43 @Component
44 public class DmaapMessageHandler {
45
46     private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
47
48     @Autowired
49     private ObjectMapper mapper;
50     @Autowired
51     private PolicyController policyController;
52     private AsyncRestClient restClient;
53     private ApplicationConfig applicationConfig;
54     private String topic = "";
55
56     @Autowired
57     public DmaapMessageHandler(ApplicationConfig applicationConfig) {
58         this.applicationConfig = applicationConfig;
59     }
60
61     // The publish properties is corrupted. It contains the subscribe property values.
62     @Async("threadPoolTaskExecutor")
63     public void handleDmaapMsg(String msg) {
64         init();
65         DmaapRequestMessage dmaapRequestMessage = null;
66         Optional<String> dmaapResponse = null;
67         // Process the message
68         /**
69          * Sample Request Message from DMAAP { "type": "request", "correlationId":
70          * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
71          * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
72          * "payload": "{\"ricName\":\"ric1\"}" }
73          *
74          * --------------------------------------------------------------------------------------------------------------
75          * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
76          * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
77          * -------------------------------------------------------------------------------------------------------------
78          */
79         try {
80             dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
81             // Call the Controller
82             logger.debug("Invoke the Policy Agent Controller");
83             dmaapResponse = invokeController(dmaapRequestMessage);
84             // Post the Response message to the DMAAP bus
85             logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
86             if (dmaapResponse.isPresent()) {
87                 restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
88             }
89         } catch (IOException e) {
90             logger.error("Exception occured during message processing", e);
91         }
92     }
93
94     private Optional<String> invokeController(DmaapRequestMessage dmaapRequestMessage) {
95         String formattedString = "";
96         String ricName;
97         String instance;
98         String jsonBody;
99         logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
100         try {
101             formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
102             logger.debug("Removed the Escape charater in payload- {}", formattedString);
103         } catch (JSONException e) {
104             logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
105         }
106         JSONObject jsonObject = new JSONObject(formattedString);
107         switch (dmaapRequestMessage.getOperation()) {
108             case "getPolicySchemas":
109                 ricName = (String) jsonObject.get("ricName");
110                 logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
111                 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
112             case "getPolicySchema":
113                 String policyTypeId = (String) jsonObject.get("id");
114                 logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
115                 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
116             case "getPolicyTypes":
117                 ricName = (String) jsonObject.get("ricName");
118                 logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
119                 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
120             case "getPolicy":
121                 instance = (String) jsonObject.get("instance");
122                 logger.debug("Received the request for getPolicy with Instance- {}", instance);
123                 return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
124             case "deletePolicy":
125                 instance = (String) jsonObject.get("instance");
126                 logger.debug("Received the request for deletePolicy with Instance- {}", instance);
127                 return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
128             case "putPolicy":
129                 String type = (String) jsonObject.get("type");
130                 String putPolicyInstance = (String) jsonObject.get("instance");
131                 String putPolicyRic = (String) jsonObject.get("ric");
132                 String service = (String) jsonObject.get("service");
133                 jsonBody = (String) jsonObject.get("jsonBody");
134                 return getDmaapResponseMessage(dmaapRequestMessage,
135                         policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
136             case "getPolicies":
137                 String getPolicyType = (String) jsonObject.get("type");
138                 instance = (String) jsonObject.get("instance");
139                 String getPolicyRic = (String) jsonObject.get("ric");
140                 String getPolicyService = (String) jsonObject.get("service");
141                 jsonBody = (String) jsonObject.get("jsonBody");
142                 return getDmaapResponseMessage(dmaapRequestMessage, policyController
143                         .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
144             default:
145                 break;
146         }
147         return Optional.empty();
148     }
149
150     private Optional<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
151             ResponseEntity<?> policySchemas) {
152         DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
153                 .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
154                 .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
155                 .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
156                 .build();
157         try {
158             return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
159         } catch (JsonProcessingException e) {
160             logger.error("Exception occured during getDmaapResponseMessage", e);
161         }
162         return Optional.empty();
163     }
164
165     // @PostConstruct
166     // The application properties value is always NULL for the first time
167     // Need to fix this
168     public void init() {
169         logger.debug("Reading DMAAP Publisher bus details from Application Config");
170         Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
171         String host = (String) dmaapPublisherConfig.get("ServiceName");
172         topic = dmaapPublisherConfig.getProperty("topic");
173         System.out.println("\"Read the topic ---------->" + topic);
174         logger.debug("Read the topic & Service Name - {} , {}", host, topic);
175         this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
176
177     }
178 }