OAM NF Adopter REST PM
[oam/nf-oam-adopter.git] / ves-nf-oam-adopter / ves-nf-oam-adopter-pm-manager / src / main / java / org / o / ran / oam / nf / oam / adopter / pm / rest / manager / PerformanceManagementRestAgentFactory.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  O-RAN-SC
4  *  ================================================================================
5  *  Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.o.ran.oam.nf.oam.adopter.pm.rest.manager;
21
22 import io.reactivex.rxjava3.core.Observable;
23 import io.reactivex.rxjava3.core.Single;
24 import io.reactivex.rxjava3.schedulers.Schedulers;
25 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
26 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.api.HttpRestClient;
27 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.mapper.PerformanceManagementFile2VesMapper;
28 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.pojos.Adapter;
29 import org.o.ran.oam.nf.oam.adopter.pm.rest.manager.properties.PerformanceManagementManagerProperties;
30 import org.slf4j.Logger;
31 import org.slf4j.LoggerFactory;
32
33 public class PerformanceManagementRestAgentFactory {
34     private static final Logger LOG = LoggerFactory.getLogger(PerformanceManagementRestAgentFactory.class);
35
36     private final VesEventNotifier eventListener;
37     private final PerformanceManagementManagerProperties properties;
38     private final PerformanceManagementFile2VesMapper pmFileMapper;
39     private final HttpRestClient httpRestClient;
40
41     /**
42      * Default constructor.
43      */
44     public PerformanceManagementRestAgentFactory(final VesEventNotifier eventListener,
45             final PerformanceManagementFile2VesMapper pmFileMapper,
46             final PerformanceManagementManagerProperties properties, final HttpRestClient httpRestClient) {
47         this.eventListener = eventListener;
48         this.pmFileMapper = pmFileMapper;
49         this.properties = properties;
50         this.httpRestClient = httpRestClient;
51     }
52
53     /**
54      * Generates new PM Agent which will get pm files via rest at specific time each day and
55      * send it as CommonEventFormat302ONAP event via rest.
56      * @param adapter IP address fo the adapter, adapter login username, adapter login password
57      * @return PMRestAgent
58      */
59     public final Single<PerformanceManagementRestAgent> createPerformanceManagementRestAgent(final Adapter adapter) {
60         return httpRestClient.getTimeZone(adapter).map(timeZone -> {
61             final PerformanceManagementAgentRunnable pmAgentRunnable =
62                     new PerformanceManagementAgentRunnable(httpRestClient, eventListener, pmFileMapper, adapter);
63             return new PerformanceManagementRestAgent(pmAgentRunnable, properties.getSynchronizationTimeStart(),
64                     properties.getSynchronizationTimeFrequency(), timeZone);
65         });
66     }
67
68     private static class PerformanceManagementAgentRunnable implements Runnable {
69         final HttpRestClient httpClient;
70         private final VesEventNotifier pmEventListener;
71         private final PerformanceManagementFile2VesMapper pmFileMapper;
72         private final Adapter adapter;
73
74         public PerformanceManagementAgentRunnable(final HttpRestClient httpClient,
75                 final VesEventNotifier pmEventListener,
76                 final PerformanceManagementFile2VesMapper pmFileMapper, final Adapter adapter) {
77             this.httpClient = httpClient;
78             this.pmEventListener = pmEventListener;
79             this.pmFileMapper = pmFileMapper;
80             this.adapter = adapter;
81         }
82
83         @Override
84         public synchronized void run() {
85             final String hostIp = adapter.getHostIpAddress();
86             httpClient.readFiles(adapter)
87                     .flatMap(zip -> pmFileMapper.map(zip, hostIp))
88                     .flatMapCompletable(events -> Observable.fromIterable(events)
89                             .concatMapCompletable(pmEventListener::notifyEvents))
90                     .doOnSubscribe(result -> LOG.info("PM VES notification forwarding for adapter {} started", hostIp))
91                     .doOnComplete(() -> LOG.info("PM VES notification forwarding for adapter {} finished", hostIp))
92                     .doOnError(error -> LOG.warn("PM VES notification forwarding for adapter {} failed", hostIp))
93                     .subscribeOn(Schedulers.single())
94                     .subscribe();
95         }
96     }
97 }