org.springframework: ERROR
org.springframework.data: ERROR
org.springframework.web.reactive.function.client.ExchangeFunctions: ERROR
- org.oransc.policyagent: WARN
+ org.oransc.policyagent: INFO
file: /var/log/policy-agent/application.log
app:
filepath: /opt/app/policy-agent/config/application_configuration.json
'200':
description: Policy updated
schema:
- type: string
+ type: object
'201':
description: Policy created
schema:
- type: string
+ type: object
'401':
description: Unauthorized
'403':
title: RicInfo
ServiceRegistrationInfo:
type: object
+ required:
+ - serviceName
properties:
callbackUrl:
type: string
import java.util.List;
import org.oransc.policyagent.clients.A1ClientFactory;
-import org.oransc.policyagent.configuration.ApplicationConfig;
import org.oransc.policyagent.exceptions.ServiceException;
import org.oransc.policyagent.repository.ImmutablePolicy;
import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
import org.oransc.policyagent.repository.Rics;
+import org.oransc.policyagent.repository.Service;
+import org.oransc.policyagent.repository.Services;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
@Api(tags = "A1 Policy Management")
public class PolicyController {
- private final Rics rics;
- private final PolicyTypes policyTypes;
- private final Policies policies;
- private final A1ClientFactory a1ClientFactory;
+ @Autowired
+ private Rics rics;
+ @Autowired
+ private PolicyTypes policyTypes;
+ @Autowired
+ private Policies policies;
+ @Autowired
+ private A1ClientFactory a1ClientFactory;
+ @Autowired
+ private Services services;
private static Gson gson = new GsonBuilder() //
.serializeNulls() //
.create(); //
- @Autowired
- PolicyController(ApplicationConfig config, PolicyTypes types, Policies policies, Rics rics,
- A1ClientFactory a1ClientFactory) {
- this.policyTypes = types;
- this.policies = policies;
- this.rics = rics;
- this.a1ClientFactory = a1ClientFactory;
- }
-
@GetMapping("/policy_schemas")
@ApiOperation(value = "Returns policy type schema definitions")
@ApiResponses(
@ApiResponse(code = 423, message = "RIC is locked", response = String.class)})
public Mono<ResponseEntity<Object>> deletePolicy( //
@RequestParam(name = "instance", required = true) String id) {
- Policy policy = policies.get(id);
- if (policy != null && policy.ric().getState() == Ric.RicState.IDLE) {
+ Policy policy;
+ try {
+ policy = policies.getPolicy(id);
+ keepServiceAlive(policy.ownerServiceName());
+ if (policy.ric().getState() != Ric.RicState.IDLE) {
+ return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
+ }
Ric ric = policy.ric();
return ric.getLock().lock(LockType.SHARED) // //
.flatMap(lock -> a1ClientFactory.createA1Client(policy.ric())) //
.doOnNext(notUsed -> ric.getLock().unlockBlocking()) //
.doOnError(notUsed -> ric.getLock().unlockBlocking()) //
.flatMap(notUsed -> Mono.just(new ResponseEntity<>(HttpStatus.NO_CONTENT)));
- } else if (policy != null) {
- return Mono.just(new ResponseEntity<>("Busy, recovering", HttpStatus.LOCKED));
- } else {
+ } catch (ServiceException e) {
return Mono.just(new ResponseEntity<>(HttpStatus.NOT_FOUND));
}
}
@ApiOperation(value = "Put a policy", response = String.class)
@ApiResponses(
value = { //
- @ApiResponse(code = 201, message = "Policy created"), //
- @ApiResponse(code = 200, message = "Policy updated"), //
+ @ApiResponse(code = 201, message = "Policy created", response = Object.class), //
+ @ApiResponse(code = 200, message = "Policy updated", response = Object.class), //
@ApiResponse(code = 423, message = "RIC is locked", response = String.class), //
@ApiResponse(code = 404, message = "RIC or policy type is not found", response = String.class), //
@ApiResponse(code = 405, message = "Change is not allowed", response = String.class)})
String jsonString = gson.toJson(jsonBody);
Ric ric = rics.get(ricName);
PolicyType type = policyTypes.get(typeName);
+ keepServiceAlive(service);
if (ric != null && type != null && ric.getState() == Ric.RicState.IDLE) {
Policy policy = ImmutablePolicy.builder() //
.id(instanceId) //
}
}
+ private void keepServiceAlive(String name) {
+ Service s = this.services.get(name);
+ if (s != null) {
+ s.keepAlive();
+ }
+ }
+
private boolean include(String filter, String value) {
return filter == null || value.equals(filter);
}
private final Policies policies;
private static Gson gson = new GsonBuilder() //
- .serializeNulls() //
.create(); //
@Autowired
s.getCallbackUrl());
}
+ private void validateRegistrationInfo(ServiceRegistrationInfo registrationInfo) throws ServiceException {
+ if (registrationInfo.serviceName.isEmpty()) {
+ throw new ServiceException("Missing mandatory parameter 'serviceName'");
+ }
+ }
+
@ApiOperation(value = "Register a service")
@ApiResponses(
value = { //
public ResponseEntity<String> putService(//
@RequestBody ServiceRegistrationInfo registrationInfo) {
try {
+ validateRegistrationInfo(registrationInfo);
this.services.put(toService(registrationInfo));
return new ResponseEntity<>("OK", HttpStatus.OK);
} catch (Exception e) {
}
}
- @ApiOperation(value = "Keep the policies alive for a service")
+ @ApiOperation(value = "Heartbeat from a serice")
@ApiResponses(
value = { //
- @ApiResponse(code = 200, message = "Policies timeout supervision refreshed"),
+ @ApiResponse(code = 200, message = "Service supervision timer refreshed, OK"),
@ApiResponse(code = 404, message = "The service is not found, needs re-registration")})
@PostMapping("/services/keepalive")
public ResponseEntity<String> keepAliveService(//
@RequestParam(name = "name", required = true) String serviceName) {
try {
- services.getService(serviceName).ping();
+ services.getService(serviceName).keepAlive();
return new ResponseEntity<>("OK", HttpStatus.OK);
- } catch (Exception e) {
+ } catch (ServiceException e) {
return new ResponseEntity<>(e.getMessage(), HttpStatus.NOT_FOUND);
}
}
package org.oransc.policyagent.controllers;
+import com.google.gson.annotations.SerializedName;
+
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
@ApiModel(value = "ServiceRegistrationInfo")
public class ServiceRegistrationInfo {
- @ApiModelProperty(value = "identity of the service")
- public String serviceName;
+ @ApiModelProperty(value = "identity of the service", required = true, allowEmptyValue = false)
+ @SerializedName(value = "serviceName", alternate = {"name"})
- @ApiModelProperty(
- value = "keep alive interval for policies owned by the service. 0 means no timeout supervision."
- + " Polcies that are not refreshed within this time are removed")
- public long keepAliveIntervalSeconds;
+ public String serviceName = "";
- @ApiModelProperty(value = "callback for notifying of RIC recovery")
- public String callbackUrl;
+ @ApiModelProperty(
+ value = "keep alive interval for the service. This is a heartbeat supervision of the service, "
+ + "which in regular intevals must invoke a 'keepAlive' REST call. "
+ + "When a service does not invoke this call within the given time, it is considered unavailble. "
+ + "An unavailable service will be automatically deregistered and its policies will be deleted. "
+ + "Value 0 means no timeout supervision.")
+ @SerializedName("keepAliveIntervalSeconds")
+ public long keepAliveIntervalSeconds = 0;
+
+ @ApiModelProperty(value = "callback for notifying of RIC recovery", required = false, allowEmptyValue = true)
+ @SerializedName("callbackUrl")
+ public String callbackUrl = "";
public ServiceRegistrationInfo() {
}
public final long keepAliveIntervalSeconds;
@ApiModelProperty(value = "time since last invocation by the service")
- public final long timeSincePingSeconds;
+ public final long timeSinceLastActivitySeconds;
@ApiModelProperty(value = "callback for notifying of RIC recovery")
public String callbackUrl;
ServiceStatus(String name, long keepAliveIntervalSeconds, long timeSincePingSeconds, String callbackUrl) {
this.serviceName = name;
this.keepAliveIntervalSeconds = keepAliveIntervalSeconds;
- this.timeSincePingSeconds = timeSincePingSeconds;
+ this.timeSinceLastActivitySeconds = timeSincePingSeconds;
this.callbackUrl = callbackUrl;
}
this.name = name;
this.keepAliveInterval = keepAliveInterval;
this.callbackUrl = callbackUrl;
- ping();
+ keepAlive();
}
public synchronized Duration getKeepAliveInterval() {
return this.keepAliveInterval;
}
- public synchronized void ping() {
+ public synchronized void keepAlive() {
this.lastPing = Instant.now();
}
package org.oransc.policyagent.repository;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
}
public synchronized Iterable<Service> getAll() {
- return registeredServices.values();
+ return Collections.unmodifiableCollection(registeredServices.values());
}
public synchronized void remove(String name) {
*/
@Component
@EnableScheduling
-public class RepositorySupervision {
- private static final Logger logger = LoggerFactory.getLogger(RepositorySupervision.class);
+public class RicSupervision {
+ private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
private final Rics rics;
private final Policies policies;
private final Services services;
@Autowired
- public RepositorySupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
+ public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
Services services) {
this.rics = rics;
this.policies = policies;
@Scheduled(fixedRate = 1000 * 60)
public void checkAllRics() {
logger.debug("Checking Rics starting");
- createTask().subscribe(this::onRicChecked, null, this::onComplete);
+ createTask().subscribe(ric -> logger.debug("Ric: {} checked", ric.ric.name()), //
+ null, //
+ () -> logger.debug("Checking Rics completed"));
}
private Flux<RicData> createTask() {
return Mono.error(new Exception("Syncronization started"));
}
- @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
- private void onRicChecked(RicData ric) {
- logger.debug("Ric: {} checked", ric.ric.name());
- }
-
- private void onComplete() {
- logger.debug("Checking Rics completed");
- }
-
RicSynchronizationTask createSynchronizationTask() {
return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
}
@SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
private void onSynchronizationComplete(Ric ric) {
- logger.debug("Synchronization completed for: {}", ric.name());
+ logger.info("Synchronization completed for: {}", ric.name());
ric.setState(RicState.IDLE);
notifyAllServices("Synchronization completed for:" + ric.name());
}
package org.oransc.policyagent.tasks;
+import java.time.Duration;
+
import org.oransc.policyagent.clients.A1ClientFactory;
+import org.oransc.policyagent.repository.Lock;
+import org.oransc.policyagent.repository.Lock.LockType;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.Service;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.EnableScheduling;
-import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
/**
- * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive,
- * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is
- * removed from the repository. This means that the service needs to register again after this.
+ * Periodically checks that services with a keepAliveInterval set are alive. If
+ * a service is deemed not alive, all the service's policies are deleted, both
+ * in the repository and in the affected Rics, and the service is removed from
+ * the repository. This means that the service needs to register again after
+ * this.
*/
@Component
@EnableScheduling
private final Services services;
private final Policies policies;
private A1ClientFactory a1ClientFactory;
+ private final Duration checkInterval;
@Autowired
public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
+ this(services, policies, a1ClientFactory, Duration.ofMinutes(1));
+ }
+
+ public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory,
+ Duration checkInterval) {
this.services = services;
this.policies = policies;
this.a1ClientFactory = a1ClientFactory;
+ this.checkInterval = checkInterval;
+ start();
}
- @Scheduled(fixedRate = 1000 * 60)
- public void checkAllServices() {
+ private void start() {
logger.debug("Checking services starting");
- createTask().subscribe(this::onPolicyDeleted, null, this::onComplete);
+ createTask().subscribe(null, null, () -> logger.error("Checking services unexpectedly terminated"));
}
- @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
- private void onPolicyDeleted(Policy policy) {
- logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName());
+ private Flux<?> createTask() {
+ return Flux.interval(this.checkInterval) //
+ .flatMap(notUsed -> checkAllServices());
}
- private void onComplete() {
- logger.debug("Checking services completed");
+ Flux<Policy> checkAllServices() {
+ return Flux.fromIterable(services.getAll()) //
+ .filter(Service::isExpired) //
+ .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+ .doOnNext(service -> services.remove(service.getName())) //
+ .flatMap(this::getAllPoliciesForService) //
+ .flatMap(this::deletePolicy);
}
- private Flux<Policy> createTask() {
- synchronized (services) {
- return Flux.fromIterable(services.getAll()) //
- .filter(Service::isExpired) //
- .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
- .doOnNext(service -> services.remove(service.getName())) //
- .flatMap(this::getAllPoliciesForService) //
- .doOnNext(policies::remove) //
- .flatMap(this::deletePolicyInRic);
- }
+ @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
+ private Flux<Policy> deletePolicy(Policy policy) {
+ Lock lock = policy.ric().getLock();
+ return lock.lock(LockType.SHARED) //
+ .doOnNext(notUsed -> policies.remove(policy)) //
+ .flatMap(notUsed -> deletePolicyInRic(policy))
+ .doOnNext(notUsed -> logger.debug("Policy deleted due to service inactivity: {}, service: {}", policy.id(),
+ policy.ownerServiceName())) //
+ .doOnNext(notUsed -> lock.unlockBlocking()) //
+ .doOnError(throwable -> lock.unlockBlocking()) //
+ .doOnError(throwable -> logger.debug("Failed to delete inactive policy: {}, reason: {}", policy.id(),
+ throwable.getMessage())) //
+ .flatMapMany(notUsed -> Flux.just(policy)) //
+ .onErrorResume(throwable -> Flux.empty());
}
private Flux<Policy> getAllPoliciesForService(Service service) {
import org.oransc.policyagent.repository.Ric.RicState;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Services;
-import org.oransc.policyagent.tasks.RepositorySupervision;
+import org.oransc.policyagent.tasks.RicSupervision;
+import org.oransc.policyagent.tasks.ServiceSupervision;
import org.oransc.policyagent.utils.MockA1Client;
import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
public class ApplicationTest {
+ private static final Logger logger = LoggerFactory.getLogger(ApplicationTest.class);
+
@Autowired
ApplicationContext context;
MockA1ClientFactory a1ClientFactory;
@Autowired
- RepositorySupervision supervision;
+ RicSupervision supervision;
@Autowired
Services services;
@TestConfiguration
static class TestBeanFactory {
private final PolicyTypes policyTypes = new PolicyTypes();
+ private final Services services = new Services();
+ private final Policies policies = new Policies();
+ MockA1ClientFactory a1ClientFactory = null;
@Bean
public ApplicationConfig getApplicationConfig() {
@Bean
MockA1ClientFactory getA1ClientFactory() {
- return new MockA1ClientFactory(this.policyTypes);
+ if (a1ClientFactory == null) {
+ this.a1ClientFactory = new MockA1ClientFactory(this.policyTypes);
+ }
+ return this.a1ClientFactory;
}
@Bean
public PolicyTypes getPolicyTypes() {
return this.policyTypes;
}
+
+ @Bean
+ Policies getPolicies() {
+ return this.policies;
+ }
+
+ @Bean
+ Services getServices() {
+ return this.services;
+ }
+
+ @Bean
+ public ServiceSupervision getServiceSupervision() {
+ Duration checkInterval = Duration.ofMillis(1);
+ return new ServiceSupervision(this.services, this.policies, this.getA1ClientFactory(), checkInterval);
+ }
}
@LocalServerPort
String url = "/policy_schema?id=type1";
String rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
assertThat(rsp).contains("type1");
assertThat(rsp).contains("title");
String url = "/policies";
String rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
List<PolicyInfo> info = parseList(rsp, PolicyInfo.class);
assertThat(info).size().isEqualTo(1);
PolicyInfo policyInfo = info.get(0);
String url = "/policies?type=type1";
String rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
assertThat(rsp).contains("id1");
assertThat(rsp).contains("id2");
assertThat(rsp.contains("id3")).isFalse();
url = "/policies?type=type1&service=service2";
rsp = restClient().get(url).block();
- System.out.println(rsp);
+ logger.info(rsp);
assertThat(rsp.contains("id1")).isFalse();
assertThat(rsp).contains("id2");
assertThat(rsp.contains("id3")).isFalse();
@Test
public void testPutAndGetService() throws Exception {
// PUT
- putService("name");
+ putService("name", 0);
// GET one service
String url = "/services?name=name";
List<ServiceStatus> info = parseList(rsp, ServiceStatus.class);
assertThat(info.size()).isEqualTo(1);
ServiceStatus status = info.iterator().next();
- assertThat(status.keepAliveIntervalSeconds).isEqualTo(1);
+ assertThat(status.keepAliveIntervalSeconds).isEqualTo(0);
assertThat(status.serviceName).isEqualTo("name");
// GET (all)
url = "/services";
rsp = restClient().get(url).block();
assertThat(rsp.contains("name")).isTrue();
- System.out.println(rsp);
+ logger.info(rsp);
// Keep alive
url = "/services/keepalive?name=name";
testErrorCode(restClient().post("/services/keepalive?name=name", ""), HttpStatus.NOT_FOUND);
// PUT servive with crap payload
- testErrorCode(restClient().put("/service", "junk"), HttpStatus.BAD_REQUEST);
+ testErrorCode(restClient().put("/service", "crap"), HttpStatus.BAD_REQUEST);
+ testErrorCode(restClient().put("/service", "{}"), HttpStatus.BAD_REQUEST);
// GET non existing servive
testErrorCode(restClient().get("/services?name=XXX"), HttpStatus.NOT_FOUND);
}
+ @Test
+ public void testServiceSupervision() throws Exception {
+ putService("service1", 1);
+ addPolicyType("type1", "ric1");
+
+ String url = putPolicyUrl("service1", "ric1", "type1", "instance1");
+ final String policyBody = jsonString();
+ restClient().put(url, policyBody).block();
+
+ assertThat(policies.size()).isEqualTo(1);
+ assertThat(services.size()).isEqualTo(1);
+
+ // Timeout after ~1 second
+ await().untilAsserted(() -> assertThat(policies.size()).isEqualTo(0));
+ assertThat(services.size()).isEqualTo(0);
+ }
+
@Test
public void testGetPolicyStatus() throws Exception {
addPolicy("id", "typeName", "service1", "ric1");
return addPolicy(id, typeName, service, "ric");
}
- private String createServiceJson(String name) {
- ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, 1, "callbackUrl");
+ private String createServiceJson(String name, long keepAliveIntervalSeconds) {
+ ServiceRegistrationInfo service = new ServiceRegistrationInfo(name, keepAliveIntervalSeconds, "callbackUrl");
String json = gson.toJson(service);
return json;
}
private void putService(String name) {
+ putService(name, 0);
+ }
+
+ private void putService(String name, long keepAliveIntervalSeconds) {
String url = "/service";
- restClient().put(url, createServiceJson(name)).block();
+ String body = createServiceJson(name, keepAliveIntervalSeconds);
+ restClient().put(url, body).block();
}
private String baseUrl() {
private final String baseUrl;
static AtomicInteger nextCount = new AtomicInteger(0);
private final int count;
- private final RepositorySupervision supervision;
+ private final RicSupervision supervision;
- ConcurrencyTestRunnable(String baseUrl, RepositorySupervision supervision) {
+ ConcurrencyTestRunnable(String baseUrl, RicSupervision supervision) {
this.baseUrl = baseUrl;
this.count = nextCount.incrementAndGet();
this.supervision = supervision;
t.join();
}
assertThat(policies.size()).isEqualTo(0);
- System.out.println("Concurrency test took " + Duration.between(startTime, Instant.now()));
+ logger.info("Concurrency test took " + Duration.between(startTime, Instant.now()));
}
private AsyncRestClient restClient() {
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.utils.MockA1ClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
@ExtendWith(SpringExtension.class)
@SpringBootTest(webEnvironment = WebEnvironment.DEFINED_PORT)
public class MockPolicyAgent {
+ private static final Logger logger = LoggerFactory.getLogger(MockPolicyAgent.class);
@Autowired
Rics rics;
PolicyType type = ImmutablePolicyType.builder().name(typeName).schema(schema).build();
policyTypes.put(type);
} catch (Exception e) {
- System.out.println("Could not load json schema " + e);
+ logger.error("Could not load json schema ", e);
}
}
}
private int port;
private void keepServerAlive() {
- System.out.println("Keeping server alive!");
+ logger.info("Keeping server alive!");
try {
synchronized (this) {
this.wait();
}
} catch (Exception ex) {
- System.out.println("Unexpected: " + ex.toString());
+ logger.error("Unexpected: " + ex);
}
}
import org.oransc.policyagent.repository.ImmutablePolicyType;
import org.oransc.policyagent.repository.PolicyType;
import org.oransc.policyagent.utils.LoggingUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.reactive.function.client.WebClientResponseException;
import reactor.test.StepVerifier;
public class DmaapMessageHandlerTest {
-
+ private static final Logger logger = LoggerFactory.getLogger(DmaapMessageHandlerTest.class);
private static final String URL = "url";
private final MRBatchingPublisher dmaapClient = mock(MRBatchingPublisher.class);
@Test
public void testMessageParsing() {
String message = dmaapInputMessage(Operation.DELETE);
- System.out.println(message);
+ logger.info(message);
DmaapRequestMessage parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
assertTrue(parsedMessage != null);
assertFalse(parsedMessage.payload().isPresent());
message = dmaapInputMessage(Operation.PUT);
- System.out.println(message);
+ logger.info(message);
parsedMessage = gson.fromJson(message, ImmutableDmaapRequestMessage.class);
assertTrue(parsedMessage != null);
assertTrue(parsedMessage.payload().isPresent());
Lock lock = new Lock();
Mono<Lock> seq = lock.lock(LockType.EXCLUSIVE) //
- .doOnNext(l -> System.out.println("1 " + l)) //
.flatMap(l -> lock.lock(LockType.EXCLUSIVE)) //
- .flatMap(l -> lock.unlock()) //
- .doOnNext(l -> System.out.println("2 " + l)); //
+ .flatMap(l -> lock.unlock());
asynchUnlock(lock);
StepVerifier.create(seq) //
import reactor.core.publisher.Mono;
@ExtendWith(MockitoExtension.class)
-public class RepositorySupervisionTest {
+public class RicSupervisionTest {
private static final String POLICY_TYPE_1_NAME = "type1";
private static final PolicyType POLICY_TYPE_1 = ImmutablePolicyType.builder() //
.name(POLICY_TYPE_1_NAME) //
setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME)));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
RIC_1.setState(RicState.UNDEFINED);
rics.put(RIC_1);
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
RIC_1.setState(RicState.SYNCHRONIZING);
rics.put(RIC_1);
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
setUpGetPolicyIdentitiesToReturn(new Exception("Failed"));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID)));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
setUpGetPolicyIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_1_ID, "Another_policy")));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
setUpGetPolicyTypeIdentitiesToReturn(new Exception("Failed"));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
supervisorUnderTest.checkAllRics();
verify(supervisorUnderTest).checkAllRics();
setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
setUpGetPolicyIdentitiesToReturn(Collections.emptyList());
setUpGetPolicyTypeIdentitiesToReturn(new ArrayList<>(Arrays.asList(POLICY_TYPE_1_NAME, "another_policy_type")));
- RepositorySupervision supervisorUnderTest =
- spy(new RepositorySupervision(rics, policies, a1ClientFactory, types, null));
+ RicSupervision supervisorUnderTest = spy(new RicSupervision(rics, policies, a1ClientFactory, types, null));
doReturn(recoveryTaskMock).when(supervisorUnderTest).createSynchronizationTask();
await().atMost(Durations.FIVE_SECONDS).with().pollInterval(Durations.ONE_SECOND).until(service::isExpired);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(0);
assertThat(services.size()).isEqualTo(0);
final ListAppender<ILoggingEvent> logAppender = LoggingUtils.getLogListAppender(ServiceSupervision.class, WARN);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(0);
assertThat(services.size()).isEqualTo(0);
ServiceSupervision serviceSupervisionUnderTest =
new ServiceSupervision(services, policies, a1ClientFactoryMock);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(1);
assertThat(services.size()).isEqualTo(1);
ServiceSupervision serviceSupervisionUnderTest =
new ServiceSupervision(services, policies, a1ClientFactoryMock);
- serviceSupervisionUnderTest.checkAllServices();
+ serviceSupervisionUnderTest.checkAllServices().blockLast();
assertThat(policies.size()).isEqualTo(1);
assertThat(services.size()).isEqualTo(1);