Merge "Remove using of DMAAP client from ONAP"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicSupervision.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2020 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import java.util.Collection;
24
25 import org.oransc.policyagent.clients.A1Client;
26 import org.oransc.policyagent.clients.A1ClientFactory;
27 import org.oransc.policyagent.exceptions.ServiceException;
28 import org.oransc.policyagent.repository.Lock.LockType;
29 import org.oransc.policyagent.repository.Policies;
30 import org.oransc.policyagent.repository.PolicyTypes;
31 import org.oransc.policyagent.repository.Ric;
32 import org.oransc.policyagent.repository.Ric.RicState;
33 import org.oransc.policyagent.repository.Rics;
34 import org.oransc.policyagent.repository.Services;
35 import org.slf4j.Logger;
36 import org.slf4j.LoggerFactory;
37 import org.springframework.beans.factory.annotation.Autowired;
38 import org.springframework.scheduling.annotation.EnableScheduling;
39 import org.springframework.scheduling.annotation.Scheduled;
40 import org.springframework.stereotype.Component;
41
42 import reactor.core.publisher.Flux;
43 import reactor.core.publisher.Mono;
44
45 /**
46  * Regularly checks the existing rics towards the local repository to keep it
47  * consistent. When the policy types or instances in the Near-RT RIC is not
48  * consistent, a synchronization is performed.
49  */
50 @Component
51 @EnableScheduling
52 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
53 public class RicSupervision {
54     private static final Logger logger = LoggerFactory.getLogger(RicSupervision.class);
55
56     private final Rics rics;
57     private final Policies policies;
58     private final PolicyTypes policyTypes;
59     private final A1ClientFactory a1ClientFactory;
60     private final Services services;
61
62     private static class SynchStartedException extends ServiceException {
63         private static final long serialVersionUID = 1L;
64
65         public SynchStartedException(String message) {
66             super(message);
67         }
68     }
69
70     private static class RicData {
71         RicData(Ric ric, A1Client a1Client) {
72             this.ric = ric;
73             this.a1Client = a1Client;
74         }
75
76         A1Client getClient() {
77             return a1Client;
78         }
79
80         final Ric ric;
81         private final A1Client a1Client;
82     }
83
84     @Autowired
85     public RicSupervision(Rics rics, Policies policies, A1ClientFactory a1ClientFactory, PolicyTypes policyTypes,
86         Services services) {
87         this.rics = rics;
88         this.policies = policies;
89         this.a1ClientFactory = a1ClientFactory;
90         this.policyTypes = policyTypes;
91         this.services = services;
92     }
93
94     /**
95      * Regularly contacts all Rics to check if they are alive and synchronized.
96      */
97     @Scheduled(fixedRate = 1000 * 60)
98     public void checkAllRics() {
99         logger.debug("Checking Rics starting");
100         createTask().subscribe(null, null, () -> logger.debug("Checking all RICs completed"));
101     }
102
103     private Flux<RicData> createTask() {
104         return Flux.fromIterable(rics.getRics()) //
105             .flatMap(this::createRicData) //
106             .flatMap(this::checkOneRic);
107     }
108
109     private Mono<RicData> checkOneRic(RicData ricData) {
110         return checkRicState(ricData) //
111             .flatMap(x -> ricData.ric.getLock().lock(LockType.EXCLUSIVE)) //
112             .flatMap(notUsed -> setRicState(ricData)) //
113             .flatMap(x -> checkRicPolicies(ricData)) //
114             .flatMap(x -> checkRicPolicyTypes(ricData)) //
115             .doOnNext(x -> onRicCheckedOk(ricData)) //
116             .doOnError(t -> onRicCheckedError(t, ricData)) //
117             .onErrorResume(throwable -> Mono.empty());
118     }
119
120     private void onRicCheckedError(Throwable t, RicData ricData) {
121         logger.debug("Ric: {} check stopped, exception: {}", ricData.ric.name(), t.getMessage());
122         if (t instanceof SynchStartedException) {
123             // this is just a temporary state,
124             ricData.ric.setState(RicState.AVAILABLE);
125         } else {
126             ricData.ric.setState(RicState.UNAVAILABLE);
127         }
128         ricData.ric.getLock().unlockBlocking();
129     }
130
131     private void onRicCheckedOk(RicData ricData) {
132         logger.debug("Ric: {} checked OK", ricData.ric.name());
133         ricData.ric.setState(RicState.AVAILABLE);
134         ricData.ric.getLock().unlockBlocking();
135     }
136
137     @SuppressWarnings("squid:S2445") // Blocks should be synchronized on "private final" fields
138     private Mono<RicData> setRicState(RicData ric) {
139         synchronized (ric) {
140             if (ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
141                 logger.debug("Ric: {} is already being checked", ric.ric.getConfig().name());
142                 return Mono.empty();
143             }
144             ric.ric.setState(RicState.CONSISTENCY_CHECK);
145             return Mono.just(ric);
146         }
147     }
148
149     private Mono<RicData> createRicData(Ric ric) {
150         return Mono.just(ric) //
151             .flatMap(aRic -> this.a1ClientFactory.createA1Client(ric)) //
152             .flatMap(a1Client -> Mono.just(new RicData(ric, a1Client)));
153     }
154
155     private Mono<RicData> checkRicState(RicData ric) {
156         if (ric.ric.getState() == RicState.UNAVAILABLE) {
157             return startSynchronization(ric) //
158                 .onErrorResume(t -> Mono.empty());
159         } else if (ric.ric.getState() == RicState.SYNCHRONIZING || ric.ric.getState() == RicState.CONSISTENCY_CHECK) {
160             return Mono.empty();
161         } else {
162             return Mono.just(ric);
163         }
164     }
165
166     private Mono<RicData> checkRicPolicies(RicData ric) {
167         return ric.getClient().getPolicyIdentities() //
168             .flatMap(ricP -> validateInstances(ricP, ric));
169     }
170
171     private Mono<RicData> validateInstances(Collection<String> ricPolicies, RicData ric) {
172         synchronized (this.policies) {
173             if (ricPolicies.size() != policies.getForRic(ric.ric.name()).size()) {
174                 return startSynchronization(ric);
175             }
176
177             for (String policyId : ricPolicies) {
178                 if (!policies.containsPolicy(policyId)) {
179                     return startSynchronization(ric);
180                 }
181             }
182             return Mono.just(ric);
183         }
184     }
185
186     private Mono<RicData> checkRicPolicyTypes(RicData ric) {
187         return ric.getClient().getPolicyTypeIdentities() //
188             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
189     }
190
191     private Mono<RicData> validateTypes(Collection<String> ricTypes, RicData ric) {
192         if (ricTypes.size() != ric.ric.getSupportedPolicyTypes().size()) {
193             return startSynchronization(ric);
194         }
195         for (String typeName : ricTypes) {
196             if (!ric.ric.isSupportingType(typeName)) {
197                 return startSynchronization(ric);
198             }
199         }
200         return Mono.just(ric);
201     }
202
203     private Mono<RicData> startSynchronization(RicData ric) {
204         RicSynchronizationTask synchronizationTask = createSynchronizationTask();
205         synchronizationTask.run(ric.ric);
206         return Mono.error(new SynchStartedException("Syncronization started"));
207     }
208
209     RicSynchronizationTask createSynchronizationTask() {
210         return new RicSynchronizationTask(a1ClientFactory, policyTypes, policies, services);
211     }
212 }