2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.o.ran.oam.nf.oam.adopter.event.notifier;
22 import static java.util.Objects.requireNonNull;
24 import com.google.gson.Gson;
25 import io.reactivex.rxjava3.core.Completable;
26 import io.reactivex.rxjava3.core.Single;
27 import java.util.List;
28 import org.apache.hc.client5.http.async.methods.SimpleHttpRequest;
29 import org.apache.hc.client5.http.async.methods.SimpleHttpRequests;
30 import org.apache.hc.client5.http.async.methods.SimpleHttpResponse;
31 import org.apache.hc.client5.http.impl.async.CloseableHttpAsyncClient;
32 import org.apache.hc.core5.http.ContentType;
33 import org.apache.hc.core5.http.HttpHeaders;
34 import org.apache.hc.core5.http.HttpStatus;
35 import org.apache.hc.core5.http.message.StatusLine;
36 import org.o.ran.oam.nf.oam.adopter.api.CommonEventFormat302ONAP;
37 import org.o.ran.oam.nf.oam.adopter.api.Event;
38 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
39 import org.o.ran.oam.nf.oam.adopter.event.notifier.properties.VesCollectorProperties;
40 import org.slf4j.Logger;
41 import org.slf4j.LoggerFactory;
42 import org.springframework.beans.factory.annotation.Autowired;
43 import org.springframework.stereotype.Service;
46 public final class NotificationProvider implements VesEventNotifier {
48 private static final Logger LOG = LoggerFactory.getLogger(NotificationProvider.class);
50 private static final String EVENT_BATCH = "/eventBatch";
51 private static final Gson GSON = new Gson();
52 private final CloseableHttpAsyncClient httpClient;
53 private final String vesUrl;
54 private final String authHeader;
57 * Default constructor.
60 public NotificationProvider(final VesCollectorProperties vesCollectorProperties,
61 final CloseableHttpAsyncClient httpClient) {
62 this.vesUrl = requireNonNull(vesCollectorProperties.getUrl(), "VES Url must not be null");
63 this.authHeader = "Basic " + requireNonNull(vesCollectorProperties.getVesEncodedAuth(),
64 "VES Authentication must not be null");
65 this.httpClient = httpClient;
66 LOG.debug("VES Collector {}", vesUrl);
70 public Completable notifyEvents(final CommonEventFormat302ONAP event) {
71 LOG.debug("Sending VES Messages");
72 final String payload = GSON.toJson(event, CommonEventFormat302ONAP.class);
73 final List<Event> eventsList = event.getEventList();
74 if (eventsList != null && !eventsList.isEmpty()) {
75 return notify(vesUrl + EVENT_BATCH, payload);
77 return notify(vesUrl, payload);
80 private Completable notify(final String url, final String payload) {
81 final SimpleHttpRequest request = SimpleHttpRequests.post(url);
82 request.setBody(payload, ContentType.APPLICATION_JSON);
83 request.setHeader(HttpHeaders.AUTHORIZATION, authHeader);
84 return Single.fromFuture(httpClient.execute(request, null))
85 .flatMapCompletable(NotificationProvider::validatePost)
86 .doOnSubscribe(result -> LOG.trace("Request sent {} / payload {}", request, payload))
87 .doOnComplete(() -> LOG.debug("Request finished {}", request))
88 .doOnError(error -> LOG.warn("Request failed {}", request, error));
91 private static Completable validatePost(final SimpleHttpResponse response) {
92 final String statusLine = new StatusLine(response).toString();
94 final int code = response.getCode();
95 if (code == HttpStatus.SC_OK || code == HttpStatus.SC_ACCEPTED) {
96 return Completable.complete();
98 entity = response.getBody().getBodyText();
99 return Completable.error(new Exception("Failed to post: " + statusLine + " " + entity));