X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FStartupService.java;h=590b010789129220504aa0ffc3dd7ee9adde9a1b;hb=2de65533ba065c8a8cad97949d22d04cc3cd6ad9;hp=283e8ea92fa3f78622ab2e67cb368514960bca34;hpb=a76d95e9292f99dfb5cd5782ef6d7bb2ec293fd7;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java index 283e8ea9..590b0107 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java @@ -20,34 +20,37 @@ package org.oransc.policyagent.tasks; -import org.oransc.policyagent.clients.A1Client; +import org.oransc.policyagent.clients.A1ClientFactory; import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.exceptions.ServiceException; -import org.oransc.policyagent.repository.ImmutablePolicyType; -import org.oransc.policyagent.repository.PolicyType; +import org.oransc.policyagent.configuration.ApplicationConfig.RicConfigUpdate; +import org.oransc.policyagent.configuration.RicConfig; +import org.oransc.policyagent.repository.Policies; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; -import org.oransc.policyagent.repository.Ric.RicState; import org.oransc.policyagent.repository.Rics; +import org.oransc.policyagent.repository.Services; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.core.Ordered; +import org.springframework.core.annotation.Order; import org.springframework.stereotype.Service; -import reactor.core.publisher.Flux; -import reactor.core.publisher.Mono; - /** * Loads information about RealTime-RICs at startup. */ @Service("startupService") -public class StartupService { +@Order(Ordered.HIGHEST_PRECEDENCE) +public class StartupService implements ApplicationConfig.Observer { private static final Logger logger = LoggerFactory.getLogger(StartupService.class); @Autowired ApplicationConfig applicationConfig; + @Autowired + RefreshConfigTask refreshConfigTask; + @Autowired private Rics rics; @@ -55,80 +58,59 @@ public class StartupService { PolicyTypes policyTypes; @Autowired - private A1Client a1Client; + private A1ClientFactory a1ClientFactory; + + @Autowired + private Policies policies; + + @Autowired + private Services services; - StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) { + // Only for unit testing + StartupService(ApplicationConfig appConfig, RefreshConfigTask refreshTask, Rics rics, PolicyTypes policyTypes, + A1ClientFactory a1ClientFactory, Policies policies, Services services) { this.applicationConfig = appConfig; + this.refreshConfigTask = refreshTask; this.rics = rics; this.policyTypes = policyTypes; - this.a1Client = a1Client; - } - - /** - * Reads the configured Rics and performs the service discovery. The result is put into the repository. - */ - public void startup() { - applicationConfig.initialize(); - Flux.fromIterable(applicationConfig.getRicConfigs()) // - .map(ricConfig -> new Ric(ricConfig)) // - .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) // - .flatMap(this::addPolicyTypesForRic) // - .flatMap(this::deletePoliciesForRic) // - .doOnNext(rics::put) // - .subscribe(); - } - - private Mono addPolicyTypesForRic(Ric ric) { - a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // - .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) - .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) // - .flatMap(type -> addTypeToRic(ric, type)) // - .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric)); - return Mono.just(ric); + this.a1ClientFactory = a1ClientFactory; + this.policies = policies; + this.services = services; } - private Mono addTypeToRepo(Ric ric, String policyTypeId) { - if (policyTypes.contains(policyTypeId)) { - try { - return Mono.just(policyTypes.getType(policyTypeId)); - } catch (ServiceException e) { - return Mono.error(e); + @Override + public void onRicConfigUpdate(RicConfig ricConfig, RicConfigUpdate event) { + synchronized (this.rics) { + switch (event) { + case ADDED: + case CHANGED: + Ric ric = new Ric(ricConfig); + rics.put(ric); + RicSynchronizationTask synchronizationTask = createSynchronizationTask(); + synchronizationTask.run(ric); + break; + + case REMOVED: + rics.remove(ricConfig.name()); + policies.removePoliciesForRic(ricConfig.name()); + break; + + default: + logger.error("Unhandled ric event: {}", event); } } - return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // - .flatMap(schema -> createPolicyType(policyTypeId, schema)); - } - - private Mono createPolicyType(String policyTypeId, String schema) { - PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build(); - policyTypes.put(pt); - return Mono.just(pt); - } - - private Mono addTypeToRic(Ric ric, PolicyType policyType) { - ric.addSupportedPolicyType(policyType); - return Mono.just(policyType); - } - - private Mono deletePoliciesForRic(Ric ric) { - if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) { - a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // - .doOnNext( - policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) - .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // - .subscribe(null, cause -> setRicToNotReachable(ric, cause), null); - } - - return Mono.just(ric); } - private void setRicToNotReachable(Ric ric, Throwable t) { - ric.setState(Ric.RicState.NOT_REACHABLE); - logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage()); + /** + * Reads the configured Rics and performs the service discovery. The result is put into the repository. + */ + public void startup() { + logger.debug("Starting up"); + applicationConfig.addObserver(this); + refreshConfigTask.start(); } - private void setRicToActive(Ric ric) { - ric.setState(RicState.ACTIVE); + RicSynchronizationTask createSynchronizationTask() { + return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services); } - }