From: PatrikBuhr Date: Wed, 11 Mar 2020 13:15:50 +0000 (+0100) Subject: Added a test for service supervision X-Git-Tag: 2.0.0~123^2 X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=commitdiff_plain;h=c5c251953f36a3a56613ad28f2d73f958ff58295;p=nonrtric.git Added a test for service supervision Added that when service supervision deletes policices, it will grant a shared lock of the RIC (which it must). PUT and DELETE a policy will trigger keep alive of the owning service Change-Id: I6770694a5dbff3b8d635a9997265292ad3c0f524 Issue-ID: NONRTRIC-155 Signed-off-by: PatrikBuhr --- diff --git a/policy-agent/config/application.yaml b/policy-agent/config/application.yaml index 90b73a46..9fb3fba5 100644 --- a/policy-agent/config/application.yaml +++ b/policy-agent/config/application.yaml @@ -15,7 +15,7 @@ logging: org.springframework: ERROR org.springframework.data: ERROR org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR - org.oransc.policyagent: WARN + org.oransc.policyagent: INFO file: /var/log/policy-agent/application.log app: filepath: /opt/app/policy-agent/config/application_configuration.json diff --git a/policy-agent/docs/api.yaml b/policy-agent/docs/api.yaml index 9c7d96ef..a2f1b57c 100644 --- a/policy-agent/docs/api.yaml +++ b/policy-agent/docs/api.yaml @@ -431,11 +431,11 @@ paths: '200': description: Policy updated schema: - type: string + type: object '201': description: Policy created schema: - type: string + type: object '401': description: Unauthorized '403': @@ -869,6 +869,8 @@ definitions: title: RicInfo ServiceRegistrationInfo: type: object + required: + - serviceName properties: callbackUrl: type: string diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java index 1af607f4..987cd97a 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java @@ -33,7 +33,6 @@ import java.util.Collection; import java.util.List; import org.oransc.policyagent.clients.A1ClientFactory; -import org.oransc.policyagent.configuration.ApplicationConfig; import org.oransc.policyagent.exceptions.ServiceException; import org.oransc.policyagent.repository.ImmutablePolicy; import org.oransc.policyagent.repository.Lock.LockType; @@ -43,6 +42,8 @@ import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Service; +import org.oransc.policyagent.repository.Services; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; @@ -58,24 +59,21 @@ import reactor.core.publisher.Mono; @Api(tags = "A1 Policy Management") public class PolicyController { - private final Rics rics; - private final PolicyTypes policyTypes; - private final Policies policies; - private final A1ClientFactory a1ClientFactory; + @Autowired + private Rics rics; + @Autowired + private PolicyTypes policyTypes; + @Autowired + private Policies policies; + @Autowired + private A1ClientFactory a1ClientFactory; + @Autowired + private Services services; private static Gson gson = new GsonBuilder() // .serializeNulls() // .create(); // - @Autowired - PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics, - A1ClientFactory a1ClientFactory) { - this.policyTypes = types; - this.policies = policies; - this.rics = rics; - this.a1ClientFactory = a1ClientFactory; - } - @GetMapping("/policy_schemas") @ApiOperation(value = "Returns policy type schema definitions") @ApiResponses( @@ -165,8 +163,13 @@ public class PolicyController { @ApiResponse(code = 423, message = "RIC is locked", response = String.class)}) public Mono> deletePolicy( // @RequestParam(name = "instance", required = true) String id) { - Policy policy = policies.get(id); - if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) { + Policy policy; + try { + policy = policies.getPolicy(id); + keepServiceAlive(policy.ownerServiceName()); + if (policy.ric().getState() != Ric.RicState.IDLE) { + return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED)); + } Ric ric = policy.ric(); return ric.getLock().lock(LockType.SHARED) // // .flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) // @@ -175,9 +178,7 @@ public class PolicyController { .doOnNext(notUsed -> ric.getLock().unlockBlocking()) // .doOnError(notUsed -> ric.getLock().unlockBlocking()) // .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT))); - } else if (policy != null) { - return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED)); - } else { + } catch (ServiceException e) { return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND)); } } @@ -186,8 +187,8 @@ public class PolicyController { @ApiOperation(value = "Put a policy", response = String.class) @ApiResponses( value = { // - @ApiResponse(code = 201, message = "Policy created"), // - @ApiResponse(code = 200, message = "Policy updated"), // + @ApiResponse(code = 201, message = "Policy created", response = Object.class), // + @ApiResponse(code = 200, message = "Policy updated", response = Object.class), // @ApiResponse(code = 423, message = "RIC is locked", response = String.class), // @ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class), // @ApiResponse(code = 405, message = "Change is not allowed", response = String.class)}) @@ -201,6 +202,7 @@ public class PolicyController { String jsonString = gson.toJson(jsonBody); Ric ric = rics.get(ricName); PolicyType type = policyTypes.get(typeName); + keepServiceAlive(service); if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) { Policy policy = ImmutablePolicy.builder() // .id(instanceId) // @@ -301,6 +303,13 @@ public class PolicyController { } } + private void keepServiceAlive(String name) { + Service s = this.services.get(name); + if (s != null) { + s.keepAlive(); + } + } + private boolean include(String filter, String value) { return filter == null || value.equals(filter); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java index 35348c7d..3d362282 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java @@ -57,7 +57,6 @@ public class ServiceController { private final Policies policies; private static Gson gson = new GsonBuilder() // - .serializeNulls() // .create(); // @Autowired @@ -97,6 +96,12 @@ public class ServiceController { s.getCallbackUrl()); } + private void validateRegistrationInfo(ServiceRegistrationInfo registrationInfo) throws ServiceException { + if (registrationInfo.serviceName.isEmpty()) { + throw new ServiceException("Missing mandatory parameter 'serviceName'"); + } + } + @ApiOperation(value = "Register a service") @ApiResponses( value = { // @@ -106,6 +111,7 @@ public class ServiceController { public ResponseEntity putService(// @RequestBody ServiceRegistrationInfo registrationInfo) { try { + validateRegistrationInfo(registrationInfo); this.services.put(toService(registrationInfo)); return new ResponseEntity<>("OK", HttpStatus.OK); } catch (Exception e) { @@ -132,18 +138,18 @@ public class ServiceController { } } - @ApiOperation(value = "Keep the policies alive for a service") + @ApiOperation(value = "Heartbeat from a serice") @ApiResponses( value = { // - @ApiResponse(code = 200, message = "Policies timeout supervision refreshed"), + @ApiResponse(code = 200, message = "Service supervision timer refreshed, OK"), @ApiResponse(code = 404, message = "The service is not found, needs re-registration")}) @PostMapping("/services/keepalive") public ResponseEntity keepAliveService(// @RequestParam(name = "name", required = true) String serviceName) { try { - services.getService(serviceName).ping(); + services.getService(serviceName).keepAlive(); return new ResponseEntity<>("OK", HttpStatus.OK); - } catch (Exception e) { + } catch (ServiceException e) { return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND); } } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java index 907fa1c4..e532a362 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java @@ -20,6 +20,8 @@ package org.oransc.policyagent.controllers; +import com.google.gson.annotations.SerializedName; + import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; @@ -29,16 +31,23 @@ import org.immutables.gson.Gson; @ApiModel(value = "ServiceRegistrationInfo") public class ServiceRegistrationInfo { - @ApiModelProperty(value = "identity of the service") - public String serviceName; + @ApiModelProperty(value = "identity of the service", required = true, allowEmptyValue = false) + @SerializedName(value = "serviceName", alternate = {"name"}) - @ApiModelProperty( - value = "keep alive interval for policies owned by the service. 0 means no timeout supervision." - + " Polcies that are not refreshed within this time are removed") - public long keepAliveIntervalSeconds; + public String serviceName = ""; - @ApiModelProperty(value = "callback for notifying of RIC recovery") - public String callbackUrl; + @ApiModelProperty( + value = "keep alive interval for the service. This is a heartbeat supervision of the service, " + + "which in regular intevals must invoke a 'keepAlive' REST call. " + + "When a service does not invoke this call within the given time, it is considered unavailble. " + + "An unavailable service will be automatically deregistered and its policies will be deleted. " + + "Value 0 means no timeout supervision.") + @SerializedName("keepAliveIntervalSeconds") + public long keepAliveIntervalSeconds = 0; + + @ApiModelProperty(value = "callback for notifying of RIC recovery", required = false, allowEmptyValue = true) + @SerializedName("callbackUrl") + public String callbackUrl = ""; public ServiceRegistrationInfo() { } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java index 1a9cf327..8f4daac1 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java @@ -36,7 +36,7 @@ public class ServiceStatus { public final long keepAliveIntervalSeconds; @ApiModelProperty(value = "time since last invocation by the service") - public final long timeSincePingSeconds; + public final long timeSinceLastActivitySeconds; @ApiModelProperty(value = "callback for notifying of RIC recovery") public String callbackUrl; @@ -44,7 +44,7 @@ public class ServiceStatus { ServiceStatus(String name, long keepAliveIntervalSeconds, long timeSincePingSeconds, String callbackUrl) { this.serviceName = name; this.keepAliveIntervalSeconds = keepAliveIntervalSeconds; - this.timeSincePingSeconds = timeSincePingSeconds; + this.timeSinceLastActivitySeconds = timeSincePingSeconds; this.callbackUrl = callbackUrl; } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java index f0863a5e..7b2c9bde 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java @@ -36,14 +36,14 @@ public class Service { this.name = name; this.keepAliveInterval = keepAliveInterval; this.callbackUrl = callbackUrl; - ping(); + keepAlive(); } public synchronized Duration getKeepAliveInterval() { return this.keepAliveInterval; } - public synchronized void ping() { + public synchronized void keepAlive() { this.lastPing = Instant.now(); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java index 568f0029..f6c55dc4 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java @@ -20,6 +20,7 @@ package org.oransc.policyagent.repository; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -50,7 +51,7 @@ public class Services { } public synchronized Iterable getAll() { - return registeredServices.values(); + return Collections.unmodifiableCollection(registeredServices.values()); } public synchronized void remove(String name) { diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java similarity index 91% rename from policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java rename to policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java index 22905f6c..d6013de7 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java @@ -47,8 +47,8 @@ import reactor.core.publisher.Mono; */ @Component @EnableScheduling -public class RepositorySupervision { - private static final Logger logger = LoggerFactory.getLogger(RepositorySupervision.class); +public class RicSupervision { + private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class); private final Rics rics; private final Policies policies; @@ -57,7 +57,7 @@ public class RepositorySupervision { private final Services services; @Autowired - public RepositorySupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, + public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Services services) { this.rics = rics; this.policies = policies; @@ -72,7 +72,9 @@ public class RepositorySupervision { @Scheduled(fixedRate = 1000 * 60) public void checkAllRics() { logger.debug("Checking Rics starting"); - createTask().subscribe(this::onRicChecked, null, this::onComplete); + createTask().subscribe(ric -> logger.debug("Ric: {} checked", ric.ric.name()), // + null, // + () -> logger.debug("Checking Rics completed")); } private Flux createTask() { @@ -163,15 +165,6 @@ public class RepositorySupervision { return Mono.error(new Exception("Syncronization started")); } - @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally - private void onRicChecked(RicData ric) { - logger.debug("Ric: {} checked", ric.ric.name()); - } - - private void onComplete() { - logger.debug("Checking Rics completed"); - } - RicSynchronizationTask createSynchronizationTask() { return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java index 9f88d5cf..0a0ab826 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java @@ -103,7 +103,7 @@ public class RicSynchronizationTask { @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally private void onSynchronizationComplete(Ric ric) { - logger.debug("Synchronization completed for: {}", ric.name()); + logger.info("Synchronization completed for: {}", ric.name()); ric.setState(RicState.IDLE); notifyAllServices("Synchronization completed for:" + ric.name()); } diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java index 8d7062b1..4be26eb9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java @@ -20,7 +20,11 @@ package org.oransc.policyagent.tasks; +import java.time.Duration; + import org.oransc.policyagent.clients.A1ClientFactory; +import org.oransc.policyagent.repository.Lock; +import org.oransc.policyagent.repository.Lock.LockType; import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.Policy; import org.oransc.policyagent.repository.Service; @@ -29,16 +33,17 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.scheduling.annotation.EnableScheduling; -import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; /** - * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive, - * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is - * removed from the repository. This means that the service needs to register again after this. + * Periodically checks that services with a keepAliveInterval set are alive. If + * a service is deemed not alive, all the service's policies are deleted, both + * in the repository and in the affected Rics, and the service is removed from + * the repository. This means that the service needs to register again after + * this. */ @Component @EnableScheduling @@ -47,39 +52,55 @@ public class ServiceSupervision { private final Services services; private final Policies policies; private A1ClientFactory a1ClientFactory; + private final Duration checkInterval; @Autowired public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) { + this(services, policies, a1ClientFactory, Duration.ofMinutes(1)); + } + + public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory, + Duration checkInterval) { this.services = services; this.policies = policies; this.a1ClientFactory = a1ClientFactory; + this.checkInterval = checkInterval; + start(); } - @Scheduled(fixedRate = 1000 * 60) - public void checkAllServices() { + private void start() { logger.debug("Checking services starting"); - createTask().subscribe(this::onPolicyDeleted, null, this::onComplete); + createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated")); } - @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally - private void onPolicyDeleted(Policy policy) { - logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName()); + private Flux createTask() { + return Flux.interval(this.checkInterval) // + .flatMap(notUsed -> checkAllServices()); } - private void onComplete() { - logger.debug("Checking services completed"); + Flux checkAllServices() { + return Flux.fromIterable(services.getAll()) // + .filter(Service::isExpired) // + .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) // + .doOnNext(service -> services.remove(service.getName())) // + .flatMap(this::getAllPoliciesForService) // + .flatMap(this::deletePolicy); } - private Flux createTask() { - synchronized (services) { - return Flux.fromIterable(services.getAll()) // - .filter(Service::isExpired) // - .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) // - .doOnNext(service -> services.remove(service.getName())) // - .flatMap(this::getAllPoliciesForService) // - .doOnNext(policies::remove) // - .flatMap(this::deletePolicyInRic); - } + @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally + private Flux deletePolicy(Policy policy) { + Lock lock = policy.ric().getLock(); + return lock.lock(LockType.SHARED) // + .doOnNext(notUsed -> policies.remove(policy)) // + .flatMap(notUsed -> deletePolicyInRic(policy)) + .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(), + policy.ownerServiceName())) // + .doOnNext(notUsed -> lock.unlockBlocking()) // + .doOnError(throwable -> lock.unlockBlocking()) // + .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(), + throwable.getMessage())) // + .flatMapMany(notUsed -> Flux.just(policy)) // + .onErrorResume(throwable -> Flux.empty()); } private Flux getAllPoliciesForService(Service service) { diff --git a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java index 377ca79e..a5bf3cb7 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java @@ -59,9 +59,12 @@ import org.oransc.policyagent.repository.Ric; import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; import org.oransc.policyagent.repository.Services; -import org.oransc.policyagent.tasks.RepositorySupervision; +import org.oransc.policyagent.tasks.RicSupervision; +import org.oransc.policyagent.tasks.ServiceSupervision; import org.oransc.policyagent.utils.MockA1Client; import org.oransc.policyagent.utils.MockA1ClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -84,6 +87,8 @@ import reactor.test.StepVerifier; @ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT) public class ApplicationTest { + private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class); + @Autowired ApplicationContext context; @@ -100,7 +105,7 @@ public class ApplicationTest { MockA1ClientFactory a1ClientFactory; @Autowired - RepositorySupervision supervision; + RicSupervision supervision; @Autowired Services services; @@ -122,6 +127,9 @@ public class ApplicationTest { @TestConfiguration static class TestBeanFactory { private final PolicyTypes policyTypes = new PolicyTypes(); + private final Services services = new Services(); + private final Policies policies = new Policies(); + MockA1ClientFactory a1ClientFactory = null; @Bean public ApplicationConfig getApplicationConfig() { @@ -130,13 +138,32 @@ public class ApplicationTest { @Bean MockA1ClientFactory getA1ClientFactory() { - return new MockA1ClientFactory(this.policyTypes); + if (a1ClientFactory == null) { + this.a1ClientFactory = new MockA1ClientFactory(this.policyTypes); + } + return this.a1ClientFactory; } @Bean public PolicyTypes getPolicyTypes() { return this.policyTypes; } + + @Bean + Policies getPolicies() { + return this.policies; + } + + @Bean + Services getServices() { + return this.services; + } + + @Bean + public ServiceSupervision getServiceSupervision() { + Duration checkInterval = Duration.ofMillis(1); + return new ServiceSupervision(this.services, this.policies, this.getA1ClientFactory(), checkInterval); + } } @LocalServerPort @@ -327,7 +354,7 @@ public class ApplicationTest { String url = "/policy_schema?id=type1"; String rsp = restClient().get(url).block(); - System.out.println(rsp); + logger.info(rsp); assertThat(rsp).contains("type1"); assertThat(rsp).contains("title"); @@ -361,7 +388,7 @@ public class ApplicationTest { String url = "/policies"; String rsp = restClient().get(url).block(); - System.out.println(rsp); + logger.info(rsp); List info = parseList(rsp, PolicyInfo.class); assertThat(info).size().isEqualTo(1); PolicyInfo policyInfo = info.get(0); @@ -379,14 +406,14 @@ public class ApplicationTest { String url = "/policies?type=type1"; String rsp = restClient().get(url).block(); - System.out.println(rsp); + logger.info(rsp); assertThat(rsp).contains("id1"); assertThat(rsp).contains("id2"); assertThat(rsp.contains("id3")).isFalse(); url = "/policies?type=type1&service=service2"; rsp = restClient().get(url).block(); - System.out.println(rsp); + logger.info(rsp); assertThat(rsp.contains("id1")).isFalse(); assertThat(rsp).contains("id2"); assertThat(rsp.contains("id3")).isFalse(); @@ -403,7 +430,7 @@ public class ApplicationTest { @Test public void testPutAndGetService() throws Exception { // PUT - putService("name"); + putService("name", 0); // GET one service String url = "/services?name=name"; @@ -411,14 +438,14 @@ public class ApplicationTest { List info = parseList(rsp, ServiceStatus.class); assertThat(info.size()).isEqualTo(1); ServiceStatus status = info.iterator().next(); - assertThat(status.keepAliveIntervalSeconds).isEqualTo(1); + assertThat(status.keepAliveIntervalSeconds).isEqualTo(0); assertThat(status.serviceName).isEqualTo("name"); // GET (all) url = "/services"; rsp = restClient().get(url).block(); assertThat(rsp.contains("name")).isTrue(); - System.out.println(rsp); + logger.info(rsp); // Keep alive url = "/services/keepalive?name=name"; @@ -435,12 +462,30 @@ public class ApplicationTest { testErrorCode(restClient().post("/services/keepalive?name=name", ""), HttpStatus.NOT_FOUND); // PUT servive with crap payload - testErrorCode(restClient().put("/service", "junk"), HttpStatus.BAD_REQUEST); + testErrorCode(restClient().put("/service", "crap"), HttpStatus.BAD_REQUEST); + testErrorCode(restClient().put("/service", "{}"), HttpStatus.BAD_REQUEST); // GET non existing servive testErrorCode(restClient().get("/services?name=XXX"), HttpStatus.NOT_FOUND); } + @Test + public void testServiceSupervision() throws Exception { + putService("service1", 1); + addPolicyType("type1", "ric1"); + + String url = putPolicyUrl("service1", "ric1", "type1", "instance1"); + final String policyBody = jsonString(); + restClient().put(url, policyBody).block(); + + assertThat(policies.size()).isEqualTo(1); + assertThat(services.size()).isEqualTo(1); + + // Timeout after ~1 second + await().untilAsserted(() -> assertThat(policies.size()).isEqualTo(0)); + assertThat(services.size()).isEqualTo(0); + } + @Test public void testGetPolicyStatus() throws Exception { addPolicy("id", "typeName", "service1", "ric1"); @@ -471,16 +516,21 @@ public class ApplicationTest { return addPolicy(id, typeName, service, "ric"); } - private String createServiceJson(String name) { - ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, 1, "callbackUrl"); + private String createServiceJson(String name, long keepAliveIntervalSeconds) { + ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, keepAliveIntervalSeconds, "callbackUrl"); String json = gson.toJson(service); return json; } private void putService(String name) { + putService(name, 0); + } + + private void putService(String name, long keepAliveIntervalSeconds) { String url = "/service"; - restClient().put(url, createServiceJson(name)).block(); + String body = createServiceJson(name, keepAliveIntervalSeconds); + restClient().put(url, body).block(); } private String baseUrl() { @@ -496,9 +546,9 @@ public class ApplicationTest { private final String baseUrl; static AtomicInteger nextCount = new AtomicInteger(0); private final int count; - private final RepositorySupervision supervision; + private final RicSupervision supervision; - ConcurrencyTestRunnable(String baseUrl, RepositorySupervision supervision) { + ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) { this.baseUrl = baseUrl; this.count = nextCount.incrementAndGet(); this.supervision = supervision; @@ -543,7 +593,7 @@ public class ApplicationTest { t.join(); } assertThat(policies.size()).isEqualTo(0); - System.out.println("Concurrency test took " + Duration.between(startTime, Instant.now())); + logger.info("Concurrency test took " + Duration.between(startTime, Instant.now())); } private AsyncRestClient restClient() { diff --git a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java index b0f1e7f4..efbd5765 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java @@ -37,6 +37,8 @@ import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Rics; import org.oransc.policyagent.utils.MockA1ClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; @@ -48,6 +50,7 @@ import org.springframework.test.context.junit.jupiter.SpringExtension; @ExtendWith(SpringExtension.class) @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT) public class MockPolicyAgent { + private static final Logger logger = LoggerFactory.getLogger(MockPolicyAgent.class); @Autowired Rics rics; @@ -117,7 +120,7 @@ public class MockPolicyAgent { PolicyType type = ImmutablePolicyType.builder().name(typeName).schema(schema).build(); policyTypes.put(type); } catch (Exception e) { - System.out.println("Could not load json schema " + e); + logger.error("Could not load json schema ", e); } } } @@ -127,13 +130,13 @@ public class MockPolicyAgent { private int port; private void keepServerAlive() { - System.out.println("Keeping server alive!"); + logger.info("Keeping server alive!"); try { synchronized (this) { this.wait(); } } catch (Exception ex) { - System.out.println("Unexpected: " + ex.toString()); + logger.error("Unexpected: " + ex); } } diff --git a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java index d034d4d0..52147a85 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java @@ -52,6 +52,8 @@ import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation; import org.oransc.policyagent.repository.ImmutablePolicyType; import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.utils.LoggingUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.http.HttpStatus; import org.springframework.http.ResponseEntity; import org.springframework.web.reactive.function.client.WebClientResponseException; @@ -60,7 +62,7 @@ import reactor.core.publisher.Mono; import reactor.test.StepVerifier; public class DmaapMessageHandlerTest { - + private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class); private static final String URL = "url"; private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class); @@ -112,13 +114,13 @@ public class DmaapMessageHandlerTest { @Test public void testMessageParsing() { String message = dmaapInputMessage(Operation.DELETE); - System.out.println(message); + logger.info(message); DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class); assertTrue(parsedMessage != null); assertFalse(parsedMessage.payload().isPresent()); message = dmaapInputMessage(Operation.PUT); - System.out.println(message); + logger.info(message); parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class); assertTrue(parsedMessage != null); assertTrue(parsedMessage.payload().isPresent()); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java b/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java index 825010ab..6fd6c8b9 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java @@ -71,10 +71,8 @@ public class LockTest { Lock lock = new Lock(); Mono seq = lock.lock(LockType.EXCLUSIVE) // - .doOnNext(l -> System.out.println("1 " + l)) // .flatMap(l -> lock.lock(LockType.EXCLUSIVE)) // - .flatMap(l -> lock.unlock()) // - .doOnNext(l -> System.out.println("2 " + l)); // + .flatMap(l -> lock.unlock()); asynchUnlock(lock); StepVerifier.create(seq) // diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java similarity index 88% rename from policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java rename to policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java index d837f78d..8d3dd942 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java @@ -56,7 +56,7 @@ import org.oransc.policyagent.repository.Rics; import reactor.core.publisher.Mono; @ExtendWith(MockitoExtension.class) -public class RepositorySupervisionTest { +public class RicSupervisionTest { private static final String POLICY_TYPE_1_NAME = "type1"; private static final PolicyType POLICY_TYPE_1 = ImmutablePolicyType.builder() // .name(POLICY_TYPE_1_NAME) // @@ -133,8 +133,7 @@ public class RepositorySupervisionTest { setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID))); setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME))); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); supervisorUnderTest.checkAllRics(); @@ -147,8 +146,7 @@ public class RepositorySupervisionTest { RIC_1.setState(RicState.UNDEFINED); rics.put(RIC_1); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask(); @@ -165,8 +163,7 @@ public class RepositorySupervisionTest { RIC_1.setState(RicState.SYNCHRONIZING); rics.put(RIC_1); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); supervisorUnderTest.checkAllRics(); @@ -182,8 +179,7 @@ public class RepositorySupervisionTest { setUpGetPolicyIdentitiesToReturn(new Exception("Failed")); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); @@ -200,8 +196,7 @@ public class RepositorySupervisionTest { setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID))); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask(); @@ -223,8 +218,7 @@ public class RepositorySupervisionTest { setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID, "Another_policy"))); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask(); @@ -245,8 +239,7 @@ public class RepositorySupervisionTest { setUpGetPolicyIdentitiesToReturn(Collections.emptyList()); setUpGetPolicyTypeIdentitiesToReturn(new Exception("Failed")); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); supervisorUnderTest.checkAllRics(); verify(supervisorUnderTest).checkAllRics(); @@ -264,8 +257,7 @@ public class RepositorySupervisionTest { setUpGetPolicyIdentitiesToReturn(Collections.emptyList()); setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type"))); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask(); @@ -292,8 +284,7 @@ public class RepositorySupervisionTest { setUpGetPolicyIdentitiesToReturn(Collections.emptyList()); setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type"))); - RepositorySupervision supervisorUnderTest = - spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null)); + RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null)); doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask(); diff --git a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java index a9845285..070e8dab 100644 --- a/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java +++ b/policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java @@ -101,7 +101,7 @@ public class ServiceSupervisionTest { await().atMost(Durations.FIVE_SECONDS).with().pollInterval(Durations.ONE_SECOND).until(service::isExpired); - serviceSupervisionUnderTest.checkAllServices(); + serviceSupervisionUnderTest.checkAllServices().blockLast(); assertThat(policies.size()).isEqualTo(0); assertThat(services.size()).isEqualTo(0); @@ -125,7 +125,7 @@ public class ServiceSupervisionTest { final ListAppender logAppender = LoggingUtils.getLogListAppender(ServiceSupervision.class, WARN); - serviceSupervisionUnderTest.checkAllServices(); + serviceSupervisionUnderTest.checkAllServices().blockLast(); assertThat(policies.size()).isEqualTo(0); assertThat(services.size()).isEqualTo(0); @@ -143,7 +143,7 @@ public class ServiceSupervisionTest { ServiceSupervision serviceSupervisionUnderTest = new ServiceSupervision(services, policies, a1ClientFactoryMock); - serviceSupervisionUnderTest.checkAllServices(); + serviceSupervisionUnderTest.checkAllServices().blockLast(); assertThat(policies.size()).isEqualTo(1); assertThat(services.size()).isEqualTo(1); @@ -159,7 +159,7 @@ public class ServiceSupervisionTest { ServiceSupervision serviceSupervisionUnderTest = new ServiceSupervision(services, policies, a1ClientFactoryMock); - serviceSupervisionUnderTest.checkAllServices(); + serviceSupervisionUnderTest.checkAllServices().blockLast(); assertThat(policies.size()).isEqualTo(1); assertThat(services.size()).isEqualTo(1);