09c7ff31713ab3c8d01cb60a28e142a1e3f5e39e
[oam/nf-oam-adopter.git] / ves-nf-oam-adopter / ves-nf-oam-adopter-snmp-manager / src / main / java / org / o / ran / oam / nf / oam / adopter / snmp / manager / SnmpTrapListener.java
1 /*
2  *  ============LICENSE_START=======================================================
3  *  O-RAN-SC
4  *  ================================================================================
5  *  Copyright © 2021 AT&T Intellectual Property. All rights reserved.
6  *  ================================================================================
7  *  Licensed under the Apache License, Version 2.0 (the "License");
8  *  you may not use this file except in compliance with the License.
9  *  You may obtain a copy of the License at
10  *        http://www.apache.org/licenses/LICENSE-2.0
11  *
12  *  Unless required by applicable law or agreed to in writing, software
13  *  distributed under the License is distributed on an "AS IS" BASIS,
14  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  *  See the License for the specific language governing permissions and
16  *  limitations under the License.
17  *  ============LICENSE_END=========================================================
18  */
19
20 package org.o.ran.oam.nf.oam.adopter.snmp.manager;
21
22 import com.google.gson.Gson;
23 import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
24 import java.io.IOException;
25 import java.time.LocalDateTime;
26 import java.time.ZoneId;
27 import java.util.Optional;
28 import org.o.ran.oam.nf.oam.adopter.api.VesEventNotifier;
29 import org.o.ran.oam.nf.oam.adopter.snmp.manager.api.TimeZoneOffsetService;
30 import org.o.ran.oam.nf.oam.adopter.snmp.manager.mapper.SnmpMapper;
31 import org.slf4j.Logger;
32 import org.slf4j.LoggerFactory;
33 import org.snmp4j.CommandResponder;
34 import org.snmp4j.CommandResponderEvent;
35 import org.snmp4j.MessageDispatcher;
36 import org.snmp4j.MessageDispatcherImpl;
37 import org.snmp4j.PDU;
38 import org.snmp4j.Snmp;
39 import org.snmp4j.mp.MPv2c;
40 import org.snmp4j.smi.UdpAddress;
41 import org.snmp4j.transport.DefaultUdpTransportMapping;
42 import org.snmp4j.util.MultiThreadedMessageDispatcher;
43 import org.snmp4j.util.ThreadPool;
44
45 final class SnmpTrapListener implements CommandResponder, Runnable {
46
47     private static final Logger LOG = LoggerFactory.getLogger(SnmpTrapListener.class);
48     private static final int THREADS_SIZE = 2;
49     private final String hostPortAddress;
50     private final SnmpMapper mapper;
51     private final VesEventNotifier vesEventNotifier;
52     private final Gson gson = new Gson();
53     private final TimeZoneOffsetService timeZoneOffsetService;
54
55     public SnmpTrapListener(final String host, final Integer port, final SnmpMapper mapper,
56             final VesEventNotifier vesEventNotifier, final TimeZoneOffsetService timeZoneOffsetService) {
57         LOG.info("SnmpTrapListener listening on {}:{}", host, port);
58         this.hostPortAddress = host + "/" + port;
59         this.mapper = mapper;
60         this.vesEventNotifier = vesEventNotifier;
61         this.timeZoneOffsetService = timeZoneOffsetService;
62     }
63
64     @Override
65     public synchronized void run() {
66         try (final DefaultUdpTransportMapping snmpTarget = new DefaultUdpTransportMapping(
67                 new UdpAddress(hostPortAddress))) {
68             final ThreadPool threadPool = ThreadPool.create("SNMP_V2_Listener", THREADS_SIZE);
69             final MessageDispatcher dispatcher =
70                     new MultiThreadedMessageDispatcher(threadPool, new MessageDispatcherImpl());
71             dispatcher.addMessageProcessingModel(new MPv2c());
72             listenSnmp(dispatcher, snmpTarget);
73         } catch (final IOException e) {
74             LOG.error("Error occurred while listening to SNMP messages: {}", e.getMessage());
75         }
76     }
77
78     @SuppressFBWarnings("WA_NOT_IN_LOOP")
79     private void listenSnmp(final MessageDispatcher dispatcher, final DefaultUdpTransportMapping snmpTarget) {
80         try (final Snmp snmp = new Snmp(dispatcher, snmpTarget)) {
81             snmp.addCommandResponder(this);
82             snmpTarget.listen();
83             LOG.debug("Listening on {}", snmpTarget);
84             wait();
85         } catch (final InterruptedException | IOException ex) {
86             Thread.currentThread().interrupt();
87         }
88     }
89
90     /**
91      * This method will be called whenever a pdu is received on the given port
92      * specified in the listen() method.
93      */
94     @Override
95     public synchronized void processPdu(final CommandResponderEvent cmdRespEvent) {
96         LOG.info("Received PDU");
97         final PDU pdu = cmdRespEvent.getPDU();
98         if (pdu == null) {
99             LOG.warn("Ignoring PDU.");
100             return;
101         }
102
103         final UdpAddress address = (UdpAddress) cmdRespEvent.getPeerAddress();
104         final ZoneId optZoneId = timeZoneOffsetService.getTimeZone(address.getInetAddress().getHostAddress());
105         final String timeZone = Optional.ofNullable(optZoneId)
106             .map(zoneId -> "UTC" + LocalDateTime.now().atZone(zoneId).getOffset().toString()).orElse(null);
107
108         mapper.toEvent(address, timeZone, pdu)
109                 .flatMapCompletable(vesEventNotifier::notifyEvents)
110                 .doOnSubscribe(result -> LOG.debug("SNMP Trap processing started"))
111                 .doOnComplete(() -> LOG.debug("SNMP Trap processed successfully"))
112                 .doOnError(error -> LOG.error("Failed to process SNMP Trap", error))
113                 .subscribe();
114     }
115 }