2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.o.ran.oam.nf.oam.adopter.event.notifier;
22 import static java.util.Objects.requireNonNull;
24 import com.google.gson.Gson;
25 import io.reactivex.rxjava3.core.Completable;
26 import io.reactivex.rxjava3.core.Single;
27 import java.util.List;
28 import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
29 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
30 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
31 import org.apache.hc.core5.http.ContentType;
32 import org.apache.hc.core5.http.HttpHeaders;
33 import org.apache.hc.core5.http.HttpStatus;
34 import org.apache.hc.core5.http.message.StatusLine;
35 import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP;
36 import org.o.ran.oam.nf.oam.adopter.api.Event;
37 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
38 import org.o.ran.oam.nf.oam.adopter.event.notifier.properties.VesCollectorProperties;
39 import org.slf4j.Logger;
40 import org.slf4j.LoggerFactory;
41 import org.springframework.beans.factory.annotation.Autowired;
42 import org.springframework.stereotype.Service;
45 public final class NotificationProvider implements VesEventNotifier {
47 private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
49 private static final String EVENT_BATCH = "/eventBatch";
50 private static final Gson GSON = new Gson();
51 private final CloseableHttpAsyncClient httpClient;
52 private final String vesUrl;
53 private final String authHeader;
56 * Default constructor.
59 public NotificationProvider(final VesCollectorProperties vesCollectorProperties,
60 final CloseableHttpAsyncClient httpClient) {
61 this.vesUrl = requireNonNull(vesCollectorProperties.getUrl(), "VES Url must not be null");
62 this.authHeader = "Basic " + requireNonNull(vesCollectorProperties.getVesEncodedAuth(),
63 "VES Authentication must not be null");
64 this.httpClient = httpClient;
65 LOG.debug("VES Collector {}", vesUrl);
69 public Completable notifyEvents(final CommonEventFormat302ONAP event) {
70 LOG.debug("Sending VES Messages");
71 final String payload = GSON.toJson(event, CommonEventFormat302ONAP.class);
72 final List<Event> eventsList = event.getEventList();
73 if (eventsList != null && !eventsList.isEmpty()) {
74 return notify(vesUrl + EVENT_BATCH, payload);
76 return notify(vesUrl, payload);
79 private Completable notify(final String url, final String payload) {
80 final var request = SimpleHttpRequests.post(url);
81 request.setBody(payload, ContentType.APPLICATION_JSON);
82 request.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
83 return Single.fromFuture(httpClient.execute(request, null))
84 .flatMapCompletable(NotificationProvider::validatePost)
85 .doOnSubscribe(result -> LOG.trace("Request sent {} / payload {}", request, payload))
86 .doOnComplete(() -> LOG.debug("Request finished {}", request))
87 .doOnError(error -> LOG.warn("Request failed {}", request, error));
90 private static Completable validatePost(final SimpleHttpResponse response) {
91 final var statusLine = new StatusLine(response).toString();
93 final int code = response.getCode();
94 if (code == HttpStatus.SC_OK || code == HttpStatus.SC_ACCEPTED) {
95 return Completable.complete();
97 entity = response.getBody().getBodyText();
98 return Completable.error(new Exception("Failed to post: " + statusLine + " " + entity));