Added a test for service supervision 50/2750/5
authorPatrikBuhr <patrik.buhr@est.tech>
Wed, 11 Mar 2020 13:15:50 +0000 (14:15 +0100)
committerPatrikBuhr <patrik.buhr@est.tech>
Thu, 12 Mar 2020 08:13:13 +0000 (09:13 +0100)
Added that when service supervision deletes policices, it will
grant a shared lock of the RIC (which it must).

PUT and DELETE a policy will trigger keep alive of the owning service

Change-Id: I6770694a5dbff3b8d635a9997265292ad3c0f524
Issue-ID: NONRTRIC-155
Signed-off-by: PatrikBuhr <patrik.buhr@est.tech>
17 files changed:
policy-agent/config/application.yaml
policy-agent/docs/api.yaml
policy-agent/src/main/java/org/oransc/policyagent/controllers/PolicyController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceController.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceRegistrationInfo.java
policy-agent/src/main/java/org/oransc/policyagent/controllers/ServiceStatus.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Service.java
policy-agent/src/main/java/org/oransc/policyagent/repository/Services.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSupervision.java [moved from policy-agent/src/main/java/org/oransc/policyagent/tasks/RepositorySupervision.java with 91% similarity]
policy-agent/src/main/java/org/oransc/policyagent/tasks/RicSynchronizationTask.java
policy-agent/src/main/java/org/oransc/policyagent/tasks/ServiceSupervision.java
policy-agent/src/test/java/org/oransc/policyagent/ApplicationTest.java
policy-agent/src/test/java/org/oransc/policyagent/MockPolicyAgent.java
policy-agent/src/test/java/org/oransc/policyagent/dmaap/DmaapMessageHandlerTest.java
policy-agent/src/test/java/org/oransc/policyagent/repository/LockTest.java
policy-agent/src/test/java/org/oransc/policyagent/tasks/RicSupervisionTest.java [moved from policy-agent/src/test/java/org/oransc/policyagent/tasks/RepositorySupervisionTest.java with 88% similarity]
policy-agent/src/test/java/org/oransc/policyagent/tasks/ServiceSupervisionTest.java

index 90b73a4..9fb3fba 100644 (file)
@@ -15,7 +15,7 @@ logging:
     org.springframework: ERROR
     org.springframework.data: ERROR
     org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
-    org.oransc.policyagent: WARN
+    org.oransc.policyagent: INFO
   file: /var/log/policy-agent/application.log
 app:
   filepath: /opt/app/policy-agent/config/application_configuration.json
index 9c7d96e..a2f1b57 100644 (file)
@@ -431,11 +431,11 @@ paths:
         '200':
           description: Policy updated
           schema:
-            type: string
+            type: object
         '201':
           description: Policy created
           schema:
-            type: string
+            type: object
         '401':
           description: Unauthorized
         '403':
@@ -869,6 +869,8 @@ definitions:
     title: RicInfo
   ServiceRegistrationInfo:
     type: object
+    required:
+      - serviceName
     properties:
       callbackUrl:
         type: string
index 1af607f..987cd97 100644 (file)
@@ -33,7 +33,6 @@ import java.util.Collection;
 import java.util.List;
 
 import org.oransc.policyagent.clients.A1ClientFactory;
-import org.oransc.policyagent.configuration.ApplicationConfig;
 import org.oransc.policyagent.exceptions.ServiceException;
 import org.oransc.policyagent.repository.ImmutablePolicy;
 import org.oransc.policyagent.repository.Lock.LockType;
