eead114135b3a766d17786d857a91df200185031
[oam/nf-oam-adopter.git] / ves-nf-oam-adopter / ves-nf-oam-adopter-snmp-manager / src / main / java / org / o / ran / oam / nf / oam / adopter / snmp / manager / SnmpTrapListener.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  O-RAN-SC
4  *  ================================================================================
5  *  Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.o.ran.oam.nf.oam.adopter.snmp.manager;
21
22 import com.google.gson.Gson;
23 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24 import java.io.IOException;
25 import java.time.LocalDateTime;
26 import java.time.ZoneId;
27 import java.util.Optional;
28 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
29 import org.o.ran.oam.nf.oam.adopter.snmp.manager.api.TimeZoneOffsetService;
30 import org.o.ran.oam.nf.oam.adopter.snmp.manager.mapper.SnmpMapper;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.snmp4j.CommandResponder;
34 import org.snmp4j.CommandResponderEvent;
35 import org.snmp4j.MessageDispatcher;
36 import org.snmp4j.MessageDispatcherImpl;
37 import org.snmp4j.PDU;
38 import org.snmp4j.Snmp;
39 import org.snmp4j.mp.MPv2c;
40 import org.snmp4j.smi.UdpAddress;
41 import org.snmp4j.transport.DefaultUdpTransportMapping;
42 import org.snmp4j.util.MultiThreadedMessageDispatcher;
43 import org.snmp4j.util.ThreadPool;
44
45 final class SnmpTrapListener implements CommandResponder, Runnable {
46
47     private static final Logger LOG = LoggerFactory.getLogger(SnmpTrapListener.class);
48     private static final int THREADS_SIZE = 2;
49     private final String hostPortAddress;
50     private final SnmpMapper mapper;
51     private final VesEventNotifier vesEventNotifier;
52     private final Gson gson = new Gson();
53     private final TimeZoneOffsetService timeZoneOffsetService;
54
55     public SnmpTrapListener(final String host, final Integer port, final SnmpMapper mapper,
56             final VesEventNotifier vesEventNotifier, final TimeZoneOffsetService timeZoneOffsetService) {
57         LOG.info("SnmpTrapListener listening on {}:{}", host, port);
58         this.hostPortAddress = host + "/" + port;
59         this.mapper = mapper;
60         this.vesEventNotifier = vesEventNotifier;
61         this.timeZoneOffsetService = timeZoneOffsetService;
62     }
63
64     @Override
65     public synchronized void run() {
66         try (final DefaultUdpTransportMapping snmpTarget = new DefaultUdpTransportMapping(
67                 new UdpAddress(hostPortAddress))) {
68             final ThreadPool threadPool = ThreadPool.create("SNMP_V2_Listener", THREADS_SIZE);
69             final MessageDispatcher dispatcher =
70                     new MultiThreadedMessageDispatcher(threadPool, new MessageDispatcherImpl());
71             dispatcher.addMessageProcessingModel(new MPv2c());
72             listenSnmp(dispatcher, snmpTarget);
73         } catch (final IOException e) {
74             LOG.error("Error occurred while listening to SNMP messages: {}", e.getMessage());
75         }
76     }
77
78     @SuppressFBWarnings("WA_NOT_IN_LOOP")
79     private synchronized void listenSnmp(final MessageDispatcher dispatcher,
80             final DefaultUdpTransportMapping snmpTarget) {
81         try (final Snmp snmp = new Snmp(dispatcher, snmpTarget)) {
82             snmp.addCommandResponder(this);
83             snmpTarget.listen();
84             LOG.debug("Listening on {}", snmpTarget);
85             wait();
86         } catch (final InterruptedException | IOException ex) {
87             Thread.currentThread().interrupt();
88         }
89     }
90
91     /**
92      * This method will be called whenever a pdu is received on the given port
93      * specified in the listen() method.
94      */
95     @Override
96     public synchronized void processPdu(final CommandResponderEvent cmdRespEvent) {
97         LOG.info("Received PDU");
98         final PDU pdu = cmdRespEvent.getPDU();
99         if (pdu == null) {
100             LOG.warn("Ignoring PDU.");
101             return;
102         }
103
104         final UdpAddress address = (UdpAddress) cmdRespEvent.getPeerAddress();
105         final ZoneId optZoneId = timeZoneOffsetService.getTimeZone(address.getInetAddress().getHostAddress());
106         final String timeZone = Optional.ofNullable(optZoneId)
107             .map(zoneId -> "UTC" + LocalDateTime.now().atZone(zoneId).getOffset().toString()).orElse(null);
108
109         mapper.toEvent(address, timeZone, pdu)
110                 .flatMapCompletable(vesEventNotifier::notifyEvents)
111                 .doOnSubscribe(result -> LOG.debug("SNMP Trap processing started"))
112                 .doOnComplete(() -> LOG.debug("SNMP Trap processed successfully"))
113                 .doOnError(error -> LOG.error("Failed to process SNMP Trap", error))
114                 .subscribe();
115     }
116 }