Merge "Changed in config will add and recover Rics"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / RicRecoveryTask.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import java.util.Collection;
24 import java.util.Vector;
25
26 import org.oransc.policyagent.clients.A1Client;
27 import org.oransc.policyagent.clients.AsyncRestClient;
28 import org.oransc.policyagent.exceptions.ServiceException;
29 import org.oransc.policyagent.repository.ImmutablePolicyType;
30 import org.oransc.policyagent.repository.Policies;
31 import org.oransc.policyagent.repository.Policy;
32 import org.oransc.policyagent.repository.PolicyType;
33 import org.oransc.policyagent.repository.PolicyTypes;
34 import org.oransc.policyagent.repository.Ric;
35 import org.oransc.policyagent.repository.Rics;
36 import org.oransc.policyagent.repository.Service;
37 import org.oransc.policyagent.repository.Services;
38 import org.slf4j.Logger;
39 import org.slf4j.LoggerFactory;
40
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
43
44 /**
45  * Loads information about RealTime-RICs at startup.
46  */
47 public class RicRecoveryTask {
48
49     private static final Logger logger = LoggerFactory.getLogger(RicRecoveryTask.class);
50
51     private final A1Client a1Client;
52     private final PolicyTypes policyTypes;
53     private final Policies policies;
54     private final Services services;
55
56     public RicRecoveryTask(A1Client a1Client, PolicyTypes policyTypes, Policies policies, Services services) {
57         this.a1Client = a1Client;
58         this.policyTypes = policyTypes;
59         this.policies = policies;
60         this.services = services;
61     }
62
63     public void run(Rics rics) {
64         synchronized (rics) {
65             for (Ric ric : rics.getRics()) {
66                 run(ric);
67             }
68         }
69     }
70
71     public void run(Ric ric) {
72         logger.debug("Handling ric: {}", ric.getConfig().name());
73
74         synchronized (ric) {
75             if (ric.state().equals(Ric.RicState.RECOVERING)) {
76                 return; // Already running
77             }
78             ric.setState(Ric.RicState.RECOVERING);
79         }
80         Flux<PolicyType> recoveredTypes = recoverPolicyTypes(ric);
81         Flux<?> deletedPolicies = deletePolicies(ric);
82
83         Flux.merge(recoveredTypes, deletedPolicies) //
84             .subscribe(x -> logger.debug("Recover: " + x), //
85                 throwable -> onError(ric, throwable), //
86                 () -> onComplete(ric));
87     }
88
89     private void onComplete(Ric ric) {
90         logger.debug("Recovery completed for:" + ric.name());
91         ric.setState(Ric.RicState.IDLE);
92         notifyAllServices("Recovery completed for:" + ric.name());
93     }
94
95     private void notifyAllServices(String body) {
96         synchronized (services) {
97             for (Service service : services.getAll()) {
98                 String url = service.getCallbackUrl();
99                 if (service.getCallbackUrl().length() > 0) {
100                     createClient(url) //
101                         .put("", body) //
102                         .subscribe(rsp -> logger.debug("Service called"),
103                             throwable -> logger.warn("Service called failed", throwable),
104                             () -> logger.debug("Service called complete"));
105                 }
106             }
107         }
108     }
109
110     private void onError(Ric ric, Throwable t) {
111         logger.warn("Recovery failed for: {}, reason: {}", ric.name(), t.getMessage());
112         ric.setState(Ric.RicState.UNDEFINED);
113     }
114
115     private AsyncRestClient createClient(final String url) {
116         return new AsyncRestClient(url);
117     }
118
119     private Flux<PolicyType> recoverPolicyTypes(Ric ric) {
120         ric.clearSupportedPolicyTypes();
121         return a1Client.getPolicyTypeIdentities(ric.getConfig().baseUrl()) //
122             .flatMapMany(types -> Flux.fromIterable(types)) //
123             .doOnNext(typeId -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), typeId)) //
124             .flatMap((policyTypeId) -> getPolicyType(ric, policyTypeId)) //
125             .doOnNext(policyType -> ric.addSupportedPolicyType(policyType)); //
126     }
127
128     private Mono<PolicyType> getPolicyType(Ric ric, String policyTypeId) {
129         if (policyTypes.contains(policyTypeId)) {
130             try {
131                 return Mono.just(policyTypes.getType(policyTypeId));
132             } catch (ServiceException e) {
133                 return Mono.error(e);
134             }
135         }
136         return a1Client.getPolicyType(ric.getConfig().baseUrl(), policyTypeId) //
137             .flatMap(schema -> createPolicyType(policyTypeId, schema));
138     }
139
140     private Mono<PolicyType> createPolicyType(String policyTypeId, String schema) {
141         PolicyType pt = ImmutablePolicyType.builder().name(policyTypeId).schema(schema).build();
142         policyTypes.put(pt);
143         return Mono.just(pt);
144     }
145
146     private Flux<String> deletePolicies(Ric ric) {
147         synchronized (policies) {
148             Collection<Policy> ricPolicies = new Vector<>(policies.getForRic(ric.name()));
149             for (Policy policy : ricPolicies) {
150                 this.policies.remove(policy);
151             }
152         }
153
154         return a1Client.getPolicyIdentities(ric.getConfig().baseUrl()) //
155             .flatMapMany(policyIds -> Flux.fromIterable(policyIds)) //
156             .doOnNext(policyId -> logger.debug("Deleting policy: {}, for ric: {}", policyId, ric.getConfig().name()))
157             .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)); //
158     }
159 }