X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Fdmaap%2FDmaapMessageHandler.java;h=fc6e439a3884e8d31778f82fee98bdb6f4883b14;hb=2350dd8586d0084d30b953fd9cdd8e14fec3856f;hp=61c1e1e2993675d40e4ed4c3a955ac80b3ed5381;hpb=d14535ab85ced5cf9b1fcd5ca0e5d17ce267b573;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java index 61c1e1e2..fc6e439a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -2,7 +2,7 @@ * ========================LICENSE_START================================= * O-RAN-SC * %% - * Copyright (C) 2019 Nordix Foundation + * Copyright (C) 2020 Nordix Foundation * %% * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,158 +17,154 @@ * limitations under the License. * ========================LICENSE_END=================================== */ - package org.oransc.policyagent.dmaap; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonObject; + import java.io.IOException; import java.util.Optional; -import java.util.Properties; -import org.json.JSONException; -import org.json.JSONObject; -import org.json.JSONTokener; + +import org.onap.dmaap.mr.client.MRBatchingPublisher; import org.oransc.policyagent.clients.AsyncRestClient; -import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.controllers.PolicyController; -import org.oransc.policyagent.model.DmaapRequestMessage; -import org.oransc.policyagent.model.DmaapResponseMessage; +import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation; +import org.oransc.policyagent.exceptions.ServiceException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.scheduling.annotation.Async; -import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClientResponseException; +import reactor.core.publisher.Mono; -@Component +/** + * The class handles incoming requests from DMAAP. + *

+ * That means: invoke a REST call towards this services and to send back a + * response though DMAAP + */ public class DmaapMessageHandler { - private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); + private static Gson gson = new GsonBuilder() // + .create(); // + private final MRBatchingPublisher dmaapClient; + private final AsyncRestClient agentClient; + + public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) { + this.agentClient = agentClient; + this.dmaapClient = dmaapClient; + } - private boolean initialize = false; - @Autowired - private ObjectMapper mapper; - @Autowired - private PolicyController policyController; - private AsyncRestClient restClient; - @Autowired - private ApplicationConfig applicationConfig; - private String topic = ""; - - // The publish properties is corrupted. It contains the subscribe property values. - @Async("threadPoolTaskExecutor") public void handleDmaapMsg(String msg) { - if (!initialize) { - init(); - } - DmaapRequestMessage dmaapRequestMessage = null; - Optional dmaapResponse = null; - // Process the message - /** - * Sample Request Message from DMAAP { "type": "request", "correlationId": - * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z", - * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", - * "payload": "{\"ricName\":\"ric1\"}" } - * - * ------------------------------------------------------------------------------------------------------------- - * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201, - * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]} - * ------------------------------------------------------------------------------------------------------------- - */ + this.createTask(msg) // + .subscribe(message -> logger.debug("handleDmaapMsg: {}", message), // + throwable -> logger.warn("handleDmaapMsg failure ", throwable), // + () -> logger.debug("handleDmaapMsg complete")); + } + + Mono createTask(String msg) { try { - dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class); - // Call the Controller - logger.debug("Invoke the Policy Agent Controller"); - dmaapResponse = invokeController(dmaapRequestMessage); - // Post the Response message to the DMAAP bus - logger.debug("DMAAP Response Message to Client- {}", dmaapResponse); - if (dmaapResponse.isPresent()) { - restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); // - } - } catch (IOException e) { - logger.error("Exception occured during message processing", e); + DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class); + return this.invokePolicyAgent(dmaapRequestMessage) // + .onErrorResume(t -> handleAgentCallError(t, msg, dmaapRequestMessage)) // + .flatMap( + response -> sendDmaapResponse(response.getBody(), dmaapRequestMessage, response.getStatusCode())); + } catch (Exception e) { + logger.warn("Received unparsable message from DMAAP: {}", msg); + return Mono.error(e); // Cannot make any response } } - private Optional invokeController(DmaapRequestMessage dmaapRequestMessage) { - String formattedString = ""; - String ricName; - String instance; - String jsonBody; - logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload()); - try { - formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString(); - logger.debug("Removed the Escape charater in payload- {}", formattedString); - } catch (JSONException e) { - logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload()); + private Mono> handleAgentCallError(Throwable t, String originalMessage, + DmaapRequestMessage dmaapRequestMessage) { + logger.debug("Agent call failed: {}", t.getMessage()); + HttpStatus status = HttpStatus.NOT_FOUND; + String errorMessage = t.getMessage(); + if (t instanceof WebClientResponseException) { + WebClientResponseException exception = (WebClientResponseException) t; + status = exception.getStatusCode(); + errorMessage = exception.getResponseBodyAsString(); + } else if (t instanceof ServiceException) { + status = HttpStatus.BAD_REQUEST; + errorMessage = prepareBadOperationErrorMessage(t, originalMessage); + } - JSONObject jsonObject = new JSONObject(formattedString); - switch (dmaapRequestMessage.getOperation()) { - case "getPolicySchemas": - ricName = (String) jsonObject.get("ricName"); - logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName); - return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName)); - case "getPolicySchema": - String policyTypeId = (String) jsonObject.get("id"); - logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId); - return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId)); - case "getPolicyTypes": - ricName = (String) jsonObject.get("ricName"); - logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName); - return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName)); - case "getPolicy": - instance = (String) jsonObject.get("instance"); - logger.debug("Received the request for getPolicy with Instance- {}", instance); - return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance)); - case "deletePolicy": - instance = (String) jsonObject.get("instance"); - logger.debug("Received the request for deletePolicy with Instance- {}", instance); - return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block()); - case "putPolicy": - String type = (String) jsonObject.get("type"); - String putPolicyInstance = (String) jsonObject.get("instance"); - String putPolicyRic = (String) jsonObject.get("ric"); - String service = (String) jsonObject.get("service"); - jsonBody = (String) jsonObject.get("jsonBody"); - return getDmaapResponseMessage(dmaapRequestMessage, - policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block()); - case "getPolicies": - String getPolicyType = (String) jsonObject.get("type"); - instance = (String) jsonObject.get("instance"); - String getPolicyRic = (String) jsonObject.get("ric"); - String getPolicyService = (String) jsonObject.get("service"); - jsonBody = (String) jsonObject.get("jsonBody"); - return getDmaapResponseMessage(dmaapRequestMessage, policyController - .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block()); - default: - break; + return sendDmaapResponse(errorMessage, dmaapRequestMessage, status) // + .flatMap(notUsed -> Mono.empty()); + } + + private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) { + String operationParameterStart = "operation\":\""; + int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length(); + int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart); + String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd); + return t.getMessage().replace("null", badOperation); + } + + private Mono> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) { + DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation(); + String uri = dmaapRequestMessage.url(); + + if (operation == Operation.DELETE) { + return agentClient.deleteForEntity(uri); + } else if (operation == Operation.GET) { + return agentClient.getForEntity(uri); + } else if (operation == Operation.PUT) { + return agentClient.putForEntity(uri, payload(dmaapRequestMessage)); + } else if (operation == Operation.POST) { + return agentClient.postForEntity(uri, payload(dmaapRequestMessage)); + } else { + return Mono.error(new ServiceException("Not implemented operation: " + operation)); } - return Optional.empty(); + } - private Optional getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, - ResponseEntity policySchemas) { - DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder() - .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString()) - .type("response").correlationId(dmaapRequestMessage.getCorrelationId()) - .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId()) - .build(); + private String payload(DmaapRequestMessage message) { + Optional payload = message.payload(); + if (payload.isPresent()) { + return gson.toJson(payload.get()); + } else { + logger.warn("Expected payload in message from DMAAP: {}", message); + return ""; + } + } + + private Mono sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage, + HttpStatus status) { + return createDmaapResponseMessage(dmaapRequestMessage, response, status) // + .flatMap(this::sendToDmaap) // + .onErrorResume(this::handleResponseCallError); + } + + private Mono sendToDmaap(String body) { try { - return Optional.of(mapper.writeValueAsString(dmaapResponseMessage)); - } catch (JsonProcessingException e) { - logger.error("Exception occured during getDmaapResponseMessage", e); + logger.debug("sendToDmaap: {} ", body); + dmaapClient.send(body); + dmaapClient.sendBatchWithResponse(); + return Mono.just("OK"); + } catch (IOException e) { + return Mono.error(e); } - return Optional.empty(); } - public void init() { - logger.debug("Reading DMAAP Publisher bus details from Application Config"); - Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); - String host = (String) dmaapPublisherConfig.get("ServiceName"); - topic = dmaapPublisherConfig.getProperty("topic"); - logger.debug("Read the topic & Service Name - {} , {}", host, topic); - this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config - initialize = true; + private Mono handleResponseCallError(Throwable t) { + logger.debug("Failed to send response to DMaaP: {}", t.getMessage()); + return Mono.empty(); + } + + private Mono createDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response, + HttpStatus status) { + DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() // + .status(status.toString()) // + .message(response == null ? "" : response) // + .type("response") // + .correlationId(dmaapRequestMessage.correlationId() == null ? "" : dmaapRequestMessage.correlationId()) // + .originatorId(dmaapRequestMessage.originatorId() == null ? "" : dmaapRequestMessage.originatorId()) // + .requestId(dmaapRequestMessage.requestId() == null ? "" : dmaapRequestMessage.requestId()) // + .timestamp(dmaapRequestMessage.timestamp() == null ? "" : dmaapRequestMessage.timestamp()) // + .build(); + String str = gson.toJson(dmaapResponseMessage); + return Mono.just(str); } }