2 * ============LICENSE_START=======================================================
4 * ================================================================================
5 * Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6 * ================================================================================
7 * Licensed under the Apache License, Version 2.0 (the "License");
8 * you may not use this file except in compliance with the License.
9 * You may obtain a copy of the License at
10 * http://www.apache.org/licenses/LICENSE-2.0
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 * ============LICENSE_END=========================================================
20 package org.o.ran.oam.nf.oam.adopter.pm.rest.manager;
22 import io.reactivex.rxjava3.core.Observable;
23 import io.reactivex.rxjava3.core.Single;
24 import io.reactivex.rxjava3.schedulers.Schedulers;
25 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
26 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.api.HttpRestClient;
27 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.mapper.PerformanceManagementFile2VesMapper;
28 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.Adapter;
29 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.properties.PerformanceManagementManagerProperties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
33 public class PerformanceManagementRestAgentFactory {
34 private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagementRestAgentFactory.class);
36 private final VesEventNotifier eventListener;
37 private final PerformanceManagementManagerProperties properties;
38 private final PerformanceManagementFile2VesMapper pmFileMapper;
39 private final HttpRestClient httpRestClient;
42 * Default constructor.
44 public PerformanceManagementRestAgentFactory(final VesEventNotifier eventListener,
45 final PerformanceManagementFile2VesMapper pmFileMapper,
46 final PerformanceManagementManagerProperties properties, final HttpRestClient httpRestClient) {
47 this.eventListener = eventListener;
48 this.pmFileMapper = pmFileMapper;
49 this.properties = properties;
50 this.httpRestClient = httpRestClient;
54 * Generates new PM Agent which will get pm files via rest at specific time each day and
55 * send it as CommonEventFormat302ONAP event via rest.
56 * @param adapter IP address fo the adapter, adapter login username, adapter login password
59 public final Single<PerformanceManagementRestAgent> createPerformanceManagementRestAgent(final Adapter adapter) {
60 return httpRestClient.getTimeZone(adapter).map(timeZone -> {
61 final PerformanceManagementAgentRunnable pmAgentRunnable =
62 new PerformanceManagementAgentRunnable(httpRestClient, eventListener, pmFileMapper, adapter);
63 return new PerformanceManagementRestAgent(pmAgentRunnable, properties.getSynchronizationTimeStart(),
64 properties.getSynchronizationTimeFrequency(), timeZone);
68 private static class PerformanceManagementAgentRunnable implements Runnable {
69 final HttpRestClient httpClient;
70 private final VesEventNotifier pmEventListener;
71 private final PerformanceManagementFile2VesMapper pmFileMapper;
72 private final Adapter adapter;
74 public PerformanceManagementAgentRunnable(final HttpRestClient httpClient,
75 final VesEventNotifier pmEventListener,
76 final PerformanceManagementFile2VesMapper pmFileMapper, final Adapter adapter) {
77 this.httpClient = httpClient;
78 this.pmEventListener = pmEventListener;
79 this.pmFileMapper = pmFileMapper;
80 this.adapter = adapter;
84 public synchronized void run() {
85 final String hostIp = adapter.getHostIpAddress();
86 httpClient.readFiles(adapter)
87 .flatMap(zip -> pmFileMapper.map(zip, hostIp))
88 .flatMapCompletable(events -> Observable.fromIterable(events)
89 .concatMapCompletable(pmEventListener::notifyEvents))
90 .doOnSubscribe(result -> LOG.info("PM VES notification forwarding for adapter {} started", hostIp))
91 .doOnComplete(() -> LOG.info("PM VES notification forwarding for adapter {} finished", hostIp))
92 .doOnError(error -> LOG.warn("PM VES notification forwarding for adapter {} failed", hostIp))
93 .subscribeOn(Schedulers.single())