Fix Sonar complains
[oam/nf-oam-adopter.git] / ves-nf-oam-adopter / ves-nf-oam-adopter-event-notifier / src / main / java / org / o / ran / oam / nf / oam / adopter / event / notifier / NotificationProvider.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  O-RAN-SC
4  *  ================================================================================
5  *  Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.o.ran.oam.nf.oam.adopter.event.notifier;
21
22 import static java.util.Objects.requireNonNull;
23
24 import com.google.gson.Gson;
25 import io.reactivex.rxjava3.core.Completable;
26 import io.reactivex.rxjava3.core.Single;
27 import java.util.List;
28 import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
29 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
30 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
31 import org.apache.hc.core5.http.ContentType;
32 import org.apache.hc.core5.http.HttpHeaders;
33 import org.apache.hc.core5.http.HttpStatus;
34 import org.apache.hc.core5.http.message.StatusLine;
35 import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP;
36 import org.o.ran.oam.nf.oam.adopter.api.Event;
37 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
38 import org.o.ran.oam.nf.oam.adopter.event.notifier.properties.VesCollectorProperties;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.stereotype.Service;
43
44 @Service
45 public final class NotificationProvider implements VesEventNotifier {
46
47     private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
48
49     private static final String EVENT_BATCH = "/eventBatch";
50     private static final Gson GSON = new Gson();
51     private final CloseableHttpAsyncClient httpClient;
52     private final String vesUrl;
53     private final String authHeader;
54
55     /**
56      * Default constructor.
57      */
58     @Autowired
59     public NotificationProvider(final VesCollectorProperties vesCollectorProperties,
60             final CloseableHttpAsyncClient httpClient) {
61         this.vesUrl = requireNonNull(vesCollectorProperties.getUrl(), "VES Url must not be null");
62         this.authHeader = "Basic " + requireNonNull(vesCollectorProperties.getVesEncodedAuth(),
63                 "VES Authentication must not be null");
64         this.httpClient = httpClient;
65         LOG.debug("VES Collector {}", vesUrl);
66     }
67
68     @Override
69     public Completable notifyEvents(final CommonEventFormat302ONAP event) {
70         LOG.debug("Sending VES Messages");
71         final String payload = GSON.toJson(event, CommonEventFormat302ONAP.class);
72         final List<Event> eventsList = event.getEventList();
73         if (eventsList != null && !eventsList.isEmpty()) {
74             return notify(vesUrl + EVENT_BATCH, payload);
75         }
76         return notify(vesUrl, payload);
77     }
78
79     private Completable notify(final String url, final String payload) {
80         final var request = SimpleHttpRequests.post(url);
81         request.setBody(payload, ContentType.APPLICATION_JSON);
82         request.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
83         return Single.fromFuture(httpClient.execute(request, null))
84                        .flatMapCompletable(NotificationProvider::validatePost)
85                        .doOnSubscribe(result -> LOG.trace("Request sent {} / payload {}", request, payload))
86                        .doOnComplete(() -> LOG.debug("Request finished {}", request))
87                        .doOnError(error -> LOG.warn("Request failed {}", request, error));
88     }
89
90     private static Completable validatePost(final SimpleHttpResponse response) {
91         final var statusLine = new StatusLine(response).toString();
92         final String entity;
93         final int code = response.getCode();
94         if (code == HttpStatus.SC_OK || code == HttpStatus.SC_ACCEPTED) {
95             return Completable.complete();
96         }
97         entity = response.getBody().getBodyText();
98         return Completable.error(new Exception("Failed to post: " + statusLine + " " + entity));
99     }
100 }