Merge "Added support for using oauth token for Kafka"
[nonrtric/plt/ranpm.git] / pmproducer / src / main / java / org / oran / pmproducer / tasks / ProducerRegstrationTask.java
1 /*-
2  * ========================LICENSE_START=================================
3  * O-RAN-SC
4  * %%
5  * Copyright (C) 2023 Nordix Foundation
6  * %%
7  * Licensed under the Apache License, Version 2.0 (the "License");
8  * you may not use this file except in compliance with the License.
9  * You may obtain a copy of the License at
10  *
11  *      http://www.apache.org/licenses/LICENSE-2.0
12  *
13  * Unless required by applicable law or agreed to in writing, software
14  * distributed under the License is distributed on an "AS IS" BASIS,
15  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16  * See the License for the specific language governing permissions and
17  * limitations under the License.
18  * ========================LICENSE_END===================================
19  */
20
21 package org.oran.pmproducer.tasks;
22
23 import com.google.common.io.CharStreams;
24 import com.google.gson.JsonParser;
25
26 import java.io.IOException;
27 import java.io.InputStream;
28 import java.io.InputStreamReader;
29 import java.nio.charset.StandardCharsets;
30
31 import lombok.Getter;
32
33 import org.oran.pmproducer.clients.AsyncRestClient;
34 import org.oran.pmproducer.clients.AsyncRestClientFactory;
35 import org.oran.pmproducer.configuration.ApplicationConfig;
36 import org.oran.pmproducer.controllers.ProducerCallbacksController;
37 import org.oran.pmproducer.exceptions.ServiceException;
38 import org.oran.pmproducer.oauth2.SecurityContext;
39 import org.oran.pmproducer.r1.ConsumerJobInfo;
40 import org.oran.pmproducer.r1.ProducerInfoTypeInfo;
41 import org.oran.pmproducer.r1.ProducerRegistrationInfo;
42 import org.oran.pmproducer.repository.InfoType;
43 import org.oran.pmproducer.repository.InfoTypes;
44 import org.slf4j.Logger;
45 import org.slf4j.LoggerFactory;
46 import org.springframework.beans.factory.annotation.Autowired;
47 import org.springframework.http.HttpStatus;
48 import org.springframework.scheduling.annotation.EnableScheduling;
49 import org.springframework.scheduling.annotation.Scheduled;
50 import org.springframework.stereotype.Component;
51
52 import reactor.core.publisher.Flux;
53 import reactor.core.publisher.Mono;
54
55 /**
56  * Registers the types and this producer in Innformation Coordinator Service.
57  * This is done when needed.
58  */
59 @Component
60 @EnableScheduling
61 @SuppressWarnings("squid:S2629") // Invoke method(s) only conditionally
62 public class ProducerRegstrationTask {
63
64     private static final Logger logger = LoggerFactory.getLogger(ProducerRegstrationTask.class);
65     private final AsyncRestClient restClient;
66     private final ApplicationConfig applicationConfig;
67     private final InfoTypes types;
68     private static com.google.gson.Gson gson = new com.google.gson.GsonBuilder().disableHtmlEscaping().create();
69
70     @Getter
71     private boolean isRegisteredInIcs = false;
72     private static final int REGISTRATION_SUPERVISION_INTERVAL_MS = 1000 * 10;
73
74     public ProducerRegstrationTask(@Autowired ApplicationConfig applicationConfig, @Autowired InfoTypes types,
75             @Autowired SecurityContext securityContext) {
76         AsyncRestClientFactory restClientFactory =
77                 new AsyncRestClientFactory(applicationConfig.getWebClientConfig(), securityContext);
78         this.restClient = restClientFactory.createRestClientNoHttpProxy("");
79         this.applicationConfig = applicationConfig;
80         this.types = types;
81     }
82
83     @Scheduled(fixedRate = REGISTRATION_SUPERVISION_INTERVAL_MS)
84     public void runSupervisionTask() {
85         if (this.isRegisteredInIcs) {
86             return;
87         }
88         registerTypesAndProducer().subscribe( //
89                 null, //
90                 this::handleRegistrationFailure//
91         );
92     }
93
94     private void handleRegistrationCompleted() {
95         isRegisteredInIcs = true;
96     }
97
98     private void handleRegistrationFailure(Throwable t) {
99         logger.warn("Registration of producer failed {}", t.getMessage());
100     }
101
102     private String producerRegistrationUrl() {
103         final String producerId = this.applicationConfig.getSelfUrl().replace("/", "_");
104         return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-producers/" + producerId;
105     }
106
107     private String registerTypeUrl(InfoType type) {
108         return applicationConfig.getIcsBaseUrl() + "/data-producer/v1/info-types/" + type.getId();
109     }
110
111     public Mono<String> registerTypesAndProducer() {
112         final int CONCURRENCY = 1;
113
114         return Flux.fromIterable(this.types.getAll()) //
115                 .doOnNext(type -> logger.info("Registering type {}", type.getId())) //
116                 .flatMap(this::createInputDataJob, CONCURRENCY)
117                 .flatMap(type -> restClient.put(registerTypeUrl(type), gson.toJson(typeRegistrationInfo(type))),
118                         CONCURRENCY) //
119                 .collectList() //
120                 .doOnNext(type -> logger.info("Registering producer")) //
121                 .flatMap(resp -> restClient.put(producerRegistrationUrl(), gson.toJson(producerRegistrationInfo())))
122                 .doOnNext(n -> handleRegistrationCompleted());
123     }
124
125     private Mono<InfoType> createInputDataJob(InfoType type) {
126         if (type.getInputJobType() == null) {
127             return Mono.just(type);
128         }
129
130         ConsumerJobInfo info =
131                 new ConsumerJobInfo(type.getInputJobType(), type.getInputJobDefinition(), "pmproducer", "");
132
133         final String JOB_ID = type.getId() + "_5b3f4db6-3d9e-11ed-b878-0242ac120002";
134         String body = gson.toJson(info);
135
136         return restClient.put(consumerJobUrl(JOB_ID), body)
137                 .doOnError(t -> logger.error("Could not create job of type {}, reason: {}", type.getInputJobType(),
138                         t.getMessage()))
139                 .doOnNext(n -> logger.info("Created input job: {}, type: {}", JOB_ID, type.getInputJobType())) //
140                 .map(x -> type);
141     }
142
143     private String consumerJobUrl(String jobId) {
144         return applicationConfig.getIcsBaseUrl() + "/data-consumer/v1/info-jobs/" + jobId;
145     }
146
147     private Object typeSpecifcInfoObject() {
148         return jsonObject("{}");
149     }
150
151     private ProducerInfoTypeInfo typeRegistrationInfo(InfoType type) {
152         try {
153             return new ProducerInfoTypeInfo(jsonSchemaObject(type), typeSpecifcInfoObject());
154         } catch (Exception e) {
155             logger.error("Fatal error {}", e.getMessage());
156             return null;
157         }
158     }
159
160     private Object jsonSchemaObject(InfoType type) throws IOException, ServiceException {
161         final String schemaFile = "/typeSchemaPmData.json";
162         return jsonObject(readSchemaFile(schemaFile));
163     }
164
165     private String readSchemaFile(String filePath) throws IOException, ServiceException {
166         InputStream in = getClass().getResourceAsStream(filePath);
167         logger.debug("Reading application schema file from: {} with: {}", filePath, in);
168         if (in == null) {
169             throw new ServiceException("Could not readfile: " + filePath, HttpStatus.INTERNAL_SERVER_ERROR);
170         }
171         return CharStreams.toString(new InputStreamReader(in, StandardCharsets.UTF_8));
172     }
173
174     @SuppressWarnings("java:S2139") // Log exception
175     private Object jsonObject(String json) {
176         try {
177             return JsonParser.parseString(json).getAsJsonObject();
178         } catch (Exception e) {
179             logger.error("Bug, error in JSON: {} {}", json, e.getMessage());
180             throw new NullPointerException(e.getMessage());
181         }
182     }
183
184     private boolean isEqual(ProducerRegistrationInfo a, ProducerRegistrationInfo b) {
185         return a.jobCallbackUrl.equals(b.jobCallbackUrl) //
186                 && a.producerSupervisionCallbackUrl.equals(b.producerSupervisionCallbackUrl) //
187                 && a.supportedTypeIds.size() == b.supportedTypeIds.size();
188     }
189
190     private ProducerRegistrationInfo producerRegistrationInfo() {
191         return ProducerRegistrationInfo.builder() //
192                 .jobCallbackUrl(baseUrl() + ProducerCallbacksController.JOB_URL) //
193                 .producerSupervisionCallbackUrl(baseUrl() + ProducerCallbacksController.SUPERVISION_URL) //
194                 .supportedTypeIds(this.types.typeIds()) //
195                 .build();
196     }
197
198     private String baseUrl() {
199         return this.applicationConfig.getSelfUrl();
200     }
201 }