Merge "Make StartupService use asynchronous client"
[nonrtric.git] / policy-agent / src / main / java / org / oransc / policyagent / tasks / StartupService.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2019 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oransc.policyagent.tasks;
22
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
25
26 import org.oransc.policyagent.clients.A1Client;
27 import org.oransc.policyagent.configuration.ApplicationConfig;
28 import org.oransc.policyagent.repository.ImmutablePolicyType;
29 import org.oransc.policyagent.repository.PolicyType;
30 import org.oransc.policyagent.repository.PolicyTypes;
31 import org.oransc.policyagent.repository.Ric;
32 import org.oransc.policyagent.repository.Ric.RicState;
33 import org.oransc.policyagent.repository.Rics;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.stereotype.Service;
38
39 import reactor.core.publisher.Flux;
40 import reactor.core.publisher.Mono;
41
42 /**
43  * Loads information about RealTime-RICs at startup.
44  */
45 @Service("startupService")
46 public class StartupService {
47
48     private static Gson gson = new GsonBuilder() //
49         .serializeNulls() //
50         .create(); //
51
52     private static final Logger logger = LoggerFactory.getLogger(StartupService.class);
53
54     @Autowired
55     ApplicationConfig applicationConfig;
56
57     @Autowired
58     private Rics rics;
59
60     @Autowired
61     PolicyTypes policyTypes;
62
63     @Autowired
64     private A1Client a1Client;
65
66     StartupService(ApplicationConfig appConfig, Rics rics, PolicyTypes policyTypes, A1Client a1Client) {
67         this.applicationConfig = appConfig;
68         this.rics = rics;
69         this.policyTypes = policyTypes;
70         this.a1Client = a1Client;
71     }
72
73     /**
74      * Reads the configured Rics and performs the service discovery. The result is put into the repository.
75      */
76     public void startup() {
77         applicationConfig.initialize();
78         Flux.fromIterable(applicationConfig.getRicConfigs()) //
79             .map(ricConfig -> new Ric(ricConfig)) //
80             .doOnNext(ric -> logger.debug("Handling ric: {}", ric.getConfig().name())).flatMap(this::handlePolicyTypes)
81             .flatMap(this::setRicToActive) //
82             .flatMap(this::addRicToRepo) //
83             .subscribe();
84     }
85
86     private Mono<Ric> handlePolicyTypes(Ric ric) {
87         a1Client.getAllPolicyTypes(ric.getConfig().baseUrl()) //
88             .map(policyTypeString -> gson.fromJson(policyTypeString, ImmutablePolicyType.class)) //
89             .doOnNext(type -> logger.debug("For ric: {}, handling type: {}", ric.getConfig().name(), type.name()))
90             .flatMap(this::addTypeToRepo) //
91             .flatMap(type -> addTypeToRic(ric, type)) //
92             .flatMap(type -> deletePoliciesForType(ric, type)) //
93             .subscribe();
94         return Mono.just(ric);
95     }
96
97     private Mono<PolicyType> addTypeToRepo(PolicyType policyType) {
98         if (!policyTypes.contains(policyType)) {
99             policyTypes.put(policyType);
100         }
101         return Mono.just(policyType);
102     }
103
104     private Mono<PolicyType> addTypeToRic(Ric ric, PolicyType policyType) {
105         ric.addSupportedPolicyType(policyType);
106         return Mono.just(policyType);
107     }
108
109     private Mono<Void> deletePoliciesForType(Ric ric, PolicyType policyType) {
110         a1Client.getPoliciesForType(ric.getConfig().baseUrl(), policyType.name()) //
111             .doOnNext(policyId -> logger.debug("deleting policy: {}, for ric: {}", policyId, ric.getConfig().name())) //
112             .flatMap(policyId -> a1Client.deletePolicy(ric.getConfig().baseUrl(), policyId)) //
113             .subscribe();
114
115         return Mono.empty();
116     }
117
118     private Mono<Ric> setRicToActive(Ric ric) {
119         ric.setState(RicState.ACTIVE);
120
121         return Mono.just(ric);
122     }
123
124     private Mono<Void> addRicToRepo(Ric ric) {
125         rics.put(ric);
126
127         return Mono.empty();
128     }
129 }