* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2019 Nordix Foundation
+ * Copyright (C) 2020 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.oransc.policyagent.dmaap;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
import java.io.IOException;
-import java.util.Properties;
-import org.apache.commons.lang3.StringUtils;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.controllers.PolicyController;
-import org.oransc.policyagent.model.DmaapMessage;
+import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Component;
+import org.springframework.http.HttpStatus;
+import reactor.core.publisher.Mono;
-@Component
public class DmaapMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
- @Autowired
- private ObjectMapper mapper;
- @Autowired
- private PolicyController policyController;
- private AsyncRestClient restClient;
- private ApplicationConfig applicationConfig;
- private String topic = "";
-
- @Autowired
- public DmaapMessageHandler(ApplicationConfig applicationConfig) {
- this.applicationConfig = applicationConfig;
+ private static Gson gson = new GsonBuilder() //
+ .create(); //
+
+ private final MRBatchingPublisher dmaapClient;
+ private final AsyncRestClient agentClient;
+
+ public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) {
+ this.agentClient = agentClient;
+ this.dmaapClient = dmaapClient;
}
- // The publish properties is corrupted. It contains the subscribe property values.
- @Async("threadPoolTaskExecutor")
public void handleDmaapMsg(String msg) {
- init();
- DmaapMessage dmaapMessage = null;
- ResponseEntity<String> response = null;
- // Process the message
- /**
- * Sample Request Message from DMAAP { "type": "request", "correlationId":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
- * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
- * "payload": "{\"ric\":\"ric1\"}" }
- * --------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP { "type": "response", "correlation-id":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
- * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" }
- * -------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP { "type": "response", "correlation-id":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id":
- * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" }
- */
+ this.createTask(msg) //
+ .subscribe(message -> logger.debug("handleDmaapMsg: {}", message), //
+ throwable -> logger.warn("handleDmaapMsg failure ", throwable), //
+ () -> logger.debug("handleDmaapMsg complete"));
+ }
+
+ Mono<String> createTask(String msg) {
try {
- dmaapMessage = mapper.readValue(msg, DmaapMessage.class);
- // Post the accepted message to the DMAAP bus
- logger.debug("DMAAP Message- {}", dmaapMessage);
- logger.debug("Post Accepted Message to Client");
- restClient
- .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
- dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY))
- .block(); //
- // Call the Controller
- logger.debug("Invoke the Policy Agent Controller");
- response = invokeController(dmaapMessage);
- // Post the Response message to the DMAAP bus
- logger.debug("DMAAP Response Message to Client- {}", response);
- restClient
- .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(),
- dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody()))
- .block(); //
- } catch (IOException e) {
- logger.error("Exception occured during message processing", e);
+ DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
+
+ return this.invokePolicyAgent(dmaapRequestMessage) //
+ .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
+ .flatMap(response -> sendDmaapResponse(response, dmaapRequestMessage, HttpStatus.OK));
+
+ } catch (Exception e) {
+ logger.warn("Received unparsable message from DMAAP: {}", msg);
+ return Mono.error(e);
}
}
- private ResponseEntity<String> invokeController(DmaapMessage dmaapMessage) {
- String formattedString = "";
- String ricName;
- String instance;
- logger.debug("Payload from the Message - {}", dmaapMessage.getPayload());
- try {
- formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString();
- logger.debug("Removed the Escape charater in payload- {}", formattedString);
- } catch (JSONException e) {
- logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload());
+ private Mono<String> handleAgentCallError(Throwable t, DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Agent call failed: {}", t.getMessage());
+ return sendDmaapResponse(t.toString(), dmaapRequestMessage, HttpStatus.NOT_FOUND) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+
+ private Mono<String> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ Mono<String> result = null;
+ String uri = dmaapRequestMessage.url();
+ if (operation == Operation.DELETE) {
+ result = agentClient.delete(uri);
+ } else if (operation == Operation.GET) {
+ result = agentClient.get(uri);
+ } else if (operation == Operation.PUT) {
+ result = agentClient.put(uri, payload(dmaapRequestMessage));
+ } else if (operation == Operation.POST) {
+ result = agentClient.post(uri, payload(dmaapRequestMessage));
+ } else {
+ return Mono.error(new Exception("Not implemented operation: " + operation));
}
- JSONObject jsonObject = new JSONObject(formattedString);
- switch (dmaapMessage.getOperation()) {
- case "getPolicySchemas":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
- return policyController.getPolicySchemas(ricName);
- case "getPolicySchema":
- String policyTypeId = (String) jsonObject.get("id");
- logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
- System.out.println("policyTypeId" + policyTypeId);
- return policyController.getPolicySchema(policyTypeId);
- case "getPolicyTypes":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
- return policyController.getPolicyTypes(ricName);
- case "getPolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for getPolicy with Instance- {}", instance);
- return policyController.getPolicy(instance);
- case "deletePolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for deletePolicy with Instance- {}", instance);
- return null;// policyController.deletePolicy(deleteInstance);
- case "putPolicy":
- String type = (String) jsonObject.get("type");
- String putPolicyInstance = (String) jsonObject.get("instance");
- String putPolicyRic = (String) jsonObject.get("ric");
- String service = (String) jsonObject.get("service");
- String jsonBody = (String) jsonObject.get("jsonBody");
- return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody);
- case "getPolicies":
- String getPolicyType = (String) jsonObject.get("type");
- String getPolicyRic = (String) jsonObject.get("ric");
- String getPolicyService = (String) jsonObject.get("service");
- return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService);
- default:
- break;
+ return result;
+ }
+
+ private String payload(DmaapRequestMessage message) {
+ if (message.payload().isPresent()) {
+ return gson.toJson(message.payload().get());
+ } else {
+ logger.warn("Expected payload in message from DMAAP: {}", message);
+ return "";
}
- return null;
}
- private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status,
- String message) {
- System.out.println("buildResponse ");
- return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "")
- .put("originatorId", originatorId).put("requestId", requestId).put("status", status)
- .put("message", message).toString();
+ private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
+ HttpStatus status) {
+ return getDmaapResponseMessage(dmaapRequestMessage, response, status) //
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
}
- // @PostConstruct
- // The application properties value is always NULL for the first time
- // Need to fix this
- public void init() {
- logger.debug("Reading DMAAP Publisher bus details from Application Config");
- Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
- String host = (String) dmaapPublisherConfig.get("ServiceName");
- topic = dmaapPublisherConfig.getProperty("topic");
- logger.debug("Read the topic & Service Name - {} , {}", host, topic);
- this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
+ private Mono<String> sendToDmaap(String body) {
+ try {
+ logger.debug("sendToDmaap: {} ", body);
+ dmaapClient.send(body);
+ dmaapClient.sendBatchWithResponse();
+ return Mono.just("OK");
+ } catch (IOException e) {
+ return Mono.error(e);
+ }
+ }
+
+ private Mono<String> handleResponseCallError(Throwable t) {
+ logger.debug("Failed to respond: {}", t.getMessage());
+ return Mono.empty();
+ }
+
+ private Mono<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response,
+ HttpStatus status) {
+ DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() //
+ .status(status.toString()) //
+ .message(response) //
+ .type("response") //
+ .correlationId(dmaapRequestMessage.correlationId()) //
+ .originatorId(dmaapRequestMessage.originatorId()) //
+ .requestId(dmaapRequestMessage.requestId()) //
+ .timestamp(dmaapRequestMessage.timestamp()) //
+ .build();
+ String str = gson.toJson(dmaapResponseMessage);
+
+ return Mono.just(str);
}
}