Changed in config will add and recover Rics
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RepositorySupervision.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import java.util.Collection;
24
25 import org.oransc.policyagent.clients.A1Client;
26 import org.oransc.policyagent.repository.Policies;
27 import org.oransc.policyagent.repository.PolicyTypes;
28 import org.oransc.policyagent.repository.Ric;
29 import org.oransc.policyagent.repository.Ric.RicState;
30 import org.oransc.policyagent.repository.Rics;
31 import org.oransc.policyagent.repository.Services;
32 import org.slf4j.Logger;
33 import org.slf4j.LoggerFactory;
34 import org.springframework.beans.factory.annotation.Autowired;
35 import org.springframework.scheduling.annotation.EnableScheduling;
36 import org.springframework.scheduling.annotation.Scheduled;
37 import org.springframework.stereotype.Component;
38
39 import reactor.core.publisher.Flux;
40 import reactor.core.publisher.Mono;
41
42 /**
43  * Regularly checks the exisiting rics towards the local repository to keep it consistent.
44  */
45 @Component
46 @EnableScheduling
47 public class RepositorySupervision {
48     private static final Logger logger = LoggerFactory.getLogger(RepositorySupervision.class);
49
50     private final Rics rics;
51     private final Policies policies;
52     private final PolicyTypes policyTypes;
53     private final A1Client a1Client;
54     private final Services services;
55
56     @Autowired
57     public RepositorySupervision(Rics rics, Policies policies, A1Client a1Client, PolicyTypes policyTypes,
58         Services services) {
59         this.rics = rics;
60         this.policies = policies;
61         this.a1Client = a1Client;
62         this.policyTypes = policyTypes;
63         this.services = services;
64     }
65
66     /**
67      * Regularly contacts all Rics to check if they are alive.
68      */
69     @Scheduled(fixedRate = 1000 * 60)
70     public void checkAllRics() {
71         logger.debug("Checking Rics starting");
72         createTask().subscribe(this::onRicChecked, this::onError, this::onComplete);
73     }
74
75     private Flux<Ric> createTask() {
76         synchronized (this.rics) {
77             return Flux.fromIterable(rics.getRics()) //
78                 .flatMap(ric -> checkRicState(ric)) //
79                 .flatMap(ric -> checkRicPolicies(ric)) //
80                 .flatMap(ric -> checkRicPolicyTypes(ric));
81         }
82     }
83
84     private Mono<Ric> checkRicState(Ric ric) {
85         if (ric.state() == RicState.UNDEFINED) {
86             return startRecovery(ric);
87         } else if (ric.state() == RicState.RECOVERING) {
88             return Mono.empty();
89         } else {
90             return Mono.just(ric);
91         }
92     }
93
94     private Mono<Ric> checkRicPolicies(Ric ric) {
95         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
96             .onErrorResume(t -> Mono.empty()) //
97             .flatMap(ricP -> validateInstances(ricP, ric));
98     }
99
100     private Mono<Ric> validateInstances(Collection<String> ricPolicies, Ric ric) {
101         synchronized (this.policies) {
102             if (ricPolicies.size() != policies.getForRic(ric.name()).size()) {
103                 return startRecovery(ric);
104             }
105         }
106         for (String policyId : ricPolicies) {
107             if (!policies.containsPolicy(policyId)) {
108                 return startRecovery(ric);
109             }
110         }
111         return Mono.just(ric);
112     }
113
114     private Mono<Ric> checkRicPolicyTypes(Ric ric) {
115         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
116             .onErrorResume(t -> {
117                 return Mono.empty();
118             }) //
119             .flatMap(ricTypes -> validateTypes(ricTypes, ric));
120     }
121
122     private Mono<Ric> validateTypes(Collection<String> ricTypes, Ric ric) {
123         if (ricTypes.size() != ric.getSupportedPolicyTypes().size()) {
124             return startRecovery(ric);
125         }
126         for (String typeName : ricTypes) {
127             if (!ric.isSupportingType(typeName)) {
128                 return startRecovery(ric);
129             }
130         }
131         return Mono.just(ric);
132     }
133
134     private Mono<Ric> startRecovery(Ric ric) {
135         RicRecoveryTask recovery = new RicRecoveryTask(a1Client, policyTypes, policies, services);
136         recovery.run(ric);
137         return Mono.empty();
138     }
139
140     private void onRicChecked(Ric ric) {
141         logger.info("Ric: " + ric.name() + " checked");
142     }
143
144     private void onError(Throwable t) {
145         logger.error("Rics supervision failed", t);
146     }
147
148     private void onComplete() {
149         logger.debug("Checking Rics completed");
150     }
151
152 }