From: PatrikBuhr Date: Tue, 16 Jun 2020 09:19:01 +0000 (+0200) Subject: Remove using of DMAAP client from ONAP X-Git-Tag: 2.0.0~1^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=4e7db50d7fb3fd2c7101520f00f0f0b4baf9bddc;hp=824033853e6b3b52f9019283e85f21006596b0fb;p=nonrtric.git Remove using of DMAAP client from ONAP The DMAAP client did not work with https, which is a fundametal need. Instead, the AsynchRestClient is used. Change-Id: I347ddd5a140e854da858ad87386227b1da8c437a Issue-ID: NONRTRIC-195 Signed-off-by: PatrikBuhr --- diff --git a/policy-agent/dpo/blueprints/k8s-policy-agent.yaml b/policy-agent/dpo/blueprints/k8s-policy-agent.yaml index 03473859..aa09ff36 100644 --- a/policy-agent/dpo/blueprints/k8s-policy-agent.yaml +++ b/policy-agent/dpo/blueprints/k8s-policy-agent.yaml @@ -67,7 +67,7 @@ node_templates: streams_subscribes: dmaap_subscriber: dmaap_info: - topic_url: { concat: ['https://message-router:3905/events/',{ get_input: subscribe_topic_name }, '/', { get_input: consumer_group }, "/", { get_input: consumer_id }] } + topic_url: { concat: ['https://message-router:3905/events/',{ get_input: subscribe_topic_name }, '/', { get_input: consumer_group }, "/", { get_input: consumer_id }, "?timeout=15000&limit=100"] } type: message_router ric: - name: ric1 diff --git a/policy-agent/pom.xml b/policy-agent/pom.xml index 76f3c33a..3725af6e 100644 --- a/policy-agent/pom.xml +++ b/policy-agent/pom.xml @@ -77,8 +77,8 @@ spring-boot-starter-webflux - org.springframework.boot - spring-boot-starter-aop + org.springframework.boot + spring-boot-starter-aop org.springframework.boot @@ -134,11 +134,6 @@ cbs-client ${sdk.version} - - org.onap.dcaegen2.services.sdk.rest.services - dmaap-client - ${sdk.version} - org.projectlombok lombok @@ -158,13 +153,11 @@ org.glassfish.jersey.inject jersey-hk2 - org.springframework.boot spring-boot-starter-actuator - io.springfox @@ -176,7 +169,6 @@ springfox-swagger-ui ${springfox.version} - org.springframework.boot diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java index 6c2c91b2..a2533733 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfig.java @@ -24,7 +24,6 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; -import java.util.Properties; import javax.validation.constraints.NotEmpty; @@ -54,10 +53,12 @@ public class ApplicationConfig { private String sslTrustStore = ""; private Map ricConfigs = new HashMap<>(); + @Getter - private Properties dmaapPublisherConfig; + private String dmaapConsumerTopicUrl; + @Getter - private Properties dmaapConsumerConfig; + private String dmaapProducerTopicUrl; private Map controllerConfigs = new HashMap<>(); @@ -109,10 +110,11 @@ public class ApplicationConfig { ApplicationConfigParser.ConfigParserResult parserResult) { Collection modifications = new ArrayList<>(); - this.dmaapPublisherConfig = parserResult.dmaapPublisherConfig(); - this.dmaapConsumerConfig = parserResult.dmaapConsumerConfig(); this.controllerConfigs = parserResult.controllerConfigs(); + this.dmaapConsumerTopicUrl = parserResult.dmaapConsumerTopicUrl(); + this.dmaapProducerTopicUrl = parserResult.dmaapProducerTopicUrl(); + Map newRicConfigs = new HashMap<>(); for (RicConfig newConfig : parserResult.ricConfigs()) { RicConfig oldConfig = this.ricConfigs.get(newConfig.name()); diff --git a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java index a76f964c..14e836be 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/configuration/ApplicationConfigParser.java @@ -24,8 +24,6 @@ import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import java.net.MalformedURLException; -import java.net.URL; import java.util.ArrayList; import java.util.HashMap; import java.util.HashSet; @@ -33,16 +31,13 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Map.Entry; -import java.util.Properties; import java.util.Set; import javax.validation.constraints.NotNull; import org.immutables.gson.Gson; import org.immutables.value.Value; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.oransc.policyagent.exceptions.ServiceException; -import org.springframework.http.MediaType; /** * Parser for the Json representing of the component configuration. @@ -57,17 +52,18 @@ public class ApplicationConfigParser { public interface ConfigParserResult { List ricConfigs(); - Properties dmaapPublisherConfig(); + Map controllerConfigs(); - Properties dmaapConsumerConfig(); + String dmaapConsumerTopicUrl(); + + String dmaapProducerTopicUrl(); - Map controllerConfigs(); } public ConfigParserResult parse(JsonObject root) throws ServiceException { - Properties dmaapPublisherConfig = new Properties(); - Properties dmaapConsumerConfig = new Properties(); + String dmaapProducerTopicUrl = ""; + String dmaapConsumerTopicUrl = ""; JsonObject agentConfigJson = root.getAsJsonObject(CONFIG); @@ -77,12 +73,12 @@ public class ApplicationConfigParser { JsonObject json = agentConfigJson.getAsJsonObject("streams_publishes"); if (json != null) { - dmaapPublisherConfig = parseDmaapConfig(json); + dmaapProducerTopicUrl = parseDmaapConfig(json); } json = agentConfigJson.getAsJsonObject("streams_subscribes"); if (json != null) { - dmaapConsumerConfig = parseDmaapConfig(json); + dmaapConsumerTopicUrl = parseDmaapConfig(json); } List ricConfigs = parseRics(agentConfigJson); @@ -90,8 +86,8 @@ public class ApplicationConfigParser { checkConfigurationConsistency(ricConfigs, controllerConfigs); return ImmutableConfigParserResult.builder() // - .dmaapConsumerConfig(dmaapConsumerConfig) // - .dmaapPublisherConfig(dmaapPublisherConfig) // + .dmaapConsumerTopicUrl(dmaapConsumerTopicUrl) // + .dmaapProducerTopicUrl(dmaapProducerTopicUrl) // .ricConfigs(ricConfigs) // .controllerConfigs(controllerConfigs) // .build(); @@ -114,7 +110,6 @@ public class ApplicationConfigParser { } } - } private List parseRics(JsonObject config) throws ServiceException { @@ -177,7 +172,7 @@ public class ApplicationConfigParser { return get(obj, memberName).getAsJsonArray(); } - private Properties parseDmaapConfig(JsonObject streamCfg) throws ServiceException { + private String parseDmaapConfig(JsonObject streamCfg) throws ServiceException { Set> streamConfigEntries = streamCfg.entrySet(); if (streamConfigEntries.size() != 1) { throw new ServiceException( @@ -185,71 +180,10 @@ public class ApplicationConfigParser { } JsonObject streamConfigEntry = streamConfigEntries.iterator().next().getValue().getAsJsonObject(); JsonObject dmaapInfo = get(streamConfigEntry, "dmaap_info").getAsJsonObject(); - String topicUrl = getAsString(dmaapInfo, "topic_url"); - - try { - Properties dmaapProps = new Properties(); - URL url = new URL(topicUrl); - String passwd = ""; - String userName = ""; - if (url.getUserInfo() != null) { - String[] userInfo = url.getUserInfo().split(":"); - userName = userInfo[0]; - passwd = userInfo[1]; - } - String urlPath = url.getPath(); - DmaapUrlPath path = parseDmaapUrlPath(urlPath); - - dmaapProps.put("ServiceName", url.getHost() + ":" + url.getPort() + "/events"); - dmaapProps.put("topic", path.dmaapTopicName); - dmaapProps.put("host", url.getHost() + ":" + url.getPort()); - dmaapProps.put("contenttype", MediaType.APPLICATION_JSON.toString()); - dmaapProps.put("userName", userName); - dmaapProps.put("password", passwd); - dmaapProps.put("group", path.consumerGroup); - dmaapProps.put("id", path.consumerId); - dmaapProps.put("TransportType", ProtocolTypeConstants.HTTPNOAUTH.toString()); - dmaapProps.put("timeout", "15000"); - dmaapProps.put("limit", "100"); - dmaapProps.put("maxBatchSize", "10"); - dmaapProps.put("maxAgeMs", "10000"); - dmaapProps.put("compress", true); - dmaapProps.put("MessageSentThreadOccurance", "2"); - return dmaapProps; - } catch (MalformedURLException e) { - throw new ServiceException("Could not parse the URL", e); - } + return getAsString(dmaapInfo, "topic_url"); } private static @NotNull String getAsString(JsonObject obj, String memberName) throws ServiceException { return get(obj, memberName).getAsString(); } - - private class DmaapUrlPath { - final String dmaapTopicName; - final String consumerGroup; - final String consumerId; - - DmaapUrlPath(String dmaapTopicName, String consumerGroup, String consumerId) { - this.dmaapTopicName = dmaapTopicName; - this.consumerGroup = consumerGroup; - this.consumerId = consumerId; - } - } - - private DmaapUrlPath parseDmaapUrlPath(String urlPath) throws ServiceException { - String[] tokens = urlPath.split("/"); // /events/A1-P/users/sdnc1 - if (!(tokens.length == 3 ^ tokens.length == 5)) { - throw new ServiceException("The path has incorrect syntax: " + urlPath); - } - - final String dmaapTopicName = tokens[2]; // /events/A1-P - String consumerGroup = ""; // users - String consumerId = ""; // sdnc1 - if (tokens.length == 5) { - consumerGroup = tokens[3]; - consumerId = tokens[4]; - } - return new DmaapUrlPath(dmaapTopicName, consumerGroup, consumerId); - } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java index 011b9779..e141babd 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageConsumer.java @@ -21,15 +21,15 @@ package org.oransc.policyagent.dmaap; import com.google.common.collect.Iterables; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonParser; import java.io.IOException; import java.time.Duration; -import java.util.Properties; +import java.util.ArrayList; +import java.util.List; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.MRClientFactory; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; +import org.springframework.http.ResponseEntity; import org.springframework.stereotype.Component; /** @@ -62,7 +63,6 @@ public class DmaapMessageConsumer { private final ApplicationConfig applicationConfig; private DmaapMessageHandler dmaapMessageHandler = null; - private MRConsumer messageRouterConsumer = null; @Value("${server.http-port}") private int localServerHttpPort; @@ -99,7 +99,7 @@ public class DmaapMessageConsumer { sleep(TIME_BETWEEN_DMAAP_RETRIES); // wait for configuration } } catch (Exception e) { - logger.warn("Cannot fetch because of {}", e.getMessage()); + logger.warn("{}", e.getMessage()); sleep(TIME_BETWEEN_DMAAP_RETRIES); } } @@ -110,25 +110,35 @@ public class DmaapMessageConsumer { } protected boolean isDmaapConfigured() { - Properties consumerCfg = applicationConfig.getDmaapConsumerConfig(); - Properties producerCfg = applicationConfig.getDmaapPublisherConfig(); - return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0); + String producerTopicUrl = applicationConfig.getDmaapProducerTopicUrl(); + String consumerTopicUrl = applicationConfig.getDmaapConsumerTopicUrl(); + return (!producerTopicUrl.isEmpty() && !consumerTopicUrl.isEmpty()); } - protected Iterable fetchAllMessages() throws ServiceException, IOException { - Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig(); - MRConsumer consumer = getMessageRouterConsumer(dmaapConsumerProperties); - MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse(); - if (response == null || !"200".equals(response.getResponseCode())) { - String errorMessage = "DMaaP NULL response received"; - if (response != null) { - errorMessage = "Error respons " + response.getResponseCode() + " " + response.getResponseMessage() - + " from DMaaP."; + private static List parseMessages(String jsonString) { + JsonArray arrayOfMessages = JsonParser.parseString(jsonString).getAsJsonArray(); + List result = new ArrayList<>(); + for (JsonElement element : arrayOfMessages) { + if (element.isJsonPrimitive()) { + result.add(element.getAsString()); + } else { + String messageAsString = element.toString(); + result.add(messageAsString); } - throw new ServiceException(errorMessage); + } + return result; + } + + protected Iterable fetchAllMessages() throws ServiceException, IOException { + String topicUrl = this.applicationConfig.getDmaapConsumerTopicUrl(); + AsyncRestClient consumer = getMessageRouterConsumer(); + ResponseEntity response = consumer.getForEntity(topicUrl).block(); + logger.debug("DMaaP consumer received {} : {}", response.getStatusCode(), response.getBody()); + if (response.getStatusCode().is2xxSuccessful()) { + return parseMessages(response.getBody()); } else { - logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage()); - return response.getActualMessages(); + throw new ServiceException("Cannot fetch because of Error respons: " + response.getStatusCode().toString() + + " " + response.getBody()); } } @@ -141,8 +151,8 @@ public class DmaapMessageConsumer { if (this.dmaapMessageHandler == null) { String agentBaseUrl = "http://localhost:" + this.localServerHttpPort; AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl); - Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig(); - MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties); + AsyncRestClient producer = new AsyncRestClient(this.applicationConfig.getDmaapProducerTopicUrl(), + this.applicationConfig.getWebClientConfig()); this.dmaapMessageHandler = new DmaapMessageHandler(producer, agentClient); } return this.dmaapMessageHandler; @@ -156,11 +166,8 @@ public class DmaapMessageConsumer { } } - protected MRConsumer getMessageRouterConsumer(Properties dmaapConsumerProperties) throws IOException { - if (this.messageRouterConsumer == null) { - this.messageRouterConsumer = MRClientFactory.createConsumer(dmaapConsumerProperties); - } - return this.messageRouterConsumer; + protected AsyncRestClient getMessageRouterConsumer() { + return new AsyncRestClient("", this.applicationConfig.getWebClientConfig()); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java index 19d15648..efdccd87 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/dmaap/DmaapMessageHandler.java @@ -24,10 +24,8 @@ import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.JsonObject; -import java.io.IOException; import java.util.Optional; -import org.onap.dmaap.mr.client.MRBatchingPublisher; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation; import org.oransc.policyagent.exceptions.ServiceException; @@ -49,10 +47,10 @@ public class DmaapMessageHandler { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandler.class); private static Gson gson = new GsonBuilder() // .create(); // - private final MRBatchingPublisher dmaapClient; + private final AsyncRestClient dmaapClient; private final AsyncRestClient agentClient; - public DmaapMessageHandler(MRBatchingPublisher dmaapClient, AsyncRestClient agentClient) { + public DmaapMessageHandler(AsyncRestClient dmaapClient, AsyncRestClient agentClient) { this.agentClient = agentClient; this.dmaapClient = dmaapClient; } @@ -99,11 +97,7 @@ public class DmaapMessageHandler { } private String prepareBadOperationErrorMessage(Throwable t, String originalMessage) { - String operationParameterStart = "operation\":\""; - int indexOfOperationStart = originalMessage.indexOf(operationParameterStart) + operationParameterStart.length(); - int indexOfOperationEnd = originalMessage.indexOf("\",\"", indexOfOperationStart); - String badOperation = originalMessage.substring(indexOfOperationStart, indexOfOperationEnd); - return t.getMessage().replace("null", badOperation); + return t.getMessage(); } private Mono> invokePolicyAgent(DmaapRequestMessage dmaapRequestMessage) { @@ -141,14 +135,8 @@ public class DmaapMessageHandler { } private Mono sendToDmaap(String body) { - try { - logger.debug("sendToDmaap: {} ", body); - dmaapClient.send(body); - dmaapClient.sendBatchWithResponse(); - return Mono.just("OK"); - } catch (IOException e) { - return Mono.error(e); - } + logger.debug("sendToDmaap: {} ", body); + return dmaapClient.post("", "[" + body + "]"); } private Mono handleResponseCallError(Throwable t) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index 5d782075..83c64e89 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -134,7 +134,7 @@ public class RicSynchronizationTask { private void notifyAllServices(String body) { for (Service service : services.getAll()) { String url = service.getCallbackUrl(); - if (service.getCallbackUrl().length() > 0) { + if (url.length() > 0) { createNotificationClient(url) // .put("", body) // .subscribe( // diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java index 6b106c93..5a6b0232 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigParserTest.java @@ -20,7 +20,6 @@ package org.oransc.policyagent.configuration; -import static org.junit.jupiter.api.Assertions.assertAll; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -36,16 +35,12 @@ import java.io.ByteArrayInputStream; import java.io.IOException; import java.io.InputStream; import java.io.InputStreamReader; -import java.net.MalformedURLException; import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.Map; -import java.util.Properties; import org.junit.jupiter.api.Test; -import org.onap.dmaap.mr.test.clients.ProtocolTypeConstants; import org.oransc.policyagent.exceptions.ServiceException; -import org.springframework.http.MediaType; class ApplicationConfigParserTest { @@ -57,35 +52,13 @@ class ApplicationConfigParserTest { ApplicationConfigParser.ConfigParserResult result = parserUnderTest.parse(jsonRootObject); - Properties actualPublisherConfig = result.dmaapPublisherConfig(); - assertAll("publisherConfig", - () -> assertEquals("localhost:6845/events", actualPublisherConfig.get("ServiceName"), "Wrong ServiceName"), - () -> assertEquals("A1-POLICY-AGENT-WRITE", actualPublisherConfig.get("topic"), "Wrong topic"), - () -> assertEquals("localhost:6845", actualPublisherConfig.get("host"), "Wrong host"), - () -> assertEquals(MediaType.APPLICATION_JSON.toString(), actualPublisherConfig.get("contenttype"), - "Wrong contenttype"), - () -> assertEquals("admin", actualPublisherConfig.get("userName"), "Wrong userName"), - () -> assertEquals("admin", actualPublisherConfig.get("password"), "Wrong password"), - () -> assertEquals(ProtocolTypeConstants.HTTPNOAUTH.toString(), actualPublisherConfig.get("TransportType"), - "Wrong TransportType"), - () -> assertEquals("15000", actualPublisherConfig.get("timeout"), "Wrong timeout"), - () -> assertEquals("100", actualPublisherConfig.get("limit"), "Wrong limit")); - - Properties actualConsumerConfig = result.dmaapConsumerConfig(); - assertAll("consumerConfig", - () -> assertEquals("localhost:6845/events", actualConsumerConfig.get("ServiceName"), "Wrong ServiceName"), - () -> assertEquals("A1-POLICY-AGENT-READ", actualConsumerConfig.get("topic"), "Wrong topic"), - () -> assertEquals("localhost:6845", actualConsumerConfig.get("host"), "Wrong host"), - () -> assertEquals(MediaType.APPLICATION_JSON.toString(), actualConsumerConfig.get("contenttype"), - "Wrong contenttype"), - () -> assertEquals("admin", actualConsumerConfig.get("userName"), "Wrong userName"), - () -> assertEquals("admin", actualConsumerConfig.get("password"), "Wrong password"), - () -> assertEquals("users", actualConsumerConfig.get("group"), "Wrong group"), - () -> assertEquals("policy-agent", actualConsumerConfig.get("id"), "Wrong id"), - () -> assertEquals(ProtocolTypeConstants.HTTPNOAUTH.toString(), actualConsumerConfig.get("TransportType"), - "Wrong TransportType"), - () -> assertEquals("15000", actualConsumerConfig.get("timeout"), "Wrong timeout"), - () -> assertEquals("100", actualConsumerConfig.get("limit"), "Wrong limit")); + String topicUrl = result.dmaapProducerTopicUrl(); + assertEquals("http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE", topicUrl, "controller contents"); + + topicUrl = result.dmaapConsumerTopicUrl(); + assertEquals( + "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100", + topicUrl, "controller contents"); Map controllers = result.controllerConfigs(); assertEquals(1, controllers.size(), "size"); @@ -166,38 +139,6 @@ class ApplicationConfigParserTest { } } - @Test - void whenMalformedUrlStreamsSubscribing() throws Exception { - JsonObject jsonRootObject = getJsonRootObject(); - final String wrongTopicUrl = "WrongTopicUrl"; - JsonObject json = getDmaapInfo(jsonRootObject, "streams_subscribes", "dmaap_subscriber"); - json.addProperty("topic_url", wrongTopicUrl); - final String expectedMessage = "Could not parse the URL"; - - Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject)); - - assertEquals(expectedMessage, actualException.getMessage().replace("\"", ""), - "Wrong error message when the streams subscribes' URL is malformed"); - assertEquals(MalformedURLException.class, actualException.getCause().getClass(), - "The exception is not a MalformedURLException"); - } - - @Test - void whenMalformedUrlStreamsPublishing() throws Exception { - JsonObject jsonRootObject = getJsonRootObject(); - final String wrongTopicUrl = "WrongTopicUrl"; - JsonObject json = getDmaapInfo(jsonRootObject, "streams_publishes", "dmaap_publisher"); - json.addProperty("topic_url", wrongTopicUrl); - final String expectedMessage = "Could not parse the URL"; - - Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject)); - - assertEquals(expectedMessage, actualException.getMessage().replace("\"", ""), - "Wrong error message when the streams publishes' URL is malformed"); - assertEquals(MalformedURLException.class, actualException.getCause().getClass(), - "The exception is not a MalformedURLException"); - } - @Test void whenWrongMemberNameInObject() throws Exception { JsonObject jsonRootObject = getJsonRootObject(); @@ -210,38 +151,6 @@ class ApplicationConfigParserTest { assertEquals(message, actualException.getMessage(), "Wrong error message when wrong member name in object"); } - @Test - void whenWrongUrlPathStreamsSubscribing() throws Exception { - JsonObject jsonRootObject = getJsonRootObject(); - final String wrongTopicUrlString = - "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent/wrong-topic-url"; - final URL wrongTopicUrl = new URL(wrongTopicUrlString); - JsonObject json = getDmaapInfo(jsonRootObject, "streams_subscribes", "dmaap_subscriber"); - json.addProperty("topic_url", wrongTopicUrlString); - final String expectedMessage = "The path has incorrect syntax: " + wrongTopicUrl.getPath(); - - Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject)); - - assertEquals(expectedMessage, actualException.getMessage(), - "Wrong error message when the streams subscribes' URL has incorrect syntax"); - } - - @Test - void whenWrongUrlPathStreamsPublishing() throws Exception { - JsonObject jsonRootObject = getJsonRootObject(); - final String wrongTopicUrlString = - "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE/wrong-topic-url"; - final URL wrongTopicUrl = new URL(wrongTopicUrlString); - JsonObject json = getDmaapInfo(jsonRootObject, "streams_publishes", "dmaap_publisher"); - json.addProperty("topic_url", wrongTopicUrlString); - final String expectedMessage = "The path has incorrect syntax: " + wrongTopicUrl.getPath(); - - Exception actualException = assertThrows(ServiceException.class, () -> parserUnderTest.parse(jsonRootObject)); - - assertEquals(expectedMessage, actualException.getMessage(), - "Wrong error message when the streams publishes' URL has incorrect syntax"); - } - JsonObject getDmaapInfo(JsonObject jsonRootObject, String streamsPublishesOrSubscribes, String dmaapPublisherOrSubscriber) throws Exception { return jsonRootObject.getAsJsonObject("config").getAsJsonObject(streamsPublishesOrSubscribes) diff --git a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java index 6a6f4470..5667fd25 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/configuration/ApplicationConfigTest.java @@ -27,7 +27,6 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.util.Arrays; import java.util.HashMap; -import java.util.Properties; import java.util.Vector; import org.junit.jupiter.api.Test; @@ -50,8 +49,8 @@ class ApplicationConfigTest { ConfigParserResult configParserResult(RicConfig... rics) { return ImmutableConfigParserResult.builder() // .ricConfigs(Arrays.asList(rics)) // - .dmaapConsumerConfig(new Properties()) // - .dmaapPublisherConfig(new Properties()) // + .dmaapConsumerTopicUrl("dmaapConsumerTopicUrl") // + .dmaapProducerTopicUrl("dmaapProducerTopicUrl") // .controllerConfigs(new HashMap<>()) // .build(); } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java index 6e786568..a78fde03 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageConsumerTest.java @@ -35,30 +35,28 @@ import ch.qos.logback.classic.spi.ILoggingEvent; import ch.qos.logback.core.read.ListAppender; import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; import java.util.LinkedList; -import java.util.List; -import java.util.Properties; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; -import org.onap.dmaap.mr.client.MRConsumer; -import org.onap.dmaap.mr.client.response.MRConsumerResponse; +import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.utils.LoggingUtils; import org.springframework.http.HttpStatus; +import org.springframework.http.ResponseEntity; +import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) class DmaapMessageConsumerTest { @Mock private ApplicationConfig applicationConfigMock; @Mock - private MRConsumer messageRouterConsumerMock; + private AsyncRestClient messageRouterConsumerMock; @Mock private DmaapMessageHandler messageHandlerMock; @@ -107,18 +105,15 @@ class DmaapMessageConsumerTest { messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - response.setActualMessages(Collections.emptyList()); + Mono> response = Mono.empty(); doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + doReturn(response).when(messageRouterConsumerMock).getForEntity(any()); messageConsumerUnderTest.start().join(); - verify(messageRouterConsumerMock).fetchWithReturnConsumerResponse(); + verify(messageRouterConsumerMock).getForEntity(any()); verifyNoMoreInteractions(messageRouterConsumerMock); } @@ -130,56 +125,73 @@ class DmaapMessageConsumerTest { doNothing().when(messageConsumerUnderTest).sleep(any(Duration.class)); doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - MRConsumerResponse response = new MRConsumerResponse(); - int responseCode = HttpStatus.BAD_REQUEST.value(); - response.setResponseCode(Integer.toString(responseCode)); - String responseMessage = "Error"; - response.setResponseMessage(responseMessage); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + Mono> response = Mono.just(new ResponseEntity<>("Error", HttpStatus.BAD_REQUEST)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); final ListAppender logAppender = LoggingUtils.getLogListAppender(DmaapMessageConsumer.class, WARN); messageConsumerUnderTest.start().join(); - assertThat(logAppender.list.get(0).getFormattedMessage()).isEqualTo( - "Cannot fetch because of Error respons " + responseCode + " " + responseMessage + " from DMaaP."); + assertThat(logAppender.list.get(0).getFormattedMessage()) + .isEqualTo("Cannot fetch because of Error respons: 400 BAD_REQUEST Error"); verify(messageConsumerUnderTest).sleep(DmaapMessageConsumer.TIME_BETWEEN_DMAAP_RETRIES); } @Test void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage() throws Exception { + // The message from MR is here an array of Json objects + setUpMrConfig(); + messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); + + String message = + "{\"apiVersion\":\"1.0\",\"operation\":\"GET\",\"correlationId\":\"1592341013115594000\",\"originatorId\":\"849e6c6b420\",\"payload\":{},\"requestId\":\"23343221\", \"target\":\"policy-agent\",\"timestamp\":\"2020-06-16 20:56:53.115665\",\"type\":\"request\",\"url\":\"/rics\"}"; + String messages = "[" + message + "]"; + + doReturn(false, true).when(messageConsumerUnderTest).isStopped(); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); + + Mono> response = Mono.just(new ResponseEntity<>(messages, HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); + + doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); + + messageConsumerUnderTest.start().join(); + + ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); + verify(messageHandlerMock).handleDmaapMsg(captor.capture()); + String messageAfterJsonParsing = captor.getValue(); + assertThat(messageAfterJsonParsing.contains("apiVersion")).isTrue(); + + verifyNoMoreInteractions(messageHandlerMock); + } + + @Test + void dmaapConfiguredAndOneMessage_thenPollOnceAndProcessMessage2() throws Exception { + // The message from MR is here an array of String (which is the case when the MR + // simulator is used) setUpMrConfig(); messageConsumerUnderTest = spy(new DmaapMessageConsumer(applicationConfigMock)); doReturn(false, true).when(messageConsumerUnderTest).isStopped(); - doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest) - .getMessageRouterConsumer(any(Properties.class)); + doReturn(messageRouterConsumerMock).when(messageConsumerUnderTest).getMessageRouterConsumer(); - MRConsumerResponse response = new MRConsumerResponse(); - response.setResponseCode(Integer.toString(HttpStatus.OK.value())); - String responseMessage = "message"; - List messages = Arrays.asList(responseMessage); - response.setActualMessages(messages); - when(messageRouterConsumerMock.fetchWithReturnConsumerResponse()).thenReturn(response); + Mono> response = Mono.just(new ResponseEntity<>("[\"aMessage\"]", HttpStatus.OK)); + when(messageRouterConsumerMock.getForEntity(any())).thenReturn(response); doReturn(messageHandlerMock).when(messageConsumerUnderTest).getDmaapMessageHandler(); messageConsumerUnderTest.start().join(); - verify(messageHandlerMock).handleDmaapMsg(responseMessage); + verify(messageHandlerMock).handleDmaapMsg("aMessage"); verifyNoMoreInteractions(messageHandlerMock); } - private Properties setUpMrConfig() { - Properties properties = new Properties(); - properties.put("key", "value"); - when(applicationConfigMock.getDmaapConsumerConfig()).thenReturn(properties); - when(applicationConfigMock.getDmaapPublisherConfig()).thenReturn(properties); - return properties; + private void setUpMrConfig() { + when(applicationConfigMock.getDmaapConsumerTopicUrl()).thenReturn("url"); + when(applicationConfigMock.getDmaapProducerTopicUrl()).thenReturn("url"); } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java index 3cbe28bc..7deef6e2 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java @@ -25,7 +25,6 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -46,8 +45,6 @@ import java.util.Optional; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; -import org.onap.dmaap.mr.client.MRBatchingPublisher; -import org.onap.dmaap.mr.client.response.MRPublisherResponse; import org.oransc.policyagent.clients.AsyncRestClient; import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation; import org.oransc.policyagent.repository.ImmutablePolicyType; @@ -57,7 +54,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; -import org.springframework.web.reactive.function.client.WebClientResponseException; import reactor.core.publisher.Mono; import reactor.test.StepVerifier; @@ -66,7 +62,7 @@ class DmaapMessageHandlerTest { private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class); private static final String URL = "url"; - private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class); + private final AsyncRestClient dmaapClient = mock(AsyncRestClient.class); private final AsyncRestClient agentClient = mock(AsyncRestClient.class); private DmaapMessageHandler testedObject; private static Gson gson = new GsonBuilder() // @@ -112,6 +108,11 @@ class DmaapMessageHandlerTest { return Mono.just(entity); } + private Mono> notOkResponse() { + ResponseEntity entity = new ResponseEntity<>("NOK", HttpStatus.BAD_GATEWAY); + return Mono.just(entity); + } + @Test void testMessageParsing() { String message = dmaapInputMessage(Operation.DELETE); @@ -143,8 +144,7 @@ class DmaapMessageHandlerTest { @Test void successfulDelete() throws IOException { doReturn(okResponse()).when(agentClient).deleteForEntity(anyString()); - doReturn(1).when(dmaapClient).send(anyString()); - doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse(); + doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); String message = dmaapInputMessage(Operation.DELETE); @@ -157,16 +157,15 @@ class DmaapMessageHandlerTest { verify(agentClient).deleteForEntity(URL); verifyNoMoreInteractions(agentClient); - verify(dmaapClient).send(anyString()); - verify(dmaapClient).sendBatchWithResponse(); + verify(dmaapClient).post(anyString(), anyString()); + verifyNoMoreInteractions(dmaapClient); } @Test void successfulGet() throws IOException { doReturn(okResponse()).when(agentClient).getForEntity(anyString()); - doReturn(1).when(dmaapClient).send(anyString()); - doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse(); + doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); StepVerifier // .create(testedObject.createTask(dmaapInputMessage(Operation.GET))) // @@ -177,16 +176,14 @@ class DmaapMessageHandlerTest { verify(agentClient).getForEntity(URL); verifyNoMoreInteractions(agentClient); - verify(dmaapClient).send(anyString()); - verify(dmaapClient).sendBatchWithResponse(); + verify(dmaapClient).post(anyString(), anyString()); verifyNoMoreInteractions(dmaapClient); } @Test void successfulPut() throws IOException { doReturn(okResponse()).when(agentClient).putForEntity(anyString(), anyString()); - doReturn(1).when(dmaapClient).send(anyString()); - doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse(); + doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); StepVerifier // .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) // @@ -197,16 +194,14 @@ class DmaapMessageHandlerTest { verify(agentClient).putForEntity(URL, payloadAsString()); verifyNoMoreInteractions(agentClient); - verify(dmaapClient).send(anyString()); - verify(dmaapClient).sendBatchWithResponse(); + verify(dmaapClient).post(anyString(), anyString()); verifyNoMoreInteractions(dmaapClient); } @Test void successfulPost() throws IOException { doReturn(okResponse()).when(agentClient).postForEntity(anyString(), anyString()); - doReturn(1).when(dmaapClient).send(anyString()); - doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse(); + doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); StepVerifier // .create(testedObject.createTask(dmaapInputMessage(Operation.POST))) // @@ -217,34 +212,28 @@ class DmaapMessageHandlerTest { verify(agentClient).postForEntity(URL, payloadAsString()); verifyNoMoreInteractions(agentClient); - verify(dmaapClient).send(anyString()); - verify(dmaapClient).sendBatchWithResponse(); + verify(dmaapClient).post(anyString(), anyString()); verifyNoMoreInteractions(dmaapClient); } @Test void exceptionWhenCallingPolicyAgent_thenNotFoundResponse() throws IOException { - WebClientResponseException except = new WebClientResponseException(400, "Refused", null, null, null, null); - doReturn(Mono.error(except)).when(agentClient).putForEntity(anyString(), any()); - doReturn(1).when(dmaapClient).send(anyString()); - doReturn(new MRPublisherResponse()).when(dmaapClient).sendBatchWithResponse(); - StepVerifier // - .create(testedObject.createTask(dmaapInputMessage(Operation.PUT))) // - .expectSubscription() // - .verifyComplete(); // + doReturn(notOkResponse()).when(agentClient).putForEntity(anyString(), anyString()); + doReturn(Mono.just("OK")).when(dmaapClient).post(anyString(), anyString()); + + testedObject.createTask(dmaapInputMessage(Operation.PUT)).block(); verify(agentClient).putForEntity(anyString(), anyString()); verifyNoMoreInteractions(agentClient); ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); - verify(dmaapClient).send(captor.capture()); + verify(dmaapClient).post(anyString(), captor.capture()); String actualMessage = captor.getValue(); - assertThat(actualMessage.contains(HttpStatus.BAD_REQUEST.toString())) - .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_REQUEST) // + assertThat(actualMessage.contains(HttpStatus.BAD_GATEWAY.toString())) + .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_GATEWAY) // .isTrue(); - verify(dmaapClient).sendBatchWithResponse(); verifyNoMoreInteractions(dmaapClient); } @@ -257,15 +246,10 @@ class DmaapMessageHandlerTest { testedObject.handleDmaapMsg(message); ArgumentCaptor captor = ArgumentCaptor.forClass(String.class); - verify(dmaapClient).send(captor.capture()); + verify(dmaapClient).post(anyString(), captor.capture()); String actualMessage = captor.getValue(); - assertThat(actualMessage - .contains(HttpStatus.BAD_REQUEST + "\",\"message\":\"Not implemented operation: " + badOperation)) // - .as("Message \"%s\" sent to DMaaP contains %s", actualMessage, HttpStatus.BAD_REQUEST) // - .isTrue(); - - verify(dmaapClient).sendBatchWithResponse(); - verifyNoMoreInteractions(dmaapClient); + assertThat(actualMessage.contains("Not implemented operation")).isTrue(); + assertThat(actualMessage.contains("BAD_REQUEST")).isTrue(); } @Test diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java index 2b1d2a7a..9be6c6f4 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RefreshConfigTaskTest.java @@ -355,8 +355,8 @@ class RefreshConfigTaskTest { ConfigParserResult configParserResult(RicConfig... rics) { return ImmutableConfigParserResult.builder() // .ricConfigs(Arrays.asList(rics)) // - .dmaapConsumerConfig(new Properties()) // - .dmaapPublisherConfig(new Properties()) // + .dmaapConsumerTopicUrl("") // + .dmaapProducerTopicUrl("") // .controllerConfigs(new HashMap<>()) // .build(); } diff --git a/policy-agent/src/test/resources/test_application_configuration.json b/policy-agent/src/test/resources/test_application_configuration.json index 0122fb91..3cbc371c 100644 --- a/policy-agent/src/test/resources/test_application_configuration.json +++ b/policy-agent/src/test/resources/test_application_configuration.json @@ -19,21 +19,21 @@ ] } ], - "streams_publishes":{ - "dmaap_publisher":{ - "type":"message_router", - "dmaap_info":{ - "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" + "streams_publishes": { + "dmaap_publisher": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" } } }, - "streams_subscribes":{ - "dmaap_subscriber":{ - "type":"message_router", - "dmaap_info":{ - "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent" + "streams_subscribes": { + "dmaap_subscriber": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100" } } } } -} +} \ No newline at end of file diff --git a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json index 6d6cdce4..61ab31e8 100644 --- a/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json +++ b/policy-agent/src/test/resources/test_application_configuration_with_dmaap_config.json @@ -1,47 +1,47 @@ { - "config":{ - "controller":[ + "config": { + "controller": [ { - "name":"controller1", - "baseUrl":"http://localhost:8083/", - "userName":"user", - "password":"password" - } + "name": "controller1", + "baseUrl": "http://localhost:8083/", + "userName": "user", + "password": "password" + } ], - "ric":[ + "ric": [ { - "name":"ric1", + "name": "ric1", "controller": "controller1", - "baseUrl":"http://localhost:8083/", - "managedElementIds":[ + "baseUrl": "http://localhost:8083/", + "managedElementIds": [ "kista_1", "kista_2" ] }, { - "name":"ric2", - "baseUrl":"http://localhost:8085/", - "managedElementIds":[ + "name": "ric2", + "baseUrl": "http://localhost:8085/", + "managedElementIds": [ "kista_3", "kista_4" ] } ], - "streams_publishes":{ - "dmaap_publisher":{ - "type":"message_router", - "dmaap_info":{ - "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" + "streams_publishes": { + "dmaap_publisher": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-WRITE" } } }, - "streams_subscribes":{ - "dmaap_subscriber":{ - "type":"message_router", - "dmaap_info":{ - "topic_url":"http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent" + "streams_subscribes": { + "dmaap_subscriber": { + "type": "message_router", + "dmaap_info": { + "topic_url": "http://admin:admin@localhost:6845/events/A1-POLICY-AGENT-READ/users/policy-agent?timeout=15000&limit=100" } } } } -} +} \ No newline at end of file