@@ -43,6 +42,8 @@ import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Service;
+import org.oransc.policyagent.repository.Services;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
@@ -58,24 +59,21 @@ import reactor.core.publisher.Mono;
 @Api(tags = "A1 Policy Management")
 public class PolicyController {
 
-    private final Rics rics;
-    private final PolicyTypes policyTypes;
-    private final Policies policies;
-    private final A1ClientFactory a1ClientFactory;
+    @Autowired
+    private Rics rics;
+    @Autowired
+    private PolicyTypes policyTypes;
+    @Autowired
+    private Policies policies;
+    @Autowired
+    private A1ClientFactory a1ClientFactory;
+    @Autowired
+    private Services services;
 
     private static Gson gson = new GsonBuilder() //
         .serializeNulls() //
         .create(); //
 
-    @Autowired
-    PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics,
-        A1ClientFactory a1ClientFactory) {
-        this.policyTypes = types;
-        this.policies = policies;
-        this.rics = rics;
-        this.a1ClientFactory = a1ClientFactory;
-    }
-
     @GetMapping("/policy_schemas")
     @ApiOperation(value = "Returns policy type schema definitions")
     @ApiResponses(
@@ -165,8 +163,13 @@ public class PolicyController {
             @ApiResponse(code = 423, message = "RIC is locked", response = String.class)})
     public Mono<ResponseEntity<Object>> deletePolicy( //
         @RequestParam(name = "instance", required = true) String id) {
-        Policy policy = policies.get(id);
-        if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) {
+        Policy policy;
+        try {
+            policy = policies.getPolicy(id);
+            keepServiceAlive(policy.ownerServiceName());
+            if (policy.ric().getState() != Ric.RicState.IDLE) {
+                return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
+            }
             Ric ric = policy.ric();
             return ric.getLock().lock(LockType.SHARED) // //
                 .flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) //
@@ -175,9 +178,7 @@ public class PolicyController {
                 .doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
                 .doOnError(notUsed -> ric.getLock().unlockBlocking()) //
                 .flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)));
-        } else if (policy != null) {
-            return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
-        } else {
+        } catch (ServiceException e) {
             return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
         }
     }
@@ -186,8 +187,8 @@ public class PolicyController {
     @ApiOperation(value = "Put a policy", response = String.class)
     @ApiResponses(
         value = { //
-            @ApiResponse(code = 201, message = "Policy created"), //
-            @ApiResponse(code = 200, message = "Policy updated"), //
+            @ApiResponse(code = 201, message = "Policy created", response = Object.class), //
+            @ApiResponse(code = 200, message = "Policy updated", response = Object.class), //
             @ApiResponse(code = 423, message = "RIC is locked", response = String.class), //
             @ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class), //
             @ApiResponse(code = 405, message = "Change is not allowed", response = String.class)})
@@ -201,6 +202,7 @@ public class PolicyController {
         String jsonString = gson.toJson(jsonBody);
         Ric ric = rics.get(ricName);
         PolicyType type = policyTypes.get(typeName);
+        keepServiceAlive(service);
         if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) {
             Policy policy = ImmutablePolicy.builder() //
                 .id(instanceId) //
@@ -301,6 +303,13 @@ public class PolicyController {
         }
     }
 
+    private void keepServiceAlive(String name) {
+        Service s = this.services.get(name);
+        if (s != null) {
+            s.keepAlive();
+        }
+    }
+
     private boolean include(String filter, String value) {
         return filter == null || value.equals(filter);
     }
index 35348c7..3d36228 100644 (file)
@@ -57,7 +57,6 @@ public class ServiceController {
     private final Policies policies;
 
     private static Gson gson = new GsonBuilder() //
-        .serializeNulls() //
         .create(); //
 
     @Autowired
@@ -97,6 +96,12 @@ public class ServiceController {
             s.getCallbackUrl());
     }
 
