+ ric.setState(Ric.RicState.IDLE);
+ notifyAllServices("Recovery completed for:" + ric.name());
+ }
+
+ private void notifyAllServices(String body) {
+ synchronized (services) {
+ for (Service service : services.getAll()) {
+ String url = service.getCallbackUrl();
+ if (service.getCallbackUrl().length() > 0) {
+ createClient(url) //
+ .put("", body) //
+ .subscribe(rsp -> logger.debug("Service called"),
+ throwable -> logger.warn("Service called failed", throwable),
+ () -> logger.debug("Service called complete"));
+ }
+ }
+ }
+ }
+
+ private void onRecoveryError(Ric ric, Throwable t) {
+ logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
+ // If recovery fails, try to remove all instances
+ deleteAllPolicies(ric);
+ Flux<PolicyType> recoverTypes = this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(a1Client -> recoverPolicyTypes(ric, a1Client));
+ Flux<?> deletePoliciesInRic = this.a1ClientFactory.createA1Client(ric) //
+ .flatMapMany(a1Client -> a1Client.deleteAllPolicies());
+
+ Flux.merge(recoverTypes, deletePoliciesInRic) //
+ .subscribe(x -> logger.debug("Brute recover: " + x), //
+ throwable -> onRemoveAllError(ric, throwable), //
+ () -> onRecoveryComplete(ric));
+ }