* ========================LICENSE_START=================================
* O-RAN-SC
* %%
- * Copyright (C) 2019 Nordix Foundation
+ * Copyright (C) 2020 Nordix Foundation
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
package org.oransc.policyagent.dmaap;
-import com.fasterxml.jackson.core.JsonProcessingException;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+
import java.io.IOException;
-import java.util.Optional;
-import java.util.Properties;
-import org.json.JSONException;
-import org.json.JSONObject;
-import org.json.JSONTokener;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.controllers.PolicyController;
-import org.oransc.policyagent.model.DmaapRequestMessage;
-import org.oransc.policyagent.model.DmaapResponseMessage;
+import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.http.ResponseEntity;
-import org.springframework.scheduling.annotation.Async;
-import org.springframework.stereotype.Component;
+import org.springframework.http.HttpStatus;
+import reactor.core.publisher.Mono;
-@Component
public class DmaapMessageHandler {
private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class);
- private boolean initialize = false;
- @Autowired
- private ObjectMapper mapper;
- @Autowired
- private PolicyController policyController;
- private AsyncRestClient restClient;
- @Autowired
- private ApplicationConfig applicationConfig;
- private String topic = "";
-
- // The publish properties is corrupted. It contains the subscribe property values.
- @Async("threadPoolTaskExecutor")
+ private static Gson gson = new GsonBuilder() //
+ .create(); //
+
+ private final MRBatchingPublisher dmaapClient;
+ private final AsyncRestClient agentClient;
+
+ public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) {
+ this.agentClient = agentClient;
+ this.dmaapClient = dmaapClient;
+ }
+
public void handleDmaapMsg(String msg) {
- if (!initialize) {
- init();
- }
- DmaapRequestMessage dmaapRequestMessage = null;
- Optional<String> dmaapResponse = null;
- // Process the message
- /**
- * Sample Request Message from DMAAP { "type": "request", "correlationId":
- * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z",
- * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas",
- * "payload": "{\"ricName\":\"ric1\"}" }
- *
- * -------------------------------------------------------------------------------------------------------------
- * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201,
- * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]}
- * -------------------------------------------------------------------------------------------------------------
- */
+ this.createTask(msg) //
+ .subscribe(message -> logger.debug("handleDmaapMsg: {}", message), //
+ throwable -> logger.warn("handleDmaapMsg failure ", throwable), //
+ () -> logger.debug("handleDmaapMsg complete"));
+ }
+
+ Mono<String> createTask(String msg) {
try {
- dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class);
- // Call the Controller
- logger.debug("Invoke the Policy Agent Controller");
- dmaapResponse = invokeController(dmaapRequestMessage);
- // Post the Response message to the DMAAP bus
- logger.debug("DMAAP Response Message to Client- {}", dmaapResponse);
- if (dmaapResponse.isPresent()) {
- restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); //
- }
- } catch (IOException e) {
- logger.error("Exception occured during message processing", e);
+ DmaapRequestMessage dmaapRequestMessage = gson.fromJson(msg, ImmutableDmaapRequestMessage.class);
+
+ return this.invokePolicyAgent(dmaapRequestMessage) //
+ .onErrorResume(t -> handleAgentCallError(t, dmaapRequestMessage)) //
+ .flatMap(response -> sendDmaapResponse(response, dmaapRequestMessage, HttpStatus.OK));
+
+ } catch (Exception e) {
+ logger.warn("Received unparsable message from DMAAP: {}", msg);
+ return Mono.error(e);
}
}
- private Optional<String> invokeController(DmaapRequestMessage dmaapRequestMessage) {
- String formattedString = "";
- String ricName;
- String instance;
- String jsonBody;
- logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload());
- try {
- formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString();
- logger.debug("Removed the Escape charater in payload- {}", formattedString);
- } catch (JSONException e) {
- logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload());
+ private Mono<String> handleAgentCallError(Throwable t, DmaapRequestMessage dmaapRequestMessage) {
+ logger.debug("Agent call failed: {}", t.getMessage());
+ return sendDmaapResponse(t.toString(), dmaapRequestMessage, HttpStatus.NOT_FOUND) //
+ .flatMap(notUsed -> Mono.empty());
+ }
+
+ private Mono<String> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) {
+ DmaapRequestMessage.Operation operation = dmaapRequestMessage.operation();
+ Mono<String> result = null;
+ String uri = dmaapRequestMessage.url();
+ if (operation == Operation.DELETE) {
+ result = agentClient.delete(uri);
+ } else if (operation == Operation.GET) {
+ result = agentClient.get(uri);
+ } else if (operation == Operation.PUT) {
+ result = agentClient.put(uri, payload(dmaapRequestMessage));
+ } else if (operation == Operation.POST) {
+ result = agentClient.post(uri, payload(dmaapRequestMessage));
+ } else {
+ return Mono.error(new Exception("Not implemented operation: " + operation));
}
- JSONObject jsonObject = new JSONObject(formattedString);
- switch (dmaapRequestMessage.getOperation()) {
- case "getPolicySchemas":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName));
- case "getPolicySchema":
- String policyTypeId = (String) jsonObject.get("id");
- logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId));
- case "getPolicyTypes":
- ricName = (String) jsonObject.get("ricName");
- logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName));
- case "getPolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for getPolicy with Instance- {}", instance);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance));
- case "deletePolicy":
- instance = (String) jsonObject.get("instance");
- logger.debug("Received the request for deletePolicy with Instance- {}", instance);
- return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block());
- case "putPolicy":
- String type = (String) jsonObject.get("type");
- String putPolicyInstance = (String) jsonObject.get("instance");
- String putPolicyRic = (String) jsonObject.get("ric");
- String service = (String) jsonObject.get("service");
- jsonBody = (String) jsonObject.get("jsonBody");
- return getDmaapResponseMessage(dmaapRequestMessage,
- policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block());
- case "getPolicies":
- String getPolicyType = (String) jsonObject.get("type");
- instance = (String) jsonObject.get("instance");
- String getPolicyRic = (String) jsonObject.get("ric");
- String getPolicyService = (String) jsonObject.get("service");
- jsonBody = (String) jsonObject.get("jsonBody");
- return getDmaapResponseMessage(dmaapRequestMessage, policyController
- .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block());
- default:
- break;
+ return result;
+ }
+
+ private String payload(DmaapRequestMessage message) {
+ if (message.payload().isPresent()) {
+ return gson.toJson(message.payload().get());
+ } else {
+ logger.warn("Expected payload in message from DMAAP: {}", message);
+ return "";
}
- return Optional.empty();
}
- private Optional<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage,
- ResponseEntity<?> policySchemas) {
- DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder()
- .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString())
- .type("response").correlationId(dmaapRequestMessage.getCorrelationId())
- .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId())
- .build();
+ private Mono<String> sendDmaapResponse(String response, DmaapRequestMessage dmaapRequestMessage,
+ HttpStatus status) {
+ return getDmaapResponseMessage(dmaapRequestMessage, response, status) //
+ .flatMap(this::sendToDmaap) //
+ .onErrorResume(this::handleResponseCallError);
+ }
+
+ private Mono<String> sendToDmaap(String body) {
try {
- return Optional.of(mapper.writeValueAsString(dmaapResponseMessage));
- } catch (JsonProcessingException e) {
- logger.error("Exception occured during getDmaapResponseMessage", e);
+ logger.debug("sendToDmaap: {} ", body);
+ dmaapClient.send(body);
+ dmaapClient.sendBatchWithResponse();
+ return Mono.just("OK");
+ } catch (IOException e) {
+ return Mono.error(e);
}
- return Optional.empty();
}
- public void init() {
- logger.debug("Reading DMAAP Publisher bus details from Application Config");
- Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig();
- String host = (String) dmaapPublisherConfig.get("ServiceName");
- topic = dmaapPublisherConfig.getProperty("topic");
- logger.debug("Read the topic & Service Name - {} , {}", host, topic);
- this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config
- initialize = true;
+ private Mono<String> handleResponseCallError(Throwable t) {
+ logger.debug("Failed to respond: {}", t.getMessage());
+ return Mono.empty();
+ }
+
+ private Mono<String> getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, String response,
+ HttpStatus status) {
+ DmaapResponseMessage dmaapResponseMessage = ImmutableDmaapResponseMessage.builder() //
+ .status(status.toString()) //
+ .message(response) //
+ .type("response") //
+ .correlationId(dmaapRequestMessage.correlationId()) //
+ .originatorId(dmaapRequestMessage.originatorId()) //
+ .requestId(dmaapRequestMessage.requestId()) //
+ .timestamp(dmaapRequestMessage.timestamp()) //
+ .build();
+ String str = gson.toJson(dmaapResponseMessage);
+
+ return Mono.just(str);
}
}