+    private void validateRegistrationInfo(ServiceRegistrationInfo registrationInfo) throws ServiceException {
+        if (registrationInfo.serviceName.isEmpty()) {
+            throw new ServiceException("Missing mandatory parameter 'serviceName'");
+        }
+    }
+
     @ApiOperation(value = "Register a service")
     @ApiResponses(
         value = { //
@@ -106,6 +111,7 @@ public class ServiceController {
     public ResponseEntity<String> putService(//
         @RequestBody ServiceRegistrationInfo registrationInfo) {
         try {
+            validateRegistrationInfo(registrationInfo);
             this.services.put(toService(registrationInfo));
             return new ResponseEntity<>("OK", HttpStatus.OK);
         } catch (Exception e) {
@@ -132,18 +138,18 @@ public class ServiceController {
         }
     }
 
-    @ApiOperation(value = "Keep the policies alive for a service")
+    @ApiOperation(value = "Heartbeat from a serice")
     @ApiResponses(
         value = { //
-            @ApiResponse(code = 200, message = "Policies timeout supervision refreshed"),
+            @ApiResponse(code = 200, message = "Service supervision timer refreshed, OK"),
             @ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
     @PostMapping("/services/keepalive")
     public ResponseEntity<String> keepAliveService(//
         @RequestParam(name = "name", required = true) String serviceName) {
         try {
-            services.getService(serviceName).ping();
+            services.getService(serviceName).keepAlive();
             return new ResponseEntity<>("OK", HttpStatus.OK);
-        } catch (Exception e) {
+        } catch (ServiceException e) {
             return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
         }
     }
index 907fa1c..e532a36 100644 (file)
@@ -20,6 +20,8 @@
 
 package org.oransc.policyagent.controllers;
 
+import com.google.gson.annotations.SerializedName;
+
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 
@@ -29,16 +31,23 @@ import org.immutables.gson.Gson;
 @ApiModel(value = "ServiceRegistrationInfo")
 public class ServiceRegistrationInfo {
 
-    @ApiModelProperty(value = "identity of the service")
-    public String serviceName;
+    @ApiModelProperty(value = "identity of the service", required = true, allowEmptyValue = false)
+    @SerializedName(value = "serviceName", alternate = {"name"})
 
-    @ApiModelProperty(
-        value = "keep alive interval for policies owned by the service. 0 means no timeout supervision."
-            + " Polcies that are not refreshed within this time are removed")
-    public long keepAliveIntervalSeconds;
+    public String serviceName = "";
 
-    @ApiModelProperty(value = "callback for notifying of RIC recovery")
-    public String callbackUrl;
+    @ApiModelProperty(
+        value = "keep alive interval for the service. This is a heartbeat supervision of the service, "
+            + "which in regular intevals must invoke a 'keepAlive' REST call. "
+            + "When a service does not invoke this call within the given time, it is considered unavailble. "
+            + "An unavailable service will be automatically deregistered and its policies will be deleted. "
+            + "Value 0 means no timeout supervision.")
+    @SerializedName("keepAliveIntervalSeconds")
+    public long keepAliveIntervalSeconds = 0;
+
+    @ApiModelProperty(value = "callback for notifying of RIC recovery", required = false, allowEmptyValue = true)
+    @SerializedName("callbackUrl")
+    public String callbackUrl = "";
 
     public ServiceRegistrationInfo() {
     }
index 1a9cf32..8f4daac 100644 (file)
@@ -36,7 +36,7 @@ public class ServiceStatus {
     public final long keepAliveIntervalSeconds;
 
     @ApiModelProperty(value = "time since last invocation by the service")
-    public final long timeSincePingSeconds;
+    public final long timeSinceLastActivitySeconds;
 
     @ApiModelProperty(value = "callback for notifying of RIC recovery")
     public String callbackUrl;
@@ -44,7 +44,7 @@ public class ServiceStatus {
     ServiceStatus(String name, long keepAliveIntervalSeconds, long timeSincePingSeconds, String callbackUrl) {
         this.serviceName = name;
         this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
-        this.timeSincePingSeconds = timeSincePingSeconds;
+        this.timeSinceLastActivitySeconds = timeSincePingSeconds;
         this.callbackUrl = callbackUrl;
     }
 
index f0863a5..7b2c9bd 100644 (file)
@@ -36,14 +36,14 @@ public class Service {
         this.name = name;
         this.keepAliveInterval = keepAliveInterval;
         this.callbackUrl = callbackUrl;
-        ping();
+        keepAlive();
     }
 
     public synchronized Duration getKeepAliveInterval() {
         return this.keepAliveInterval;
     }
 
-    public synchronized void ping() {
+    public synchronized void keepAlive() {
         this.lastPing = Instant.now();
     }
 
index 568f002..f6c55dc 100644 (file)
@@ -20,6 +20,7 @@
 
 package org.oransc.policyagent.repository;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -50,7 +51,7 @@ public class Services {
     }
 
     public synchronized Iterable<Service> getAll() {
-        return registeredServices.values();
+        return Collections.unmodifiableCollection(registeredServices.values());
     }
 
     public synchronized void remove(String name) {
@@ -47,8 +47,8 @@ import reactor.core.publisher.Mono;
  */
 @Component
 @EnableScheduling
-public class RepositorySupervision {
-    private static final Logger logger = LoggerFactory.getLogger(RepositorySupervision.class);
+public class RicSupervision {
+    private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
 
     private final Rics rics;
     private final Policies policies;
@@ -57,7 +57,7 @@ public class RepositorySupervision {
     private final Services services;
 
     @Autowired
-    public RepositorySupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
+    public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
         Services services) {
         this.rics = rics;
         this.policies = policies;
@@ -72,7 +72,9 @@ public class RepositorySupervision {
     @Scheduled(fixedRate = 1000 * 60)
     public void checkAllRics() {
         logger.debug("Checking Rics starting");
-        createTask().subscribe(this::onRicChecked, null, this::onComplete);
+        createTask().subscribe(ric -> logger.debug("Ric: {} checked", ric.ric.name()), //
+            null, //
+            () -> logger.debug("Checking Rics completed"));
     }
 
     private Flux<RicData> createTask() {
@@ -163,15 +165,6 @@ public class RepositorySupervision {
         return Mono.error(new Exception("Syncronization started"));
     }
 
-    @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-    private void onRicChecked(RicData ric) {
-        logger.debug("Ric: {} checked", ric.ric.name());
-    }
-
-    private void onComplete() {
-        logger.debug("Checking Rics completed");
-    }
-
     RicSynchronizationTask createSynchronizationTask() {
         return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
     }
index 9f88d5c..0a0ab82 100644 (file)
@@ -103,7 +103,7 @@ public class RicSynchronizationTask {
 
     @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
     private void onSynchronizationComplete(Ric ric) {
-        logger.debug("Synchronization completed for: {}", ric.name());
+        logger.info("Synchronization completed for: {}", ric.name());
         ric.setState(RicState.IDLE);
         notifyAllServices("Synchronization completed for:" + ric.name());
     }
index 8d7062b..4be26eb 100644 (file)
 
 package org.oransc.policyagent.tasks;
 
+import java.time.Duration;
+
 import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock;
+import org.oransc.policyagent.repository.Lock.LockType;
 import org.oransc.policyagent.repository.Policies;
 import org.oransc.policyagent.repository.Policy;
 import org.oransc.policyagent.repository.Service;
@@ -29,16 +33,17 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
 /**
- * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive,
- * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is
- * removed from the repository. This means that the service needs to register again after this.
+ * Periodically checks that services with a keepAliveInterval set are alive. If
+ * a service is deemed not alive, all the service's policies are deleted, both
+ * in the repository and in the affected Rics, and the service is removed from
+ * the repository. This means that the service needs to register again after
+ * this.
  */
 @Component
 @EnableScheduling
@@ -47,39 +52,55 @@ public class ServiceSupervision {
     private final Services services;
     private final Policies policies;
     private A1ClientFactory a1ClientFactory;
+    private final Duration checkInterval;
 
     @Autowired
     public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
+        this(services, policies, a1ClientFactory, Duration.ofMinutes(1));
+    }
+
+    public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory,
+        Duration checkInterval) {
         this.services = services;
         this.policies = policies;
         this.a1ClientFactory = a1ClientFactory;
+        this.checkInterval = checkInterval;
+        start();
     }
 
-    @Scheduled(fixedRate = 1000 * 60)
-    public void checkAllServices() {
+    private void start() {
         logger.debug("Checking services starting");
-        createTask().subscribe(this::onPolicyDeleted, null, this::onComplete);
+        createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated"));
     }
 
-    @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
-    private void onPolicyDeleted(Policy policy) {
-        logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName());
+    private Flux<?> createTask() {
+        return Flux.interval(this.checkInterval) //
+            .flatMap(notUsed -> checkAllServices());
     }
 
-    private void onComplete() {
-        logger.debug("Checking services completed");
+    Flux<Policy> checkAllServices() {
+        return Flux.fromIterable(services.getAll()) //
+            .filter(Service::isExpired) //
+            .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+            .doOnNext(service -> services.remove(service.getName())) //
+            .flatMap(this::getAllPoliciesForService) //
+            .flatMap(this::deletePolicy);
     }
 
-    private Flux<Policy> createTask() {
-        synchronized (services) {
-            return Flux.fromIterable(services.getAll()) //
-                .filter(Service::isExpired) //
-                .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
-                .doOnNext(service -> services.remove(service.getName())) //
-                .flatMap(this::getAllPoliciesForService) //
-                .doOnNext(policies::remove) //
-                .flatMap(this::deletePolicyInRic);
-        }
+    @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+    private Flux<Policy> deletePolicy(Policy policy) {
+        Lock lock = policy.ric().getLock();
+        return lock.lock(LockType.SHARED) //
+            .doOnNext(notUsed -> policies.remove(policy)) //
+            .flatMap(notUsed -> deletePolicyInRic(policy))
+            .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(),
+                policy.ownerServiceName())) //
+            .doOnNext(notUsed -> lock.unlockBlocking()) //
+            .doOnError(throwable -> lock.unlockBlocking()) //
+            .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(),
+                throwable.getMessage())) //
+            .flatMapMany(notUsed -> Flux.just(policy)) //
+            .onErrorResume(throwable -> Flux.empty());
     }
 
     private Flux<Policy> getAllPoliciesForService(Service service) {
index 377ca79..a5bf3cb 100644 (file)
@@ -59,9 +59,12 @@ import org.oransc.policyagent.repository.Ric;
 import org.oransc.policyagent.repository.Ric.RicState;
 import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.repository.Services;
-import org.oransc.policyagent.tasks.RepositorySupervision;
+import org.oransc.policyagent.tasks.RicSupervision;
+import org.oransc.policyagent.tasks.ServiceSupervision;
 import org.oransc.policyagent.utils.MockA1Client;
 import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -84,6 +87,8 @@ import reactor.test.StepVerifier;
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
 public class ApplicationTest {
+    private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+
     @Autowired
     ApplicationContext context;
 
@@ -100,7 +105,7 @@ public class ApplicationTest {
     MockA1ClientFactory a1ClientFactory;
 
     @Autowired
-    RepositorySupervision supervision;
+    RicSupervision supervision;
 
     @Autowired
     Services services;
@@ -122,6 +127,9 @@ public class ApplicationTest {
     @TestConfiguration
     static class TestBeanFactory {
         private final PolicyTypes policyTypes = new PolicyTypes();
+        private final Services services = new Services();
+        private final Policies policies = new Policies();
+        MockA1ClientFactory a1ClientFactory = null;
 
         @Bean
         public ApplicationConfig getApplicationConfig() {
@@ -130,13 +138,32 @@ public class ApplicationTest {
 
         @Bean
         MockA1ClientFactory getA1ClientFactory() {
-            return new MockA1ClientFactory(this.policyTypes);
+            if (a1ClientFactory == null) {
+                this.a1ClientFactory = new MockA1ClientFactory(this.policyTypes);
+            }
+            return this.a1ClientFactory;
         }
 
         @Bean
         public PolicyTypes getPolicyTypes() {
             return this.policyTypes;
         }
+
+        @Bean
+        Policies getPolicies() {
+            return this.policies;
+        }
+
+        @Bean
+        Services getServices() {
+            return this.services;
+        }
+
+        @Bean
+        public ServiceSupervision getServiceSupervision() {
+            Duration checkInterval = Duration.ofMillis(1);
+            return new ServiceSupervision(this.services, this.policies, this.getA1ClientFactory(), checkInterval);
+        }
     }
 
     @LocalServerPort
@@ -327,7 +354,7 @@ public class ApplicationTest {
 
         String url = "/policy_schema?id=type1";
         String rsp = restClient().get(url).block();
-        System.out.println(rsp);
+        logger.info(rsp);
         assertThat(rsp).contains("type1");
         assertThat(rsp).contains("title");
 
@@ -361,7 +388,7 @@ public class ApplicationTest {
 
         String url = "/policies";
         String rsp = restClient().get(url).block();
-        System.out.println(rsp);
+        logger.info(rsp);
         List<PolicyInfo> info = parseList(rsp, PolicyInfo.class);
         assertThat(info).size().isEqualTo(1);
         PolicyInfo policyInfo = info.get(0);
@@ -379,14 +406,14 @@ public class ApplicationTest {
 
         String url = "/policies?type=type1";
         String rsp = restClient().get(url).block();
-        System.out.println(rsp);
+        logger.info(rsp);
         assertThat(rsp).contains("id1");
         assertThat(rsp).contains("id2");
         assertThat(rsp.contains("id3")).isFalse();
 
         url = "/policies?type=type1&service=service2";
         rsp = restClient().get(url).block();
-        System.out.println(rsp);
+        logger.info(rsp);
         assertThat(rsp.contains("id1")).isFalse();
         assertThat(rsp).contains("id2");
         assertThat(rsp.contains("id3")).isFalse();
@@ -403,7 +430,7 @@ public class ApplicationTest {
     @Test
     public void testPutAndGetService() throws Exception {
         // PUT
-        putService("name");
+        putService("name", 0);
 
         // GET one service
         String url = "/services?name=name";
@@ -411,14 +438,14 @@ public class ApplicationTest {
         List<ServiceStatus> info = parseList(rsp, ServiceStatus.class);
         assertThat(info.size()).isEqualTo(1);
         ServiceStatus status = info.iterator().next();
-        assertThat(status.keepAliveIntervalSeconds).isEqualTo(1);
+        assertThat(status.keepAliveIntervalSeconds).isEqualTo(0);
         assertThat(status.serviceName).isEqualTo("name");
 
         // GET (all)
         url = "/services";
         rsp = restClient().get(url).block();
         assertThat(rsp.contains("name")).isTrue();
-        System.out.println(rsp);
+        logger.info(rsp);
 
         // Keep alive
         url = "/services/keepalive?name=name";
@@ -435,12 +462,30 @@ public class ApplicationTest {
         testErrorCode(restClient().post("/services/keepalive?name=name", ""), HttpStatus.NOT_FOUND);
 
         // PUT servive with crap payload
-        testErrorCode(restClient().put("/service", "junk"), HttpStatus.BAD_REQUEST);
+        testErrorCode(restClient().put("/service", "crap"), HttpStatus.BAD_REQUEST);
+        testErrorCode(restClient().put("/service", "{}"), HttpStatus.BAD_REQUEST);
 
         // GET non existing servive
         testErrorCode(restClient().get("/services?name=XXX"), HttpStatus.NOT_FOUND);
     }
 
+    @Test
+    public void testServiceSupervision() throws Exception {
+        putService("service1", 1);
+        addPolicyType("type1", "ric1");
+
+        String url = putPolicyUrl("service1", "ric1", "type1", "instance1");
+        final String policyBody = jsonString();
+        restClient().put(url, policyBody).block();
+
+        assertThat(policies.size()).isEqualTo(1);
+        assertThat(services.size()).isEqualTo(1);
+
+        // Timeout after ~1 second
+        await().untilAsserted(() -> assertThat(policies.size()).isEqualTo(0));
+        assertThat(services.size()).isEqualTo(0);
+    }
+
     @Test
     public void testGetPolicyStatus() throws Exception {
         addPolicy("id", "typeName", "service1", "ric1");
@@ -471,16 +516,21 @@ public class ApplicationTest {
         return addPolicy(id, typeName, service, "ric");
     }
 
-    private String createServiceJson(String name) {
-        ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, 1, "callbackUrl");
+    private String createServiceJson(String name, long keepAliveIntervalSeconds) {
+        ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, keepAliveIntervalSeconds, "callbackUrl");
 
         String json = gson.toJson(service);
         return json;
     }
 
     private void putService(String name) {
+        putService(name, 0);
+    }
+
+    private void putService(String name, long keepAliveIntervalSeconds) {
         String url = "/service";
-        restClient().put(url, createServiceJson(name)).block();
+        String body = createServiceJson(name, keepAliveIntervalSeconds);
+        restClient().put(url, body).block();
     }
 
     private String baseUrl() {
@@ -496,9 +546,9 @@ public class ApplicationTest {
         private final String baseUrl;
         static AtomicInteger nextCount = new AtomicInteger(0);
         private final int count;
-        private final RepositorySupervision supervision;
+        private final RicSupervision supervision;
 
-        ConcurrencyTestRunnable(String baseUrl, RepositorySupervision supervision) {
+        ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) {
             this.baseUrl = baseUrl;
             this.count = nextCount.incrementAndGet();
             this.supervision = supervision;
@@ -543,7 +593,7 @@ public class ApplicationTest {
             t.join();
         }
         assertThat(policies.size()).isEqualTo(0);
-        System.out.println("Concurrency test took " + Duration.between(startTime, Instant.now()));
+        logger.info("Concurrency test took " + Duration.between(startTime, Instant.now()));
     }
 
     private AsyncRestClient restClient() {
index b0f1e7f..efbd576 100644 (file)
@@ -37,6 +37,8 @@ import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.repository.PolicyTypes;
 import org.oransc.policyagent.repository.Rics;
 import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@@ -48,6 +50,7 @@ import org.springframework.test.context.junit.jupiter.SpringExtension;
 @ExtendWith(SpringExtension.class)
 @SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
 public class MockPolicyAgent {
+    private static final Logger logger = LoggerFactory.getLogger(MockPolicyAgent.class);
 
     @Autowired
     Rics rics;
@@ -117,7 +120,7 @@ public class MockPolicyAgent {
                     PolicyType type = ImmutablePolicyType.builder().name(typeName).schema(schema).build();
                     policyTypes.put(type);
                 } catch (Exception e) {
-                    System.out.println("Could not load json schema " + e);
+                    logger.error("Could not load json schema ", e);
                 }
             }
         }
@@ -127,13 +130,13 @@ public class MockPolicyAgent {
     private int port;
 
     private void keepServerAlive() {
-        System.out.println("Keeping server alive!");
+        logger.info("Keeping server alive!");
         try {
             synchronized (this) {
                 this.wait();
             }
         } catch (Exception ex) {
-            System.out.println("Unexpected: " + ex.toString());
+            logger.error("Unexpected: " + ex);
         }
     }
 
index d034d4d..52147a8 100644 (file)
@@ -52,6 +52,8 @@ import org.oransc.policyagent.dmaap.DmaapRequestMessage.Operation;
 import org.oransc.policyagent.repository.ImmutablePolicyType;
 import org.oransc.policyagent.repository.PolicyType;
 import org.oransc.policyagent.utils.LoggingUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ResponseEntity;
 import org.springframework.web.reactive.function.client.WebClientResponseException;
@@ -60,7 +62,7 @@ import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
 public class DmaapMessageHandlerTest {
-
+    private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
     private static final String URL = "url";
 
     private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
@@ -112,13 +114,13 @@ public class DmaapMessageHandlerTest {
     @Test
     public void testMessageParsing() {
         String message = dmaapInputMessage(Operation.DELETE);
-        System.out.println(message);
+        logger.info(message);
         DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
         assertTrue(parsedMessage != null);
         assertFalse(parsedMessage.payload().isPresent());
 
         message = dmaapInputMessage(Operation.PUT);
-        System.out.println(message);
+        logger.info(message);
         parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
         assertTrue(parsedMessage != null);
         assertTrue(parsedMessage.payload().isPresent());
index 825010a..6fd6c8b 100644 (file)
@@ -71,10 +71,8 @@ public class LockTest {
         Lock lock = new Lock();
 
         Mono<Lock> seq = lock.lock(LockType.EXCLUSIVE) //
-            .doOnNext(l -> System.out.println("1 " + l)) //
             .flatMap(l -> lock.lock(LockType.EXCLUSIVE)) //
-            .flatMap(l -> lock.unlock()) //
-            .doOnNext(l -> System.out.println("2 " + l)); //
+            .flatMap(l -> lock.unlock());
 
         asynchUnlock(lock);
         StepVerifier.create(seq) //
@@ -56,7 +56,7 @@ import org.oransc.policyagent.repository.Rics;
 import reactor.core.publisher.Mono;
 
 @ExtendWith(MockitoExtension.class)
-public class RepositorySupervisionTest {
+public class RicSupervisionTest {
     private static final String POLICY_TYPE_1_NAME = "type1";
     private static final PolicyType POLICY_TYPE_1 = ImmutablePolicyType.builder() //
         .name(POLICY_TYPE_1_NAME) //
@@ -133,8 +133,7 @@ public class RepositorySupervisionTest {
         setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
         setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME)));
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
 
         supervisorUnderTest.checkAllRics();
 
@@ -147,8 +146,7 @@ public class RepositorySupervisionTest {
         RIC_1.setState(RicState.UNDEFINED);
         rics.put(RIC_1);
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
 
         doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
 
@@ -165,8 +163,7 @@ public class RepositorySupervisionTest {
         RIC_1.setState(RicState.SYNCHRONIZING);
         rics.put(RIC_1);
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
 
         supervisorUnderTest.checkAllRics();
 
@@ -182,8 +179,7 @@ public class RepositorySupervisionTest {
 
         setUpGetPolicyIdentitiesToReturn(new Exception("Failed"));
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
@@ -200,8 +196,7 @@ public class RepositorySupervisionTest {
 
         setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
 
         doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
 
@@ -223,8 +218,7 @@ public class RepositorySupervisionTest {
 
         setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID, "Another_policy")));
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
 
         doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
 
@@ -245,8 +239,7 @@ public class RepositorySupervisionTest {
         setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
         setUpGetPolicyTypeIdentitiesToReturn(new Exception("Failed"));
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
         supervisorUnderTest.checkAllRics();
 
         verify(supervisorUnderTest).checkAllRics();
@@ -264,8 +257,7 @@ public class RepositorySupervisionTest {
         setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
         setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
 
         doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
 
@@ -292,8 +284,7 @@ public class RepositorySupervisionTest {
         setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
         setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
 
-        RepositorySupervision supervisorUnderTest =
-            spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+        RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
 
         doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
 
index a984528..070e8da 100644 (file)
@@ -101,7 +101,7 @@ public class ServiceSupervisionTest {
 
         await().atMost(Durations.FIVE_SECONDS).with().pollInterval(Durations.ONE_SECOND).until(service::isExpired);
 
-        serviceSupervisionUnderTest.checkAllServices();
+        serviceSupervisionUnderTest.checkAllServices().blockLast();
 
         assertThat(policies.size()).isEqualTo(0);
         assertThat(services.size()).isEqualTo(0);
@@ -125,7 +125,7 @@ public class ServiceSupervisionTest {
 
         final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ServiceSupervision.class, WARN);
 
-        serviceSupervisionUnderTest.checkAllServices();
+        serviceSupervisionUnderTest.checkAllServices().blockLast();
 
         assertThat(policies.size()).isEqualTo(0);
         assertThat(services.size()).isEqualTo(0);
@@ -143,7 +143,7 @@ public class ServiceSupervisionTest {
         ServiceSupervision serviceSupervisionUnderTest =
             new ServiceSupervision(services, policies, a1ClientFactoryMock);
 
-        serviceSupervisionUnderTest.checkAllServices();
+        serviceSupervisionUnderTest.checkAllServices().blockLast();
 
         assertThat(policies.size()).isEqualTo(1);
         assertThat(services.size()).isEqualTo(1);
@@ -159,7 +159,7 @@ public class ServiceSupervisionTest {
         ServiceSupervision serviceSupervisionUnderTest =
             new ServiceSupervision(services, policies, a1ClientFactoryMock);
 
-        serviceSupervisionUnderTest.checkAllServices();
+        serviceSupervisionUnderTest.checkAllServices().blockLast();
 
         assertThat(policies.size()).isEqualTo(1);
         assertThat(services.size()).isEqualTo(1);