X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;f=policy-agent%2Fsrc%2Fmain%2Fjava%2Forg%2Foransc%2Fpolicyagent%2Ftasks%2FStartupService.java;h=283e8ea92fa3f78622ab2e67cb368514960bca34;hb=ffe0c150f08205d73ee362f58f492aeb2703f295;hp=a6accbbf8e1360695e9e45ab536e4c7b83e5dbd3;hpb=8b92754714856090ad81f5a18aa9c93e6b19fe99;p=nonrtric.git diff --git a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java index a6accbbf..283e8ea9 100644 --- a/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java +++ b/policy-agent/src/main/java/org/oransc/policyagent/tasks/StartupService.java @@ -20,10 +20,10 @@ package org.oransc.policyagent.tasks; -import java.util.Vector; -import org.oransc.policyagent.clients.RicClient; +import org.oransc.policyagent.clients.A1Client; import org.oransc.policyagent.configuration.ApplicationConfig; -import org.oransc.policyagent.configuration.RicConfig; +import org.oransc.policyagent.exceptions.ServiceException; +import org.oransc.policyagent.repository.ImmutablePolicyType; import org.oransc.policyagent.repository.PolicyType; import org.oransc.policyagent.repository.PolicyTypes; import org.oransc.policyagent.repository.Ric; @@ -34,6 +34,9 @@ import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + /** * Loads information about RealTime-RICs at startup. */ @@ -52,13 +55,13 @@ public class StartupService { PolicyTypes policyTypes; @Autowired - private RicClient ricClient; + private A1Client a1Client; - StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, RicClient ricClient) { + StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) { this.applicationConfig = appConfig; this.rics = rics; this.policyTypes = policyTypes; - this.ricClient = ricClient; + this.a1Client = a1Client; } /** @@ -66,22 +69,66 @@ public class StartupService { */ public void startup() { applicationConfig.initialize(); - Vector ricConfigs = applicationConfig.getRicConfigs(); - for (RicConfig ricConfig : ricConfigs) { - Ric ric = new Ric(ricConfig); - String baseUrl = ricConfig.baseUrl(); - ricClient.deleteAllPolicies(baseUrl); - Vector types = ricClient.getPolicyTypes(baseUrl); - for (PolicyType policyType : types) { - if (!policyTypes.contains(policyType)) { - policyTypes.put(policyType); - } + Flux.fromIterable(applicationConfig.getRicConfigs()) // + .map(ricConfig -> new Ric(ricConfig)) // + .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())) // + .flatMap(this::addPolicyTypesForRic) // + .flatMap(this::deletePoliciesForRic) // + .doOnNext(rics::put) // + .subscribe(); + } + + private Mono addPolicyTypesForRic(Ric ric) { + a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) // + .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) + .flatMap((policyTypeId) -> addTypeToRepo(ric, policyTypeId)) // + .flatMap(type -> addTypeToRic(ric, type)) // + .subscribe(null, cause -> setRicToNotReachable(ric, cause), () -> setRicToActive(ric)); + return Mono.just(ric); + } + + private Mono addTypeToRepo(Ric ric, String policyTypeId) { + if (policyTypes.contains(policyTypeId)) { + try { + return Mono.just(policyTypes.getType(policyTypeId)); + } catch (ServiceException e) { + return Mono.error(e); } - ric.addSupportedPolicyTypes(types); - ric.setState(RicState.ACTIVE); - rics.put(ric); } + return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) // + .flatMap(schema -> createPolicyType(policyTypeId, schema)); + } + + private Mono createPolicyType(String policyTypeId, String schema) { + PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build(); + policyTypes.put(pt); + return Mono.just(pt); + } + + private Mono addTypeToRic(Ric ric, PolicyType policyType) { + ric.addSupportedPolicyType(policyType); + return Mono.just(policyType); + } + + private Mono deletePoliciesForRic(Ric ric) { + if (!Ric.RicState.NOT_REACHABLE.equals(ric.state())) { + a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) // + .doOnNext( + policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) + .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) // + .subscribe(null, cause -> setRicToNotReachable(ric, cause), null); + } + + return Mono.just(ric); + } + + private void setRicToNotReachable(Ric ric, Throwable t) { + ric.setState(Ric.RicState.NOT_REACHABLE); + logger.info("Unable to connect to ric {}. Cause: {}", ric.name(), t.getMessage()); + } + private void setRicToActive(Ric ric) { + ric.setState(RicState.ACTIVE); } }