2814706a86ab4d2af6d913ff732e8843f5a07402
[oam/nf-oam-adopter.git] / ves-nf-oam-adopter / ves-nf-oam-adopter-event-notifier / src / main / java / org / o / ran / oam / nf / oam / adopter / event / notifier / NotificationProvider.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  O-RAN-SC
4  *  ================================================================================
5  *  Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.o.ran.oam.nf.oam.adopter.event.notifier;
21
22 import static java.util.Objects.requireNonNull;
23
24 import com.google.gson.Gson;
25 import io.reactivex.rxjava3.core.Completable;
26 import io.reactivex.rxjava3.core.Single;
27 import java.util.List;
28 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
29 import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
30 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
31 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
32 import org.apache.hc.core5.http.ContentType;
33 import org.apache.hc.core5.http.HttpHeaders;
34 import org.apache.hc.core5.http.HttpStatus;
35 import org.apache.hc.core5.http.message.StatusLine;
36 import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP;
37 import org.o.ran.oam.nf.oam.adopter.api.Event;
38 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
39 import org.o.ran.oam.nf.oam.adopter.event.notifier.properties.VesCollectorProperties;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.stereotype.Service;
44
45 @Service
46 public final class NotificationProvider implements VesEventNotifier {
47
48     private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
49
50     private static final String EVENT_BATCH = "/eventBatch";
51     private static final Gson GSON = new Gson();
52     private final CloseableHttpAsyncClient httpClient;
53     private final String vesUrl;
54     private final String authHeader;
55
56     /**
57      * Default constructor.
58      */
59     @Autowired
60     public NotificationProvider(final VesCollectorProperties vesCollectorProperties,
61             final CloseableHttpAsyncClient httpClient) {
62         this.vesUrl = requireNonNull(vesCollectorProperties.getUrl(), "VES Url must not be null");
63         this.authHeader = "Basic " + requireNonNull(vesCollectorProperties.getVesEncodedAuth(),
64                 "VES Authentication must not be null");
65         this.httpClient = httpClient;
66         LOG.debug("VES Collector {}", vesUrl);
67     }
68
69     @Override
70     public Completable notifyEvents(final CommonEventFormat302ONAP event) {
71         LOG.debug("Sending VES Messages");
72         final String payload = GSON.toJson(event, CommonEventFormat302ONAP.class);
73         final List<Event> eventsList = event.getEventList();
74         if (eventsList != null && !eventsList.isEmpty()) {
75             return notify(vesUrl + EVENT_BATCH, payload);
76         }
77         return notify(vesUrl, payload);
78     }
79
80     private Completable notify(final String url, final String payload) {
81         final SimpleHttpRequest request = SimpleHttpRequests.post(url);
82         request.setBody(payload, ContentType.APPLICATION_JSON);
83         request.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
84         return Single.fromFuture(httpClient.execute(request, null))
85                        .flatMapCompletable(NotificationProvider::validatePost)
86                        .doOnSubscribe(result -> LOG.trace("Request sent {} / payload {}", request, payload))
87                        .doOnComplete(() -> LOG.debug("Request finished {}", request))
88                        .doOnError(error -> LOG.warn("Request failed {}", request, error));
89     }
90
91     private static Completable validatePost(final SimpleHttpResponse response) {
92         final String statusLine = new StatusLine(response).toString();
93         final String entity;
94         final int code = response.getCode();
95         if (code == HttpStatus.SC_OK || code == HttpStatus.SC_ACCEPTED) {
96             return Completable.complete();
97         }
98         entity = response.getBody().getBodyText();
99         return Completable.error(new Exception("Failed to post: " + statusLine + " " + entity));
100     }
101 }