Merge "Recovery handling"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicRecoveryTask.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import java.util.Collection;
24 import java.util.Vector;
25
26 import org.oransc.policyagent.clients.A1Client;
27 import org.oransc.policyagent.exceptions.ServiceException;
28 import org.oransc.policyagent.repository.ImmutablePolicyType;
29 import org.oransc.policyagent.repository.Policies;
30 import org.oransc.policyagent.repository.Policy;
31 import org.oransc.policyagent.repository.PolicyType;
32 import org.oransc.policyagent.repository.PolicyTypes;
33 import org.oransc.policyagent.repository.Ric;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
39
40 /**
41  * Loads information about RealTime-RICs at startup.
42  */
43 public class RicRecoveryTask {
44
45     private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
46
47     private final A1Client a1Client;
48     private final PolicyTypes policyTypes;
49     private final Policies policies;
50
51     public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies) {
52         this.a1Client = a1Client;
53         this.policyTypes = policyTypes;
54         this.policies = policies;
55     }
56
57     public void run(Collection<Ric> rics) {
58         for (Ric ric : rics) {
59             run(ric);
60         }
61     }
62
63     public void run(Ric ric) {
64         logger.debug("Handling ric: {}", ric.getConfig().name());
65
66         synchronized (ric) {
67             if (ric.state().equals(Ric.RicState.RECOVERING)) {
68                 return; // Already running
69             }
70             ric.setState(Ric.RicState.RECOVERING);
71         }
72         Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
73         Flux<?> deletedPolicies = deletePolicies(ric);
74
75         Flux.merge(recoveredTypes, deletedPolicies) //
76             .subscribe(x -> logger.debug("Recover: " + x), //
77                 throwable -> onError(ric, throwable), //
78                 () -> onComplete(ric));
79     }
80
81     private void onComplete(Ric ric) {
82         logger.debug("Recovery completed for:" + ric.name());
83         ric.setState(Ric.RicState.ACTIVE);
84
85     }
86
87     private void onError(Ric ric, Throwable t) {
88         logger.debug("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
89         ric.setState(Ric.RicState.NOT_REACHABLE);
90     }
91
92     private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
93         ric.clearSupportedPolicyTypes();
94         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
95             .flatMapMany(types -> Flux.fromIterable(types)) //
96             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
97             .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
98             .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
99     }
100
101     private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId) {
102         if (policyTypes.contains(policyTypeId)) {
103             try {
104                 return Mono.just(policyTypes.getType(policyTypeId));
105             } catch (ServiceException e) {
106                 return Mono.error(e);
107             }
108         }
109         return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
110             .flatMap(schema -> createPolicyType(policyTypeId, schema));
111     }
112
113     private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
114         PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
115         policyTypes.put(pt);
116         return Mono.just(pt);
117     }
118
119     private Flux<String> deletePolicies(Ric ric) {
120         Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
121         for (Policy policy : ricPolicies) {
122             this.policies.remove(policy);
123         }
124
125         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
126             .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
127             .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
128             .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
129     }
130 }