Minor changes.
Removed an unused interface.
Change-Id: I30166c27546dc584d8ee4675af3d807e1175282f
Issue-ID: NONRTRIC-107
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
*/
@ApiOperation(value = "Gets the policy types from Near Realtime-RIC")
@GetMapping(POLICY_TYPES_METHOD)
- @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
+ @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
public ResponseEntity<String> getAllPolicyTypes(HttpServletResponse response) {
logger.debug("getAllPolicyTypes");
return this.policyAgentApi.getAllPolicyTypes();
@ApiOperation(value = "Returns the policy instances for the given policy type.")
@GetMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME)
- @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
+ @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
public ResponseEntity<String> getPolicyInstances(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString) {
logger.debug("getPolicyInstances {}", policyTypeIdString);
return this.policyAgentApi.getPolicyInstancesForType(policyTypeIdString);
@ApiOperation(value = "Returns a policy instance of a type")
@GetMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME + "/{" + POLICY_INSTANCE_ID_NAME
- + "}")
- @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
- public ResponseEntity<String> getPolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
- @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
+ + "}")
+ @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
+ public ResponseEntity<Object> getPolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
+ @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
logger.debug("getPolicyInstance {}:{}", policyTypeIdString, policyInstanceId);
return this.policyAgentApi.getPolicyInstance(policyInstanceId);
}
@ApiOperation(value = "Creates the policy instances for the given policy type.")
@PutMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME + "/{" + POLICY_INSTANCE_ID_NAME
- + "}")
- @Secured({DashboardConstants.ROLE_ADMIN})
+ + "}")
+ @Secured({ DashboardConstants.ROLE_ADMIN })
public ResponseEntity<String> putPolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
- @RequestParam(name = "ric", required = true) String ric,
- @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId, @RequestBody String instance) {
+ @RequestParam(name = "ric", required = true) String ric,
+ @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId, @RequestBody String instance) {
logger.debug("putPolicyInstance typeId: {}, instanceId: {}, instance: {}", policyTypeIdString, policyInstanceId,
- instance);
+ instance);
return this.policyAgentApi.putPolicy(policyTypeIdString, policyInstanceId, instance, ric);
}
@ApiOperation(value = "Deletes the policy instances for the given policy type.")
@DeleteMapping(POLICY_TYPES_METHOD + "/{" + POLICY_TYPE_ID_NAME + "}/" + POLICIES_NAME + "/{"
- + POLICY_INSTANCE_ID_NAME + "}")
- @Secured({DashboardConstants.ROLE_ADMIN})
+ + POLICY_INSTANCE_ID_NAME + "}")
+ @Secured({ DashboardConstants.ROLE_ADMIN })
public ResponseEntity<String> deletePolicyInstance(@PathVariable(POLICY_TYPE_ID_NAME) String policyTypeIdString,
- @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
+ @PathVariable(POLICY_INSTANCE_ID_NAME) String policyInstanceId) {
logger.debug("deletePolicyInstance typeId: {}, instanceId: {}", policyTypeIdString, policyInstanceId);
return this.policyAgentApi.deletePolicy(policyInstanceId);
}
@ApiOperation(value = "Returns the rics supporting the given policy type.")
@GetMapping("/rics")
- @Secured({DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD})
+ @Secured({ DashboardConstants.ROLE_ADMIN, DashboardConstants.ROLE_STANDARD })
public ResponseEntity<String> getRicsSupportingType(
- @RequestParam(name = "policyType", required = true) String supportingPolicyType) {
+ @RequestParam(name = "policyType", required = true) String supportingPolicyType) {
logger.debug("getRicsSupportingType {}", supportingPolicyType);
return this.policyAgentApi.getRicsSupportingType(supportingPolicyType);
public String ric();
- public String json();
+ public Object json();
public String service();
String name;
@JsonProperty("schema")
- String schema;
+ Object schema;
- public PolicyType(String name, String schema) {
+ public PolicyType(String name, Object schema) {
this.name = name;
this.schema = schema;
}
this.name = name;
}
- public String getSchema() {
+ public Object getSchema() {
return schema;
}
- public void setSchema(String schema) {
+ public void setSchema(Object schema) {
this.schema = schema;
}
@Override
public String toString() {
- return "[name:" + name + ", schema:" + schema + "]";
+ return "[name:" + name + ", schema:" + schema.toString() + "]";
}
}
public ResponseEntity<String> getPolicyInstancesForType(String type);
- public ResponseEntity<String> getPolicyInstance(String id);
+ public ResponseEntity<Object> getPolicyInstance(String id);
- public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
- String ric);
+ public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, Object json,
+ String ric);
public ResponseEntity<String> deletePolicy(String policyInstanceId);
RestTemplate restTemplate = new RestTemplate();
private static com.google.gson.Gson gson = new GsonBuilder() //
- .serializeNulls() //
- .create(); //
+ .serializeNulls() //
+ .create(); //
private final String urlPrefix;
@Autowired
public PolicyAgentApiImpl(
- @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
+ @org.springframework.beans.factory.annotation.Value("${policycontroller.url.prefix}") final String urlPrefix) {
logger.debug("ctor prefix '{}'", urlPrefix);
this.urlPrefix = urlPrefix;
}
}
try {
- Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {}.getType();
+ Type listType = new TypeToken<List<ImmutablePolicyInfo>>() {
+ }.getType();
List<PolicyInfo> rspParsed = gson.fromJson(rsp.getBody(), listType);
PolicyInstances result = new PolicyInstances();
for (PolicyInfo p : rspParsed) {
}
@Override
- public ResponseEntity<String> getPolicyInstance(String id) {
+ public ResponseEntity<Object> getPolicyInstance(String id) {
String url = baseUrl() + "/policy?instance={id}";
Map<String, ?> uriVariables = Map.of("id", id);
- return this.restTemplate.getForEntity(url, String.class, uriVariables);
+ return this.restTemplate.getForEntity(url, Object.class, uriVariables);
}
@Override
- public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
- String ric) {
+ public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, Object json,
+ String ric) {
String url = baseUrl() + "/policy?type={type}&instance={instance}&ric={ric}&service={service}";
Map<String, ?> uriVariables = Map.of( //
- "type", policyTypeIdString, //
- "instance", policyInstanceId, //
- "ric", ric, //
- "service", "dashboard");
+ "type", policyTypeIdString, //
+ "instance", policyInstanceId, //
+ "ric", ric, //
+ "service", "dashboard");
try {
this.restTemplate.put(url, createJsonHttpEntity(json), uriVariables);
String rsp = this.restTemplate.getForObject(url, String.class, uriVariables);
try {
- Type listType = new TypeToken<List<ImmutableRicInfo>>() {}.getType();
+ Type listType = new TypeToken<List<ImmutableRicInfo>>() {
+ }.getType();
List<RicInfo> rspParsed = gson.fromJson(rsp, listType);
Collection<String> result = new Vector<>(rspParsed.size());
for (RicInfo ric : rspParsed) {
}
}
- private HttpEntity<String> createJsonHttpEntity(String content) {
+ private HttpEntity<Object> createJsonHttpEntity(Object content) {
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
- return new HttpEntity<String>(content, headers);
+ return new HttpEntity<Object>(content, headers);
}
}
private final Database database = new Database();
@Override
- public ResponseEntity<String> getPolicyInstance(String id) {
+ public ResponseEntity<Object> getPolicyInstance(String id) {
return new ResponseEntity<>(database.getInstance(id), HttpStatus.OK);
}
@Override
- public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, String json,
+ public ResponseEntity<String> putPolicy(String policyTypeIdString, String policyInstanceId, Object json,
String ric) {
database.putInstance(policyTypeIdString, policyInstanceId, json, ric);
return new ResponseEntity<>("Policy was put successfully", HttpStatus.OK);
return java.time.Instant.now().toString();
}
- void putInstance(String typeId, String instanceId, String instanceData, String ric) {
+ void putInstance(String typeId, String instanceId, Object instanceData, String ric) {
PolicyInfo i = ImmutablePolicyInfo.builder().json(instanceData).lastModified(getTimeStampUTC())
.id(instanceId).ric(ric).service("service").type(typeId).build();
instances.put(instanceId, i);
instances.remove(instanceId);
}
- String getInstance(String id) throws RestClientException {
+ Object getInstance(String id) throws RestClientException {
PolicyInfo i = instances.get(id);
if (i == null) {
throw new RestClientException("Type not found: " + id);
schema:
type: array
items:
- type: string
+ type: object
'401':
description: Unauthorized
'403':
produces:
- '*/*'
parameters:
- - name: name
+ - name: serviceName
in: query
- description: name
+ description: serviceName
required: true
type: string
responses:
produces:
- '*/*'
parameters:
- - name: name
+ - name: serviceName
in: query
- description: name
+ description: serviceName
required: true
type: string
responses:
get:
tags:
- status-controller
- summary: Returns status and statistics of the service
+ summary: Returns status and statistics of this service
operationId: getStatusUsingGET
produces:
- '*/*'
type: string
description: identity of the policy
json:
- type: string
+ type: object
description: the configuration of the policy
lastModified:
type: string
type: integer
format: int64
description: keep alive interval for policies owned by the service. 0 means no timeout supervision. Polcies that are not refreshed within this time are removed
- name:
+ serviceName:
type: string
description: identity of the service
title: ServiceRegistrationInfo
type: integer
format: int64
description: policy keep alive timeout
- name:
+ serviceName:
type: string
description: identity of the service
timeSincePingSeconds:
@ApiOperation(value = "Returns policy type schema definitions")
@ApiResponses(
value = {
- @ApiResponse(code = 200, message = "Policy schemas", response = String.class, responseContainer = "List")})
+ @ApiResponse(code = 200, message = "Policy schemas", response = Object.class, responseContainer = "List")})
public ResponseEntity<String> getPolicySchemas(@RequestParam(name = "ric", required = false) String ricName) {
synchronized (this.policyTypes) {
if (ricName == null) {
for (Policy p : policies) {
PolicyInfo policyInfo = new PolicyInfo();
policyInfo.id = p.id();
- policyInfo.json = p.json();
+ policyInfo.json = fromJson(p.json());
policyInfo.ric = p.ric().name();
policyInfo.type = p.type().name();
policyInfo.service = p.ownerServiceName();
return gson.toJson(v);
}
+ private Object fromJson(String jsonStr) {
+ return gson.fromJson(jsonStr, Object.class);
+ }
+
private String toPolicyTypeSchemasJson(Collection<PolicyType> types) {
StringBuilder result = new StringBuilder();
result.append("[");
public String ric;
@ApiModelProperty(value = "the configuration of the policy")
- public String json;
+ public Object json;
@ApiModelProperty(value = "the name of the service owning the policy")
public String service;
}
/**
- * Example: http://localhost:8080/rics?managedElementId=kista_1
+ * Example: http://localhost:8081/rics?managedElementId=kista_1
*/
@GetMapping("/ric")
@ApiOperation(value = "Returns the name of a RIC managing one Mananged Element")
/**
* @return a Json array of all RIC data
- * Example: http://localhost:8080/ric
+ * Example: http://localhost:8081/ric
*/
@GetMapping("/rics")
@ApiOperation(value = "Query NearRT RIC information")
@ApiResponses(value = {@ApiResponse(code = 200, message = "OK")})
@DeleteMapping("/services")
public ResponseEntity<String> deleteService( //
- @RequestParam(name = "name", required = true) String name) {
+ @RequestParam(name = "serviceName", required = true) String serviceName) {
try {
- Service service = removeService(name);
+ Service service = removeService(serviceName);
// Remove the policies from the repo and let the consistency monitoring
// do the rest.
removePolicies(service);
@ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
@PostMapping("/services/keepalive")
public ResponseEntity<String> keepAliveService( //
- @RequestParam(name = "name", required = true) String name) {
+ @RequestParam(name = "serviceName", required = true) String serviceName) {
try {
- services.getService(name).ping();
+ services.getService(serviceName).ping();
return new ResponseEntity<String>("OK", HttpStatus.OK);
} catch (Exception e) {
return new ResponseEntity<String>(e.getMessage(), HttpStatus.NOT_FOUND);
}
private Service toService(ServiceRegistrationInfo s) {
- return new Service(s.name, Duration.ofSeconds(s.keepAliveIntervalSeconds), s.callbackUrl);
+ return new Service(s.serviceName, Duration.ofSeconds(s.keepAliveIntervalSeconds), s.callbackUrl);
}
}
public class ServiceRegistrationInfo {
@ApiModelProperty(value = "identity of the service")
- public String name;
+ public String serviceName;
@ApiModelProperty(
value = "keep alive interval for policies owned by the service. 0 means no timeout supervision."
}
public ServiceRegistrationInfo(String name, long keepAliveIntervalSeconds, String callbackUrl) {
- this.name = name;
+ this.serviceName = name;
this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
this.callbackUrl = callbackUrl;
}
public class ServiceStatus {
@ApiModelProperty(value = "identity of the service")
- public final String name;
+ public final String serviceName;
@ApiModelProperty(value = "policy keep alive timeout")
public final long keepAliveIntervalSeconds;
public final long timeSincePingSeconds;
ServiceStatus(String name, long keepAliveIntervalSeconds, long timeSincePingSeconds) {
- this.name = name;
+ this.serviceName = name;
this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
this.timeSincePingSeconds = timeSincePingSeconds;
}
public class StatusController {
@GetMapping("/status")
- @ApiOperation(value = "Returns status and statistics of the service")
+ @ApiOperation(value = "Returns status and statistics of this service")
@ApiResponses(
value = { //
@ApiResponse(code = 200, message = "Service is living", response = String.class) //
-
/*-
* ========================LICENSE_START=================================
* O-RAN-SC
package org.oransc.policyagent.dmaap;
-/**
- * The Dmaap consumer which has the base methods to be implemented by any class which implements this interface
- *
- */
-public interface DmaapMessageConsumer {
-
- /**
- * The init method creates the MRConsumer with the properties passed from the Application Config
- *
- * @param properties
- */
- public void init();
-
- /**
- * This method process the message and call the respective Controller
- *
- * @param msg
- * @throws Exception
- */
- public abstract void processMsg(String msg) throws Exception;
-
- /**
- * To check whether the DMAAP Listner is alive
- *
- * @return boolean
- */
- public boolean isAlive();
-
- /**
- * It's a infinite loop run every configured seconds to fetch the message from DMAAP. This method can be stop by
- * setting the alive flag to false
- */
- public void run();
+import com.google.common.collect.Iterables;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+
+import org.onap.dmaap.mr.client.MRBatchingPublisher;
+import org.onap.dmaap.mr.client.MRClientFactory;
+import org.onap.dmaap.mr.client.MRConsumer;
+import org.onap.dmaap.mr.client.response.MRConsumerResponse;
+import org.oransc.policyagent.clients.AsyncRestClient;
+import org.oransc.policyagent.configuration.ApplicationConfig;
+import org.oransc.policyagent.exceptions.ServiceException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DmaapMessageConsumer implements Runnable {
+
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumer.class);
+
+ final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
+ private final ApplicationConfig applicationConfig;
+
+ @Value("${server.port}")
+ private int localServerPort;
+
+ @Autowired
+ public DmaapMessageConsumer(ApplicationConfig applicationConfig) {
+ this.applicationConfig = applicationConfig;
+
+ Thread thread = new Thread(this);
+ thread.start();
+ }
+
+ private boolean isDmaapConfigured() {
+ Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
+ Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
+ return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
+ }
+
+ @Override
+ public void run() {
+ while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) {
+ try {
+ Iterable<String> dmaapMsgs = fetchAllMessages();
+ if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
+ logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
+ for (String msg : dmaapMsgs) {
+ processMsg(msg);
+ }
+ }
+ } catch (Exception e) {
+ logger.warn("{}: cannot fetch because of ", this, e.getMessage(), e);
+ sleep(TIME_BETWEEN_DMAAP_POLLS);
+ }
+ }
+ }
+
+ private Iterable<String> fetchAllMessages() throws ServiceException, FileNotFoundException, IOException {
+ Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
+ MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
+ MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
+ if (response == null || !"200".equals(response.getResponseCode())) {
+ throw new ServiceException("DMaaP NULL response received");
+ } else {
+ logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
+ return response.getActualMessages();
+ }
+ }
+
+ private void processMsg(String msg) throws Exception {
+ logger.debug("Message Reveived from DMAAP : {}", msg);
+ createDmaapMessageHandler().handleDmaapMsg(msg);
+ }
+
+ private DmaapMessageHandler createDmaapMessageHandler() throws FileNotFoundException, IOException {
+ String agentBaseUrl = "http://localhost:" + this.localServerPort;
+ AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
+ Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
+ MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
+
+ return new DmaapMessageHandler(producer, this.applicationConfig, agentClient);
+ }
+ private boolean sleep(Duration duration) {
+ try {
+ Thread.sleep(duration.toMillis());
+ return true;
+ } catch (Exception e) {
+ logger.error("Failed to put the thread to sleep", e);
+ return false;
+ }
+ }
}
+++ /dev/null
-/*-
- * ========================LICENSE_START=================================
- * O-RAN-SC
- * %%
- * Copyright (C) 2019 Nordix Foundation
- * %%
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- * ========================LICENSE_END===================================
- */
-
-package org.oransc.policyagent.dmaap;
-
-import com.google.common.collect.Iterables;
-
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.onap.dmaap.mr.client.MRBatchingPublisher;
-import org.onap.dmaap.mr.client.MRClientFactory;
-import org.onap.dmaap.mr.client.MRConsumer;
-import org.onap.dmaap.mr.client.response.MRConsumerResponse;
-import org.oransc.policyagent.clients.AsyncRestClient;
-import org.oransc.policyagent.configuration.ApplicationConfig;
-import org.oransc.policyagent.exceptions.ServiceException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.stereotype.Component;
-
-@Component
-public class DmaapMessageConsumerImpl implements Runnable {
-
- private static final Logger logger = LoggerFactory.getLogger(DmaapMessageConsumerImpl.class);
-
- final Duration ERROR_TIMEOUT = Duration.ofSeconds(30);
- final Duration TIME_BETWEEN_DMAAP_POLLS = Duration.ofSeconds(10);
- private final ApplicationConfig applicationConfig;
-
- @Value("${server.port}")
- private int localServerPort;
-
- @Autowired
- public DmaapMessageConsumerImpl(ApplicationConfig applicationConfig) {
- this.applicationConfig = applicationConfig;
-
- Thread thread = new Thread(this);
- thread.start();
- }
-
- private boolean isDmaapConfigured() {
- Properties consumerCfg = applicationConfig.getDmaapConsumerConfig();
- Properties producerCfg = applicationConfig.getDmaapPublisherConfig();
- return (consumerCfg != null && consumerCfg.size() > 0 && producerCfg != null && producerCfg.size() > 0);
- }
-
- @Override
- public void run() {
- while (sleep(TIME_BETWEEN_DMAAP_POLLS) && isDmaapConfigured()) {
- try {
- Iterable<String> dmaapMsgs = fetchAllMessages();
- if (dmaapMsgs != null && Iterables.size(dmaapMsgs) > 0) {
- logger.debug("Fetched all the messages from DMAAP and will start to process the messages");
- for (String msg : dmaapMsgs) {
- processMsg(msg);
- }
- }
- } catch (Exception e) {
- logger.error("{}: cannot fetch because of ", this, e.getMessage(), e);
- sleep(ERROR_TIMEOUT);
- }
- }
- }
-
- private Iterable<String> fetchAllMessages() throws ServiceException, FileNotFoundException, IOException {
- Properties dmaapConsumerProperties = this.applicationConfig.getDmaapConsumerConfig();
- MRConsumer consumer = MRClientFactory.createConsumer(dmaapConsumerProperties);
- MRConsumerResponse response = consumer.fetchWithReturnConsumerResponse();
- if (response == null || !"200".equals(response.getResponseCode())) {
- throw new ServiceException("DMaaP NULL response received");
- } else {
- logger.debug("DMaaP consumer received {} : {}", response.getResponseCode(), response.getResponseMessage());
- return response.getActualMessages();
- }
- }
-
- private void processMsg(String msg) throws Exception {
- logger.debug("Message Reveived from DMAAP : {}", msg);
- createDmaapMessageHandler().handleDmaapMsg(msg);
- }
-
- private DmaapMessageHandler createDmaapMessageHandler() throws FileNotFoundException, IOException {
- String agentBaseUrl = "http://localhost:" + this.localServerPort;
- AsyncRestClient agentClient = new AsyncRestClient(agentBaseUrl);
- Properties dmaapPublisherProperties = applicationConfig.getDmaapPublisherConfig();
- MRBatchingPublisher producer = MRClientFactory.createBatchingPublisher(dmaapPublisherProperties);
-
- return new DmaapMessageHandler(producer, this.applicationConfig, agentClient);
- }
-
- private boolean sleep(Duration duration) {
- CountDownLatch sleep = new CountDownLatch(1);
- try {
- sleep.await(duration.toMillis(), TimeUnit.MILLISECONDS);
- return true;
- } catch (Exception e) {
- logger.error("msg", e);
- return false;
- }
- }
-}
String rsp = this.restTemplate.getForObject(url, String.class);
System.out.println("*** " + rsp);
assertThat(rsp).contains("type1");
- assertThat(rsp).contains("type2");
- assertThat(rsp).contains("title");
+ assertThat(rsp).contains("[{\"title\":\"type2\"}");
List<String> info = parseSchemas(rsp);
assertEquals(2, info.size());
assertThat(info.size() == 1);
ServiceStatus status = info.iterator().next();
assertThat(status.keepAliveIntervalSeconds == 1);
- assertThat(status.name.equals("name"));
+ assertThat(status.serviceName.equals("name"));
// GET (all)
url = baseUrl() + "/services";