import org.oransc.policyagent.repository.PolicyType;
import org.oransc.policyagent.repository.PolicyTypes;
import org.oransc.policyagent.repository.Ric;
+import org.oransc.policyagent.repository.Rics;
import org.oransc.policyagent.repository.Service;
import org.oransc.policyagent.repository.Services;
import org.slf4j.Logger;
this.services = services;
}
- public void run(Collection<Ric> rics) {
- for (Ric ric : rics) {
- run(ric);
+ public void run(Rics rics) {
+ synchronized (rics) {
+ for (Ric ric : rics.getRics()) {
+ run(ric);
+ }
}
}
private void onComplete(Ric ric) {
logger.debug("Recovery completed for:" + ric.name());
- ric.setState(Ric.RicState.ACTIVE);
+ ric.setState(Ric.RicState.IDLE);
notifyAllServices("Recovery completed for:" + ric.name());
}
private void notifyAllServices(String body) {
- for (Service service : services.getAll()) {
- String url = service.getCallbackUrl();
- if (service.getCallbackUrl().length() > 0) {
- createClient(url) //
- .put("", body) //
- .subscribe(rsp -> logger.debug("Service called"),
- throwable -> logger.warn("Service called failed", throwable),
- () -> logger.debug("Service called complete"));
+ synchronized (services) {
+ for (Service service : services.getAll()) {
+ String url = service.getCallbackUrl();
+ if (service.getCallbackUrl().length() > 0) {
+ createClient(url) //
+ .put("", body) //
+ .subscribe(rsp -> logger.debug("Service called"),
+ throwable -> logger.warn("Service called failed", throwable),
+ () -> logger.debug("Service called complete"));
+ }
}
}
}
private void onError(Ric ric, Throwable t) {
logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
- ric.setState(Ric.RicState.NOT_REACHABLE);
+ ric.setState(Ric.RicState.UNDEFINED);
}
private AsyncRestClient createClient(final String url) {
ric.clearSupportedPolicyTypes();
return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
.flatMapMany(types -> Flux.fromIterable(types)) //
- .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
+ .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
.flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
.doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
}
}
private Flux<String> deletePolicies(Ric ric) {
- Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
- for (Policy policy : ricPolicies) {
- this.policies.remove(policy);
+ synchronized (policies) {
+ Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
+ for (Policy policy : ricPolicies) {
+ this.policies.remove(policy);
+ }
}
return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //