NONRTRIC - Implement DMaaP mediator producer service in Java
[nonrtric.git] / dmaap-adaptor-java / src / main / java / org / oran / dmaapadapter / tasks / ProducerRegstrationTask.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2021 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.dmaapadapter.tasks;
22
23 import com.google.gson.JsonParser;
24
25 import org.oran.dmaapadapter.clients.AsyncRestClient;
26 import org.oran.dmaapadapter.clients.AsyncRestClientFactory;
27 import org.oran.dmaapadapter.configuration.ApplicationConfig;
28 import org.oran.dmaapadapter.controllers.ProducerCallbacksController;
29 import org.oran.dmaapadapter.exceptions.ServiceException;
30 import org.oran.dmaapadapter.r1.ProducerInfoTypeInfo;
31 import org.oran.dmaapadapter.r1.ProducerRegistrationInfo;
32 import org.oran.dmaapadapter.repository.InfoType;
33 import org.oran.dmaapadapter.repository.InfoTypes;
34 import org.slf4j.Logger;
35 import org.slf4j.LoggerFactory;
36 import org.springframework.beans.factory.annotation.Autowired;
37 import org.springframework.scheduling.annotation.EnableScheduling;
38 import org.springframework.scheduling.annotation.Scheduled;
39 import org.springframework.stereotype.Component;
40
41 import reactor.core.publisher.Flux;
42 import reactor.core.publisher.Mono;
43
44 /**
45  * Registers the types and this producer in ECS. This is done when needed.
46  */
47 @Component
48 @EnableScheduling
49 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
50 public class ProducerRegstrationTask {
51
52     private static final Logger logger = LoggerFactory.getLogger(ProducerRegstrationTask.class);
53     private final AsyncRestClient restClient;
54     private final ApplicationConfig applicationConfig;
55     private final InfoTypes types;
56     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().create();
57
58     private static final String PRODUCER_ID = "DmaapGenericInfoProducer";
59     private boolean isRegisteredInEcs = false;
60     private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 5;
61
62     public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types) {
63         AsyncRestClientFactory restClientFactory = new AsyncRestClientFactory(applicationConfig.getWebClientConfig());
64         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
65         this.applicationConfig = applicationConfig;
66         this.types = types;
67     }
68
69     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
70     public void supervisionTask() {
71         logger.debug("Checking producers starting");
72         createTask().subscribe(null, null, () -> logger.debug("Producer registration completed"));
73     }
74
75     public Mono<Object> createTask() {
76         return checkProducerRegistration() //
77                 .doOnError(t -> isRegisteredInEcs = false) //
78                 .onErrorResume(t -> registerTypesAndProducer());
79     }
80
81     public boolean isRegisteredInEcs() {
82         return this.isRegisteredInEcs;
83     }
84
85     private Mono<Object> checkProducerRegistration() {
86         final String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
87         return restClient.get(url) //
88                 .flatMap(this::checkRegistrationInfo) //
89         ;
90     }
91
92     private String registerTypeUrl(InfoType type) {
93         String url = applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
94         return url;
95     }
96
97     private Mono<String> registerTypesAndProducer() {
98         final String producerUrl =
99                 applicationConfig.getEcsBaseUrl() + "/data-producer/v1/info-producers/" + PRODUCER_ID;
100
101         return Flux.fromIterable(this.types.getAll()) //
102                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
103                 .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo()))) //
104                 .collectList() //
105                 .flatMap(resp -> restClient.put(producerUrl, gson.toJson(producerRegistrationInfo()))) //
106                 .onErrorResume(t -> {
107                     logger.warn("Registration failed {}", t.getMessage());
108                     isRegisteredInEcs = false;
109                     return Mono.empty();
110                 }) //
111                 .doOnNext(x -> logger.debug("Registering types and producer completed"));
112     }
113
114     private Object typeSpecifcInfoObject() {
115         return jsonObject("{}");
116     }
117
118     private ProducerInfoTypeInfo typeRegistrationInfo() {
119         return new ProducerInfoTypeInfo(jsonSchemaObject(), typeSpecifcInfoObject());
120     }
121
122     private Object jsonSchemaObject() {
123         // An object with no properties
124         String schemaStr = "{" //
125                 + "\"type\": \"object\"," //
126                 + "\"properties\": {}," //
127                 + "\"additionalProperties\": false" //
128                 + "}"; //
129         return jsonObject(schemaStr);
130     }
131
132     private Object jsonObject(String json) {
133         try {
134             return JsonParser.parseString(json).getAsJsonObject();
135         } catch (Exception e) {
136             logger.error("Bug, error in JSON: {}", json);
137             throw new NullPointerException(e.toString());
138         }
139     }
140
141     private Mono<String> checkRegistrationInfo(String resp) {
142         ProducerRegistrationInfo info = gson.fromJson(resp, ProducerRegistrationInfo.class);
143         if (isEqual(producerRegistrationInfo(), info)) {
144             logger.debug("Already registered");
145             this.isRegisteredInEcs = true;
146             return Mono.empty();
147         } else {
148             return Mono.error(new ServiceException("Producer registration will be started"));
149         }
150     }
151
152     private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) {
153         return a.jobCallbackUrl.equals(b.jobCallbackUrl) //
154                 && a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) //
155                 && a.supportedTypeIds.size() == b.supportedTypeIds.size();
156     }
157
158     private ProducerRegistrationInfo producerRegistrationInfo() {
159
160         return ProducerRegistrationInfo.builder() //
161                 .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
162                 .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
163                 .supportedTypeIds(types.typeIds()) //
164                 .build();
165     }
166
167     private String baseUrl() {
168         return this.applicationConfig.getSelfUrl();
169     }
170 }