X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageHandler.java;h=3d5da6297a0e3b12f45ccb6fbfa46eb25f74e36a;hb=d29cf3d0088b86438722092e849d4750995f7a3b;hp=61c1e1e2993675d40e4ed4c3a955ac80b3ed5381;hpb=d14535ab85ced5cf9b1fcd5ca0e5d17ce267b573;p=nonrtric.git
diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
index 61c1e1e2..3d5da629 100644
--- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
+++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java
@@ -2,7 +2,7 @@
* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2019 Nordix Foundation
+ * Copyright (C) 2020 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,155 +20,154 @@
package org.oransc.policyagent.dmaap;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import com.google.gson.JsonObject;
+
import java.io.IOException;
import java.util.Optional;
-import java.util.Properties;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.controllers.PolicyController;
-import org.oransc.policyagent.model.DmaapRequestMessage;
-import org.oransc.policyagent.model.DmaapResponseMessage;
+import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
+import org.oransc.policyagent.exceptions.ServiceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClientResponseException;
+import reactor.core.publisher.Mono;
-@Component
+/**
+ * The class handles incoming requests from DMAAP.
+ *
+ * That means: invoke a REST call towards this services and to send back a
+ * response though DMAAP
+ */
public class DmaapMessageHandler {
-
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
+ private static Gson gson = new GsonBuilder() //
+ .create(); //
+ private final MRBatchingPublisher dmaapClient;
+ private final AsyncRestClient agentClient;
+
+ public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) {
+ this.agentClient = agentClient;
+ this.dmaapClient = dmaapClient;
+ }
- private boolean initialize = false;
- @Autowired
- private ObjectMapper mapper;
- @Autowired
- private PolicyController policyController;
- private AsyncRestClient restClient;
- @Autowired
- private ApplicationConfig applicationConfig;
- private String topic = "";
-
- // The publish properties is corrupted. It contains the subscribe property values.
- @Async("threadPoolTaskExecutor")
public void handleDmaapMsg(String msg) {
- if (!initialize) {
- init();
- }
- DmaapRequestMessage dmaapRequestMessage = null;
- Optional dmaapResponse = null;
- // Process the message
- /**
- * Sample Request Message from DMAAP { "type": "request", "correlationId":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
- * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
- * "payload": "{\"ricName\":\"ric1\"}" }
- *
- * -------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
- * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
- * -------------------------------------------------------------------------------------------------------------
- */
try {
- dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
- // Call the Controller
- logger.debug("Invoke the Policy Agent Controller");
- dmaapResponse = invokeController(dmaapRequestMessage);
- // Post the Response message to the DMAAP bus
- logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
- if (dmaapResponse.isPresent()) {
- restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
- }
- } catch (IOException e) {
- logger.error("Exception occured during message processing", e);
+ String result = this.createTask(msg).block();
+ logger.debug("handleDmaapMsg: {}", result);
+ } catch (Exception throwable) {
+ logger.warn("handleDmaapMsg failure {}", throwable.getMessage());
}
}
- private Optional invokeController(DmaapRequestMessage dmaapRequestMessage) {
- String formattedString = "";
- String ricName;
- String instance;
- String jsonBody;
- logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
+ Mono createTask(String msg) {
try {
- formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
- logger.debug("Removed the Escape charater in payload- {}", formattedString);
- } catch (JSONException e) {
- logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
+ DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
+ return this.invokePolicyAgent(dmaapRequestMessage) //
+ .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) //
+ .flatMap(
+ response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode()));
+ } catch (Exception e) {
+ logger.warn("Received unparsable message from DMAAP: {}", msg);
+ return Mono.error(e); // Cannot make any response
+ }
+ }
+
+ private Mono> handleAgentCallError(Throwable t, String originalMessage,
+ DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Agent call failed: {}", t.getMessage());
+ HttpStatus status = HttpStatus.NOT_FOUND;
+ String errorMessage = t.getMessage();
+ if (t instanceof WebClientResponseException) {
+ WebClientResponseException exception = (WebClientResponseException) t;
+ status = exception.getStatusCode();
+ errorMessage = exception.getResponseBodyAsString();
+ } else if (t instanceof ServiceException) {
+ status = HttpStatus.BAD_REQUEST;
+ errorMessage = prepareBadOperationErrorMessage(t, originalMessage);
+
+ }
+ return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+
+ private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) {
+ String operationParameterStart = "operation\":\"";
+ int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length();
+ int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart);
+ String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd);
+ return t.getMessage().replace("null", badOperation);
+ }
+
+ private Mono> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ String uri = dmaapRequestMessage.url();
+
+ if (operation == Operation.DELETE) {
+ return agentClient.deleteForEntity(uri);
+ } else if (operation == Operation.GET) {
+ return agentClient.getForEntity(uri);
+ } else if (operation == Operation.PUT) {
+ return agentClient.putForEntity(uri, payload(dmaapRequestMessage));
+ } else if (operation == Operation.POST) {
+ return agentClient.postForEntity(uri, payload(dmaapRequestMessage));
+ } else {
+ return Mono.error(new ServiceException("Not implemented operation: " + operation));
}
- JSONObject jsonObject = new JSONObject(formattedString);
- switch (dmaapRequestMessage.getOperation()) {
- case "getPolicySchemas":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
- case "getPolicySchema":
- String policyTypeId = (String) jsonObject.get("id");
- logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
- case "getPolicyTypes":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
- case "getPolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for getPolicy with Instance- {}", instance);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
- case "deletePolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for deletePolicy with Instance- {}", instance);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
- case "putPolicy":
- String type = (String) jsonObject.get("type");
- String putPolicyInstance = (String) jsonObject.get("instance");
- String putPolicyRic = (String) jsonObject.get("ric");
- String service = (String) jsonObject.get("service");
- jsonBody = (String) jsonObject.get("jsonBody");
- return getDmaapResponseMessage(dmaapRequestMessage,
- policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
- case "getPolicies":
- String getPolicyType = (String) jsonObject.get("type");
- instance = (String) jsonObject.get("instance");
- String getPolicyRic = (String) jsonObject.get("ric");
- String getPolicyService = (String) jsonObject.get("service");
- jsonBody = (String) jsonObject.get("jsonBody");
- return getDmaapResponseMessage(dmaapRequestMessage, policyController
- .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
- default:
- break;
+
+ }
+
+ private String payload(DmaapRequestMessage message) {
+ Optional payload = message.payload();
+ if (payload.isPresent()) {
+ return gson.toJson(payload.get());
+ } else {
+ logger.warn("Expected payload in message from DMAAP: {}", message);
+ return "";
}
- return Optional.empty();
}
- private Optional getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
- ResponseEntity> policySchemas) {
- DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
- .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
- .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
- .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
- .build();
+ private Mono sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
+ HttpStatus status) {
+ return createDmaapResponseMessage(dmaapRequestMessage, response, status) //
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
+ }
+
+ private Mono sendToDmaap(String body) {
try {
- return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
- } catch (JsonProcessingException e) {
- logger.error("Exception occured during getDmaapResponseMessage", e);
+ logger.debug("sendToDmaap: {} ", body);
+ dmaapClient.send(body);
+ dmaapClient.sendBatchWithResponse();
+ return Mono.just("OK");
+ } catch (IOException e) {
+ return Mono.error(e);
}
- return Optional.empty();
}
- public void init() {
- logger.debug("Reading DMAAP Publisher bus details from Application Config");
- Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
- String host = (String) dmaapPublisherConfig.get("ServiceName");
- topic = dmaapPublisherConfig.getProperty("topic");
- logger.debug("Read the topic & Service Name - {} , {}", host, topic);
- this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
- initialize = true;
+ private Mono handleResponseCallError(Throwable t) {
+ logger.debug("Failed to send response to DMaaP: {}", t.getMessage());
+ return Mono.empty();
+ }
+
+ private Mono createDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response,
+ HttpStatus status) {
+ DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() //
+ .status(status.toString()) //
+ .message(response == null ? "" : response) //
+ .type("response") //
+ .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) //
+ .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) //
+ .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) //
+ .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) //
+ .build();
+ String str = gson.toJson(dmaapResponseMessage);
+ return Mono.just(str);
}
}