2 * ========================LICENSE_START=================================
5 * Copyright (C) 2019 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.policyagent.tasks;
23 import static org.oransc.policyagent.repository.Ric.RicState;
25 import java.util.Collection;
26 import java.util.Vector;
28 import org.oransc.policyagent.clients.A1Client;
29 import org.oransc.policyagent.clients.A1ClientFactory;
30 import org.oransc.policyagent.clients.AsyncRestClient;
31 import org.oransc.policyagent.repository.ImmutablePolicyType;
32 import org.oransc.policyagent.repository.Policies;
33 import org.oransc.policyagent.repository.Policy;
34 import org.oransc.policyagent.repository.PolicyType;
35 import org.oransc.policyagent.repository.PolicyTypes;
36 import org.oransc.policyagent.repository.Ric;
37 import org.oransc.policyagent.repository.Service;
38 import org.oransc.policyagent.repository.Services;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
42 import reactor.core.publisher.Flux;
43 import reactor.core.publisher.Mono;
/**
 * Synchronizes the content of a RIC with the content in the repository:
 * - load all policy types
 * - send all policy instances to the RIC
 * --- if that fails, remove all policy instances
 * - notify subscribing services
 */
53 public class RicSynchronizationTask {
55 private static final Logger logger = LoggerFactory.getLogger(RicSynchronizationTask.class);
57 private final A1ClientFactory a1ClientFactory;
58 private final PolicyTypes policyTypes;
59 private final Policies policies;
60 private final Services services;
/**
 * Creates a synchronization task operating on the given collaborators.
 *
 * @param a1ClientFactory factory used to obtain an A1 client for a RIC
 * @param policyTypes repository consulted for already known policy types
 * @param policies repository holding the policies that are recreated in, or purged for, a RIC
 * @param services services that are notified when a synchronization has completed
 */
public RicSynchronizationTask(A1ClientFactory a1ClientFactory, PolicyTypes policyTypes, Policies policies,
    Services services) {
    this.a1ClientFactory = a1ClientFactory;
    this.policyTypes = policyTypes;
    this.policies = policies;
    this.services = services;
}
70 @SuppressWarnings("squid:S2629")
71 public void run(Ric ric) {
72 logger.debug("Handling ric: {}", ric.getConfig().name());
75 if (ric.getState() == RicState.SYNCHRONIZING) {
76 logger.debug("Ric: {} is already being synchronized", ric.getConfig().name());
79 ric.setState(RicState.SYNCHRONIZING);
81 this.a1ClientFactory.createA1Client(ric)//
82 .flatMapMany(client -> startSynchronization(ric, client)) //
83 .subscribe(x -> logger.debug("Synchronize: {}", x), //
84 throwable -> onSynchronizationError(ric, throwable), //
85 () -> onSynchronizationComplete(ric));
88 private Flux<Object> startSynchronization(Ric ric, A1Client a1Client) {
89 Flux<PolicyType> recoverTypes = synchronizePolicyTypes(ric, a1Client);
90 Collection<Policy> policiesForRic = policies.getForRic(ric.name());
91 Flux<?> policiesDeletedInRic = Flux.empty();
92 Flux<?> policiesRecreatedInRic = Flux.empty();
93 if (!policiesForRic.isEmpty()) {
94 policiesDeletedInRic = a1Client.deleteAllPolicies();
95 policiesRecreatedInRic = recreateAllPoliciesInRic(ric, a1Client);
97 return Flux.concat(recoverTypes, policiesDeletedInRic, policiesRecreatedInRic);
100 @SuppressWarnings("squid:S2629")
101 private void onSynchronizationComplete(Ric ric) {
102 logger.debug("Synchronization completed for: {}", ric.name());
103 ric.setState(RicState.IDLE);
104 notifyAllServices("Synchronization completed for:" + ric.name());
107 private void notifyAllServices(String body) {
108 synchronized (services) {
109 for (Service service : services.getAll()) {
110 String url = service.getCallbackUrl();
111 if (service.getCallbackUrl().length() > 0) {
112 createNotificationClient(url) //
115 notUsed -> logger.debug("Service {} notified", service.getName()), throwable -> logger
116 .warn("Service notification failed for service: {}", service.getName(), throwable),
117 () -> logger.debug("All services notified"));
123 @SuppressWarnings("squid:S2629")
124 private void onSynchronizationError(Ric ric, Throwable t) {
125 logger.warn("Synchronization failed for ric: {}, reason: {}", ric.name(), t.getMessage());
126 deleteAllPoliciesInRepository(ric);
128 Flux<PolicyType> typesRecoveredForRic = this.a1ClientFactory.createA1Client(ric) //
129 .flatMapMany(a1Client -> synchronizePolicyTypes(ric, a1Client));
131 // If recovery fails, try to remove all instances
132 Flux<?> policiesDeletedInRic = this.a1ClientFactory.createA1Client(ric) //
133 .flatMapMany(A1Client::deleteAllPolicies);
135 Flux.merge(typesRecoveredForRic, policiesDeletedInRic) //
136 .subscribe(x -> logger.debug("Brute recover: {}", x), //
137 throwable -> onRecoveryError(ric, throwable), //
138 () -> onSynchronizationComplete(ric));
141 @SuppressWarnings("squid:S2629")
142 private void onRecoveryError(Ric ric, Throwable t) {
143 logger.warn("Synchronization failure recovery failed for ric: {}, reason: {}", ric.name(), t.getMessage());
144 ric.setState(RicState.UNDEFINED);
147 AsyncRestClient createNotificationClient(final String url) {
148 return new AsyncRestClient(url);
151 private Flux<PolicyType> synchronizePolicyTypes(Ric ric, A1Client a1Client) {
152 ric.clearSupportedPolicyTypes();
153 return a1Client.getPolicyTypeIdentities() //
154 .flatMapMany(Flux::fromIterable) //
155 .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
156 .flatMap(policyTypeId -> getPolicyType(policyTypeId, a1Client)) //
157 .doOnNext(ric::addSupportedPolicyType); //
160 private Mono<PolicyType> getPolicyType(String policyTypeId, A1Client a1Client) {
161 if (policyTypes.contains(policyTypeId)) {
162 return Mono.just(policyTypes.get(policyTypeId));
164 return a1Client.getPolicyTypeSchema(policyTypeId) //
165 .flatMap(schema -> createPolicyType(policyTypeId, schema));
168 private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
169 PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
171 return Mono.just(pt);
174 private void deleteAllPoliciesInRepository(Ric ric) {
175 synchronized (policies) {
176 for (Policy policy : policies.getForRic(ric.name())) {
177 this.policies.remove(policy);
182 private Flux<String> recreateAllPoliciesInRic(Ric ric, A1Client a1Client) {
183 synchronized (policies) {
184 return Flux.fromIterable(new Vector<>(policies.getForRic(ric.name()))) //
186 policy -> logger.debug("Recreating policy: {}, for ric: {}", policy.id(), ric.getConfig().name()))
187 .flatMap(a1Client::putPolicy);