Added callback to R-APPS invoked after RIC recovery
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicRecoveryTask.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import java.util.Collection;
24 import java.util.Vector;
25
26 import org.oransc.policyagent.clients.A1Client;
27 import org.oransc.policyagent.clients.AsyncRestClient;
28 import org.oransc.policyagent.exceptions.ServiceException;
29 import org.oransc.policyagent.repository.ImmutablePolicyType;
30 import org.oransc.policyagent.repository.Policies;
31 import org.oransc.policyagent.repository.Policy;
32 import org.oransc.policyagent.repository.PolicyType;
33 import org.oransc.policyagent.repository.PolicyTypes;
34 import org.oransc.policyagent.repository.Ric;
35 import org.oransc.policyagent.repository.Service;
36 import org.oransc.policyagent.repository.Services;
37 import org.slf4j.Logger;
38 import org.slf4j.LoggerFactory;
39
40 import reactor.core.publisher.Flux;
41 import reactor.core.publisher.Mono;
42
43 /**
44  * Loads information about RealTime-RICs at startup.
45  */
46 public class RicRecoveryTask {
47
48     private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
49
50     private final A1Client a1Client;
51     private final PolicyTypes policyTypes;
52     private final Policies policies;
53     private final Services services;
54
55     public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies, Services services) {
56         this.a1Client = a1Client;
57         this.policyTypes = policyTypes;
58         this.policies = policies;
59         this.services = services;
60     }
61
62     public void run(Collection<Ric> rics) {
63         for (Ric ric : rics) {
64             run(ric);
65         }
66     }
67
68     public void run(Ric ric) {
69         logger.debug("Handling ric: {}", ric.getConfig().name());
70
71         synchronized (ric) {
72             if (ric.state().equals(Ric.RicState.RECOVERING)) {
73                 return; // Already running
74             }
75             ric.setState(Ric.RicState.RECOVERING);
76         }
77         Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
78         Flux<?> deletedPolicies = deletePolicies(ric);
79
80         Flux.merge(recoveredTypes, deletedPolicies) //
81             .subscribe(x -> logger.debug("Recover: " + x), //
82                 throwable -> onError(ric, throwable), //
83                 () -> onComplete(ric));
84     }
85
86     private void onComplete(Ric ric) {
87         logger.debug("Recovery completed for:" + ric.name());
88         ric.setState(Ric.RicState.ACTIVE);
89         notifyAllServices("Recovery completed for:" + ric.name());
90     }
91
92     private void notifyAllServices(String body) {
93         for (Service service : services.getAll()) {
94             String url = service.getCallbackUrl();
95             if (service.getCallbackUrl().length() > 0) {
96                 createClient(url) //
97                     .put("", body) //
98                     .subscribe(rsp -> logger.debug("Service called"),
99                         throwable -> logger.warn("Service called failed", throwable),
100                         () -> logger.debug("Service called complete"));
101             }
102         }
103     }
104
105     private void onError(Ric ric, Throwable t) {
106         logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
107         ric.setState(Ric.RicState.NOT_REACHABLE);
108     }
109
110     private AsyncRestClient createClient(final String url) {
111         return new AsyncRestClient(url);
112     }
113
114     private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
115         ric.clearSupportedPolicyTypes();
116         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
117             .flatMapMany(types -> Flux.fromIterable(types)) //
118             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId))
119             .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
120             .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
121     }
122
123     private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId) {
124         if (policyTypes.contains(policyTypeId)) {
125             try {
126                 return Mono.just(policyTypes.getType(policyTypeId));
127             } catch (ServiceException e) {
128                 return Mono.error(e);
129             }
130         }
131         return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
132             .flatMap(schema -> createPolicyType(policyTypeId, schema));
133     }
134
135     private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
136         PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
137         policyTypes.put(pt);
138         return Mono.just(pt);
139     }
140
141     private Flux<String> deletePolicies(Ric ric) {
142         Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
143         for (Policy policy : ricPolicies) {
144             this.policies.remove(policy);
145         }
146
147         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
148             .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
149             .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
150             .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
151     }
152 }