From 1f722deb1cd91420eed0add219ea749fec9d9912 Mon Sep 17 00:00:00 2001 From: Lathish Date: Mon, 27 Jan 2020 14:57:22 +0000 Subject: [PATCH 1/1] Process message with Producer logic Issue-ID: NONRTRIC-107 Change-Id: Iba71ee37ad2e19742afb303d19632126b6067e35 Signed-off-by: Lathish --- policy-agent/pom.xml | 5 + .../java/org/oransc/policyagent/BeanFactory.java | 6 + .../configuration/ApplicationConfigParser.java | 10 +- .../configuration/AsyncConfiguration.java | 44 ++++++ .../dmaap/DmaapMessageConsumerImpl.java | 27 ++-- .../policyagent/dmaap/DmaapMessageHandler.java | 175 +++++++++++++++++++++ .../org/oransc/policyagent/model/DmaapMessage.java | 47 ++++++ ...pplication_configuration_with_dmaap_config.json | 4 +- 8 files changed, 297 insertions(+), 21 deletions(-) create mode 100644 policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java create mode 100644 policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java create mode 100644 policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml index 43d3aefa..f2a9411f 100644 --- a/policy-agent/pom.xml +++ b/policy-agent/pom.xml @@ -128,6 +128,11 @@ dmaap-client ${sdk.version} + + org.projectlombok + lombok + provided + io.springfox diff --git a/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java b/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java index f0826e9a..e05eb95f 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/BeanFactory.java @@ -20,6 +20,7 @@ package org.oransc.policyagent; +import com.fasterxml.jackson.databind.ObjectMapper; import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.repository.Policies; @@ -61,4 +62,9 @@ class BeanFactory { return new A1ClientFactory(); } + @Bean + public ObjectMapper mapper() { + return new ObjectMapper(); + } + } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index d32db7a0..34d1d97a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -25,16 +25,13 @@ import com.google.gson.GsonBuilder; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; - import java.net.MalformedURLException; import java.net.URL; import java.util.Map.Entry; import java.util.Properties; import java.util.Set; import java.util.Vector; - import javax.validation.constraints.NotNull; - import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.oransc.policyagent.exceptions.ServiceException; import org.springframework.http.MediaType; @@ -125,9 +122,9 @@ public class ApplicationConfigParser { String urlPath = url.getPath(); DmaapUrlPath path = parseDmaapUrlPath(urlPath); - dmaapProps.put("ServiceName", url.getHost()); + dmaapProps.put("ServiceName", url.getHost()+":"+url.getPort()+"/events"); dmaapProps.put("topic", path.dmaapTopicName); - dmaapProps.put("host", url.getHost()); + dmaapProps.put("host", url.getHost()+":"+url.getPort()); dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString()); dmaapProps.put("userName", userName); dmaapProps.put("password", passwd); @@ -136,7 +133,6 @@ public class ApplicationConfigParser { dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString()); dmaapProps.put("timeout", 15000); dmaapProps.put("limit", 1000); - dmaapProps.put("port", url.getPort()); } catch (MalformedURLException e) { throw new ServiceException("Could not parse the URL", e); } @@ -166,7 +162,7 @@ public class ApplicationConfigParser { throw new ServiceException("The path has incorrect syntax: " + urlPath); } - final String dmaapTopicName = tokens[1] + "/" + tokens[2]; // /events/A1-P + final String dmaapTopicName = tokens[2]; // /events/A1-P String consumerGroup = ""; // users String consumerId = ""; // sdnc1 if (tokens.length == 5) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java new file mode 100644 index 00000000..085320c9 --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/AsyncConfiguration.java @@ -0,0 +1,44 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.configuration; + +import java.util.concurrent.Executor; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.AsyncConfigurer; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +@Configuration +@EnableAsync +public class AsyncConfiguration implements AsyncConfigurer { + + @Override + @Bean(name = "threadPoolTaskExecutor") + public Executor getAsyncExecutor() { + //Set this configuration value from common properties file + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(5); + executor.setMaxPoolSize(10); + executor.setQueueCapacity(25); + return executor; + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java index 191a13b2..2ae5e5ee 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerImpl.java @@ -22,7 +22,7 @@ package org.oransc.policyagent.dmaap; import java.io.IOException; import java.util.Properties; - +import javax.annotation.PostConstruct; import org.onap.dmaap.mr.client.MRClientFactory; import org.onap.dmaap.mr.client.MRConsumer; import org.onap.dmaap.mr.client.response.MRConsumerResponse; @@ -44,21 +44,24 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { private final ApplicationConfig applicationConfig; protected MRConsumer consumer; private MRConsumerResponse response = null; + @Autowired + private DmaapMessageHandler dmaapMessageHandler; @Autowired public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) { this.applicationConfig = applicationConfig; } - @Scheduled(fixedRate = 1000 * 60) + @Scheduled(fixedRate = 1000 * 10) // , initialDelay=60000) @Override public void run() { - if (!alive) { - init(); - } + /* + * if (!alive) { init(); } + */ if (this.alive) { try { Iterable dmaapMsgs = fetchAllMessages(); + logger.debug("Fetched all the messages from DMAAP and will start to process the messages"); for (String msg : dmaapMsgs) { processMsg(msg); } @@ -76,23 +79,25 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage()); if (!"200".equals(response.getResponseCode())) { logger.error("DMaaP consumer received: {} : {}", response.getResponseCode(), - response.getResponseMessage()); + response.getResponseMessage()); } } return response.getActualMessages(); } + @PostConstruct @Override public void init() { Properties dmaapConsumerProperties = applicationConfig.getDmaapConsumerConfig(); Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); // No need to start if there is no configuration. if (dmaapConsumerProperties == null || dmaapPublisherProperties == null || dmaapConsumerProperties.size() == 0 - || dmaapPublisherProperties.size() == 0) { + || dmaapPublisherProperties.size() == 0) { + logger.error("DMaaP properties Failed to Load"); return; } - // Do we need to do any validation of properties before calling the factory? try { + logger.debug("Creating DMAAP Client"); consumer = MRClientFactory.createConsumer(dmaapConsumerProperties); this.alive = true; } catch (IOException e) { @@ -102,11 +107,9 @@ public class DmaapMessageConsumerImpl implements DmaapMessageConsumer { @Override public void processMsg(String msg) throws Exception { - System.out.println("sysout" + msg); + logger.debug("Message Reveived from DMAAP : {}", msg); // Call the concurrent Task executor to handle the incoming request - // Validate the Input & if its valid, post the ACCEPTED Response back to DMAAP - // through REST CLIENT - // Call the Controller with the extracted payload + dmaapMessageHandler.handleDmaapMsg(msg); } @Override diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java new file mode 100644 index 00000000..bf9f06c1 --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -0,0 +1,175 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.dmaap; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.io.IOException; +import java.util.Properties; +import org.apache.commons.lang3.StringUtils; +import org.json.JSONException; +import org.json.JSONObject; +import org.json.JSONTokener; +import org.oransc.policyagent.clients.AsyncRestClient; +import org.oransc.policyagent.configuration.ApplicationConfig; +import org.oransc.policyagent.controllers.PolicyController; +import org.oransc.policyagent.model.DmaapMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.ResponseEntity; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +@Component +public class DmaapMessageHandler { + + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); + + @Autowired + private ObjectMapper mapper; + @Autowired + private PolicyController policyController; + private AsyncRestClient restClient; + private ApplicationConfig applicationConfig; + private String topic = ""; + + @Autowired + public DmaapMessageHandler(ApplicationConfig applicationConfig) { + this.applicationConfig = applicationConfig; + } + + // The publish properties is corrupted. It contains the subscribe property values. + @Async("threadPoolTaskExecutor") + public void handleDmaapMsg(String msg) { + init(); + DmaapMessage dmaapMessage = null; + ResponseEntity response = null; + // Process the message + /** + * Sample Request Message from DMAAP { "type": "request", "correlationId": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "target": "policy-agent", "timestamp": "2019-05-14T11:44:51.36Z", + * "apiVersion": "1.0", "originatorId": "849e6c6b420", "requestId": "23343221", "operation": "getPolicySchemas", + * "payload": "{\"ric\":\"ric1\"}" } + * -------------------------------------------------------------------------------------------------------------- + * Sample Response Message to DMAAP { "type": "response", "correlation-id": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": + * "849e6c6b420", "request-id": "23343221", "status" : "ACCEPTED", "message" : "" } + * ------------------------------------------------------------------------------------------------------------- + * Sample Response Message to DMAAP { "type": "response", "correlation-id": + * "c09ac7d1-de62-0016-2000-e63701125557-201", "timestamp": "2019-05-14T11:44:51.36Z", "originator-id": + * "849e6c6b420", "request-id": "23343221", "status" : "SUCCESS" "message" : "" } + */ + try { + dmaapMessage = mapper.readValue(msg, DmaapMessage.class); + // Post the accepted message to the DMAAP bus + logger.debug("DMAAP Message- {}", dmaapMessage); + logger.debug("Post Accepted Message to Client"); + restClient + .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), + dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "ACCEPTED", StringUtils.EMPTY)) + .block(); // + // Call the Controller + logger.debug("Invoke the Policy Agent Controller"); + response = invokeController(dmaapMessage); + // Post the Response message to the DMAAP bus + logger.debug("DMAAP Response Message to Client- {}", response); + restClient + .post("A1-POLICY-AGENT-WRITE", buildDmaapResponse(dmaapMessage.getCorrelationId(), + dmaapMessage.getOriginatorId(), dmaapMessage.getRequestId(), "SUCCESS", response.getBody())) + .block(); // + } catch (IOException e) { + logger.error("Exception occured during message processing", e); + } + } + + private ResponseEntity invokeController(DmaapMessage dmaapMessage) { + String formattedString = ""; + String ricName; + String instance; + logger.debug("Payload from the Message - {}", dmaapMessage.getPayload()); + try { + formattedString = new JSONTokener(dmaapMessage.getPayload()).nextValue().toString(); + logger.debug("Removed the Escape charater in payload- {}", formattedString); + } catch (JSONException e) { + logger.error("Exception occurred during formating Payload- {}", dmaapMessage.getPayload()); + } + JSONObject jsonObject = new JSONObject(formattedString); + switch (dmaapMessage.getOperation()) { + case "getPolicySchemas": + ricName = (String) jsonObject.get("ricName"); + logger.debug("Received the request for getPolicySchemas with Ric Name- {}", ricName); + return policyController.getPolicySchemas(ricName); + case "getPolicySchema": + String policyTypeId = (String) jsonObject.get("id"); + logger.debug("Received the request for getPolicySchema with Policy Type Id- {}", policyTypeId); + System.out.println("policyTypeId" + policyTypeId); + return policyController.getPolicySchema(policyTypeId); + case "getPolicyTypes": + ricName = (String) jsonObject.get("ricName"); + logger.debug("Received the request for getPolicyTypes with Ric Name- {}", ricName); + return policyController.getPolicyTypes(ricName); + case "getPolicy": + instance = (String) jsonObject.get("instance"); + logger.debug("Received the request for getPolicy with Instance- {}", instance); + return policyController.getPolicy(instance); + case "deletePolicy": + instance = (String) jsonObject.get("instance"); + logger.debug("Received the request for deletePolicy with Instance- {}", instance); + return null;// policyController.deletePolicy(deleteInstance); + case "putPolicy": + String type = (String) jsonObject.get("type"); + String putPolicyInstance = (String) jsonObject.get("instance"); + String putPolicyRic = (String) jsonObject.get("ric"); + String service = (String) jsonObject.get("service"); + String jsonBody = (String) jsonObject.get("jsonBody"); + return null;// policyController.putPolicy(type, putPolicyInstance, putPolicyRic, service, jsonBody); + case "getPolicies": + String getPolicyType = (String) jsonObject.get("type"); + String getPolicyRic = (String) jsonObject.get("ric"); + String getPolicyService = (String) jsonObject.get("service"); + return policyController.getPolicies(getPolicyType, getPolicyRic, getPolicyService); + default: + break; + } + return null; + } + + private String buildDmaapResponse(String correlationId, String originatorId, String requestId, String status, + String message) { + System.out.println("buildResponse "); + return new JSONObject().put("type", "response").put(correlationId, correlationId).put("timestamp", "") + .put("originatorId", originatorId).put("requestId", requestId).put("status", status) + .put("message", message).toString(); + } + + // @PostConstruct + // The application properties value is always NULL for the first time + // Need to fix this + public void init() { + logger.debug("Reading DMAAP Publisher bus details from Application Config"); + Properties dmaapPublisherConfig = applicationConfig.getDmaapPublisherConfig(); + String host = (String) dmaapPublisherConfig.get("ServiceName"); + topic = dmaapPublisherConfig.getProperty("topic"); + logger.debug("Read the topic & Service Name - {} , {}", host, topic); + this.restClient = new AsyncRestClient("http://" + host + "/"); // get this value from application config + + } +} diff --git a/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java new file mode 100644 index 00000000..e56f4b4a --- /dev/null +++ b/policy-agent/src/main/java/org/oransc/policyagent/model/DmaapMessage.java @@ -0,0 +1,47 @@ +/*- + * ========================LICENSE_START================================= + * O-RAN-SC + * %% + * Copyright (C) 2019 Nordix Foundation + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * ========================LICENSE_END=================================== + */ + +package org.oransc.policyagent.model; + +import java.sql.Timestamp; +import javax.validation.constraints.NotNull; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +public class DmaapMessage { + + @NotNull + private String type; + @NotNull + private String correlationId; + @NotNull + private String target; + @NotNull + private Timestamp timestamp; + private String apiVersion; + @NotNull + private String originatorId; + private String requestId; + @NotNull + private String operation; + private String payload; +} diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json index d4a39728..c9453603 100644 --- a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json +++ b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json @@ -24,14 +24,14 @@ "dmaap_publisher": { "type": "message_router", "dmaap_info": { - "topic_url": "https://dradmin:dradmin@localhost:2222/events/A1-P-RESULT" + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" } } }, "streams_subscribes": { "dmaap_subscriber": { "dmaap_info": { - "topic_url": "http://dradmin:dradmin@localhost:2222/events/A1-P/users/sdnc1" + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent" }, "type": "message_router" } -- 2.16.6