From 4272be3ebddb37ed61239d2a93da630366a333c6 Mon Sep 17 00:00:00 2001 From: Lathish Date: Tue, 28 Jan 2020 14:01:19 +0000 Subject: [PATCH] Removed the DMAAP Accepted/Rejected Call Issue-ID: NONRTRIC-107 Change-Id: If5839d5b3c8a874298b7c46d118fdc17b76bc097 Signed-off-by: Lathish --- .../dmaap/DmaapMessageConsumerImpl.java | 12 +-- .../policyagent/dmaap/DmaapMessageHandler.java | 95 +++++++++++----------- ...{DmaapMessage.java => DmaapRequestMessage.java} | 3 +- .../policyagent/model/DmaapResponseMessage.java | 46 +++++++++++ 4 files changed, 103 insertions(+), 53 deletions(-) rename policy-agent/src/main/java/org/oransc/policyagent/model/{DmaapMessage.java => DmaapRequestMessage.java} (96%) create mode 100644 policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 2ae5e5ee..503ddabe 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -22,7 +22,6 @@ package org.oransc.policyagent.dmaap; import java.io.IOException; import java.util.Properties; -import javax.annotation.PostConstruct; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; @@ -55,9 +54,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000) @Override public void run() { - /* - * if (!alive) { init(); } - */ + if (!alive) { + init(); + } if (this.alive) { try { Iterable dmaapMsgs = fetchAllMessages(); @@ -85,7 +84,8 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { return response.getActualMessages(); } - @PostConstruct + // Properties are not loaded in first atempt. Need to fix this and then uncomment the post construct annotation + // @PostConstruct @Override public void init() { Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); @@ -98,6 +98,8 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { } try { logger.debug("Creating DMAAP Client"); + System.out.println("dmaapConsumerProperties--->"+dmaapConsumerProperties.getProperty("topic")); + System.out.println("dmaapPublisherProperties--->"+dmaapPublisherProperties.getProperty("topic")); consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); this.alive = true; } catch (IOException e) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java index bf9f06c1..713d4834 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -20,17 +20,19 @@ package org.oransc.policyagent.dmaap; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import java.io.IOException; +import java.util.Optional; import java.util.Properties; -import org.apache.commons.lang3.StringUtils; import org.json.JSONException; import org.json.JSONObject; import org.json.JSONTokener; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.controllers.PolicyController; -import org.oransc.policyagent.model.DmaapMessage; +import org.oransc.policyagent.model.DmaapRequestMessage; +import org.oransc.policyagent.model.DmaapResponseMessage; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; @@ -60,104 +62,104 @@ public class DmaapMessageHandler { @Async("threadPoolTaskExecutor") public void handleDmaapMsg(String msg) { init(); - DmaapMessage dmaapMessage = null; - ResponseEntity response = null; + DmaapRequestMessage dmaapRequestMessage = null; + Optional dmaapResponse = null; // Process the message /** * Sample Request Message from DMAAP { "type": "request", "correlationId": * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z", * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", - * "payload": "{\"ric\":\"ric1\"}" } + * "payload": "{\"ricName\":\"ric1\"}" } + * * -------------------------------------------------------------------------------------------------------------- - * Sample Response Message to DMAAP { "type": "response", "correlation-id": - * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": - * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" } + * Sample Response Message to DMAAP {type=response, correlationId=c09ac7d1-de62-0016-2000-e63701125557-201, + * timestamp=null, originatorId=849e6c6b420, requestId=23343221, status=200 OK, message=[]} * ------------------------------------------------------------------------------------------------------------- - * Sample Response Message to DMAAP { "type": "response", "correlation-id": - * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": - * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" } */ try { - dmaapMessage = mapper.readValue(msg, DmaapMessage.class); - // Post the accepted message to the DMAAP bus - logger.debug("DMAAP Message- {}", dmaapMessage); - logger.debug("Post Accepted Message to Client"); - restClient - .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), - dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY)) - .block(); // + dmaapRequestMessage = mapper.readValue(msg, DmaapRequestMessage.class); // Call the Controller logger.debug("Invoke the Policy Agent Controller"); - response = invokeController(dmaapMessage); + dmaapResponse = invokeController(dmaapRequestMessage); // Post the Response message to the DMAAP bus - logger.debug("DMAAP Response Message to Client- {}", response); - restClient - .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), - dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody())) - .block(); // + logger.debug("DMAAP Response Message to Client- {}", dmaapResponse); + if (dmaapResponse.isPresent()) { + restClient.post("A1-POLICY-AGENT-WRITE", dmaapResponse.get()).block(); // + } } catch (IOException e) { logger.error("Exception occured during message processing", e); } } - private ResponseEntity invokeController(DmaapMessage dmaapMessage) { + private Optional invokeController(DmaapRequestMessage dmaapRequestMessage) { String formattedString = ""; String ricName; String instance; - logger.debug("Payload from the Message - {}", dmaapMessage.getPayload()); + String jsonBody; + logger.debug("Payload from the Message - {}", dmaapRequestMessage.getPayload()); try { - formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString(); + formattedString = new JSONTokener(dmaapRequestMessage.getPayload()).nextValue().toString(); logger.debug("Removed the Escape charater in payload- {}", formattedString); } catch (JSONException e) { - logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload()); + logger.error("Exception occurred during formating Payload- {}", dmaapRequestMessage.getPayload()); } JSONObject jsonObject = new JSONObject(formattedString); - switch (dmaapMessage.getOperation()) { + switch (dmaapRequestMessage.getOperation()) { case "getPolicySchemas": ricName = (String) jsonObject.get("ricName"); logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName); - return policyController.getPolicySchemas(ricName); + return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchemas(ricName)); case "getPolicySchema": String policyTypeId = (String) jsonObject.get("id"); logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId); - System.out.println("policyTypeId" + policyTypeId); - return policyController.getPolicySchema(policyTypeId); + return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicySchema(policyTypeId)); case "getPolicyTypes": ricName = (String) jsonObject.get("ricName"); logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName); - return policyController.getPolicyTypes(ricName); + return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicyTypes(ricName)); case "getPolicy": instance = (String) jsonObject.get("instance"); logger.debug("Received the request for getPolicy with Instance- {}", instance); - return policyController.getPolicy(instance); + return getDmaapResponseMessage(dmaapRequestMessage, policyController.getPolicy(instance)); case "deletePolicy": instance = (String) jsonObject.get("instance"); logger.debug("Received the request for deletePolicy with Instance- {}", instance); - return null;// policyController.deletePolicy(deleteInstance); + return getDmaapResponseMessage(dmaapRequestMessage, policyController.deletePolicy(instance).block()); case "putPolicy": String type = (String) jsonObject.get("type"); String putPolicyInstance = (String) jsonObject.get("instance"); String putPolicyRic = (String) jsonObject.get("ric"); String service = (String) jsonObject.get("service"); - String jsonBody = (String) jsonObject.get("jsonBody"); - return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody); + jsonBody = (String) jsonObject.get("jsonBody"); + return getDmaapResponseMessage(dmaapRequestMessage, + policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody).block()); case "getPolicies": String getPolicyType = (String) jsonObject.get("type"); + instance = (String) jsonObject.get("instance"); String getPolicyRic = (String) jsonObject.get("ric"); String getPolicyService = (String) jsonObject.get("service"); - return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService); + jsonBody = (String) jsonObject.get("jsonBody"); + return getDmaapResponseMessage(dmaapRequestMessage, policyController + .putPolicy(getPolicyType, instance, getPolicyRic, getPolicyService, jsonBody).block()); default: break; } - return null; + return Optional.empty(); } - private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status, - String message) { - System.out.println("buildResponse "); - return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "") - .put("originatorId", originatorId).put("requestId", requestId).put("status", status) - .put("message", message).toString(); + private Optional getDmaapResponseMessage(DmaapRequestMessage dmaapRequestMessage, + ResponseEntity policySchemas) { + DmaapResponseMessage dmaapResponseMessage = DmaapResponseMessage.builder() + .status(policySchemas.getStatusCode().toString()).message(policySchemas.getBody().toString()) + .type("response").correlationId(dmaapRequestMessage.getCorrelationId()) + .originatorId(dmaapRequestMessage.getOriginatorId()).requestId(dmaapRequestMessage.getRequestId()) + .build(); + try { + return Optional.of(mapper.writeValueAsString(dmaapResponseMessage)); + } catch (JsonProcessingException e) { + logger.error("Exception occured during getDmaapResponseMessage", e); + } + return Optional.empty(); } // @PostConstruct @@ -168,6 +170,7 @@ public class DmaapMessageHandler { Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); String host = (String) dmaapPublisherConfig.get("ServiceName"); topic = dmaapPublisherConfig.getProperty("topic"); + System.out.println("\"Read the topic ---------->" + topic); logger.debug("Read the topic & Service Name - {} , {}", host, topic); this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapRequestMessage.java similarity index 96% rename from policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java rename to policy-agent/src/main/java/org/oransc/policyagent/model/DmaapRequestMessage.java index e56f4b4a..0648940d 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapRequestMessage.java @@ -27,7 +27,7 @@ import lombok.Setter; @Getter @Setter -public class DmaapMessage { +public class DmaapRequestMessage { @NotNull private String type; @@ -35,7 +35,6 @@ public class DmaapMessage { private String correlationId; @NotNull private String target; - @NotNull private Timestamp timestamp; private String apiVersion; @NotNull diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java new file mode 100644 index 00000000..ebf5e8c5 --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapResponseMessage.java @@ -0,0 +1,46 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.model; + +import java.sql.Timestamp; +import javax.validation.constraints.NotNull; +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@Builder +public class DmaapResponseMessage { + + @NotNull + private String type; + @NotNull + private String correlationId; + private Timestamp timestamp; + @NotNull + private String originatorId; + private String requestId; + @NotNull + private String status; + @NotNull + private String message; +} -- 2.16.6