2 * ========================LICENSE_START=================================
5 * Copyright (C) 2020 Nordix Foundation
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
11 * http://www.apache.org/licenses/LICENSE-2.0
13 * Unless required by applicable law or agreed to in writing, software
14 * distributed under the License is distributed on an "AS IS" BASIS,
15 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16 * See the License for the specific language governing permissions and
17 * limitations under the License.
18 * ========================LICENSE_END===================================
21 package org.oransc.enrichment.clients;
23 import com.google.gson.Gson;
24 import com.google.gson.GsonBuilder;
26 import java.lang.invoke.MethodHandles;
28 import org.oransc.enrichment.configuration.ApplicationConfig;
29 import org.oransc.enrichment.configuration.ImmutableWebClientConfig;
30 import org.oransc.enrichment.configuration.WebClientConfig;
31 import org.oransc.enrichment.repository.EiJob;
32 import org.oransc.enrichment.repository.EiProducer;
33 import org.slf4j.Logger;
34 import org.slf4j.LoggerFactory;
35 import org.springframework.beans.factory.annotation.Autowired;
37 import reactor.core.publisher.Flux;
38 import reactor.core.publisher.Mono;
41 * Callbacks to the EiProducer
43 @SuppressWarnings("java:S3457") // No need to call "toString()" method as formatting and string ..
44 public class ProducerCallbacks {
46 private static final Logger logger = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
47 private static Gson gson = new GsonBuilder() //
52 ApplicationConfig applicationConfig;
54 public void notifyProducersJobDeleted(EiJob eiJob) {
55 AsyncRestClient restClient = restClient(false);
56 ProducerJobInfo request = new ProducerJobInfo(eiJob);
57 String body = gson.toJson(request);
58 for (EiProducer producer : eiJob.type().getProducers()) {
59 restClient.post(producer.jobDeletionCallbackUrl(), body) //
60 .subscribe(notUsed -> logger.debug("Job subscription started OK {}", producer.id()), //
61 throwable -> logger.warn("Job subscription failed {}", producer.id(), throwable.toString()), null);
65 public Mono<Integer> notifyProducersJobStarted(EiJob eiJob) {
66 return Flux.fromIterable(eiJob.type().getProducers()) //
67 .flatMap(eiProducer -> notifyProducerJobStarted(eiProducer, eiJob)) //
69 .flatMap(okResponses -> Mono.just(Integer.valueOf(okResponses.size()))); //
72 public Mono<String> notifyProducerJobStarted(EiProducer producer, EiJob eiJob) {
73 AsyncRestClient restClient = restClient(false);
74 ProducerJobInfo request = new ProducerJobInfo(eiJob);
75 String body = gson.toJson(request);
77 return restClient.post(producer.jobCreationCallbackUrl(), body)
78 .doOnNext(resp -> logger.debug("Job subscription started OK {}", producer.id()))
79 .onErrorResume(throwable -> {
80 logger.warn("Job subscription failed {}", producer.id(), throwable.toString());
85 private AsyncRestClient restClient(boolean useTrustValidation) {
86 WebClientConfig config = this.applicationConfig.getWebClientConfig();
87 config = ImmutableWebClientConfig.builder() //
88 .keyStoreType(config.keyStoreType()) //
89 .keyStorePassword(config.keyStorePassword()) //
90 .keyStore(config.keyStore()) //
91 .keyPassword(config.keyPassword()) //
92 .isTrustStoreUsed(useTrustValidation) //
93 .trustStore(config.trustStore()) //
94 .trustStorePassword(config.trustStorePassword()) //
97 return new AsyncRestClient("", config);