package org.oransc.policyagent.tasks;
+import org.oransc.policyagent.clients.A1ClientFactory;
import org.oransc.policyagent.repository.Policies;
import org.oransc.policyagent.repository.Policy;
import org.oransc.policyagent.repository.Service;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
+
import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+/**
+ * Periodically checks that services with a keepAliveInterval set are alive. If a service is deemed not alive,
+ * all the service's policies are deleted, both in the repository and in the affected Rics, and the service is
+ * removed from the repository. This means that the service needs to register again after this.
+ */
@Component
@EnableScheduling
public class ServiceSupervision {
private static final Logger logger = LoggerFactory.getLogger(ServiceSupervision.class);
private final Services services;
private final Policies policies;
+ private A1ClientFactory a1ClientFactory;
@Autowired
- public ServiceSupervision(Services services, Policies policies) {
+ public ServiceSupervision(Services services, Policies policies, A1ClientFactory a1ClientFactory) {
this.services = services;
this.policies = policies;
+ this.a1ClientFactory = a1ClientFactory;
}
@Scheduled(fixedRate = 1000 * 60)
public void checkAllServices() {
logger.debug("Checking services starting");
- createTask().subscribe(this::onPolicyDeleted, this::onError, this::onComplete);
+ createTask().subscribe(this::onPolicyDeleted, null, this::onComplete);
}
+ @SuppressWarnings("squid:S2629")
private void onPolicyDeleted(Policy policy) {
- logger.info("Policy deleted due to inactivity: " + policy.id() + ", service: " + policy.ownerServiceName());
- }
-
- private void onError(Throwable t) {
- logger.error("Service supervision failed", t);
+ logger.debug("Policy deleted due to inactivity: {}, service: {}", policy.id(), policy.ownerServiceName());
}
private void onComplete() {
logger.debug("Checking services completed");
}
- Flux<Policy> createTask() {
- return Flux.fromIterable(services.getAll()) //
- .filter(service -> service.isExpired()) //
- .doOnNext(service -> logger.info("Service is expired:" + service.getName()))
- .flatMap(service -> getAllPolicies(service)) //
- .flatMap(policy -> deletePolicy(policy));
+ private Flux<Policy> createTask() {
+ synchronized (services) {
+ return Flux.fromIterable(services.getAll()) //
+ .filter(Service::isExpired) //
+ .doOnNext(service -> logger.info("Service is expired: {}", service.getName())) //
+ .doOnNext(service -> services.remove(service.getName())) //
+ .flatMap(this::getAllPoliciesForService) //
+ .doOnNext(policies::remove) //
+ .flatMap(this::deletePolicyInRic);
+ }
}
- Flux<Policy> getAllPolicies(Service service) {
- return Flux.fromIterable(policies.getForService(service.getName()));
+ private Flux<Policy> getAllPoliciesForService(Service service) {
+ synchronized (policies) {
+ return Flux.fromIterable(policies.getForService(service.getName()));
+ }
}
- Flux<Policy> deletePolicy(Policy policy) {
- this.policies.remove(policy);
- return Flux.just(policy);
+ private Mono<Policy> deletePolicyInRic(Policy policy) {
+ return a1ClientFactory.createA1Client(policy.ric()) //
+ .flatMap(client -> client.deletePolicy(policy) //
+ .onErrorResume(exception -> handleDeleteFromRicFailure(policy, exception)) //
+ .map(nothing -> policy));
}
+ @SuppressWarnings("squid:S2629")
+ private Mono<String> handleDeleteFromRicFailure(Policy policy, Throwable e) {
+ logger.warn("Could not delete policy: {} from ric: {}", policy.id(), policy.ric().name(), e);
+ return Mono.empty();
+ }
}