ad588259d379d48b7ebf481640097afef5cb7119
[oam/oam-controller.git] /
1 /*
2  * ============LICENSE_START========================================================================
3  * ONAP : ccsdk feature sdnr wt
4  * =================================================================================================
5  * Copyright (C) 2019 highstreet technologies GmbH Intellectual Property. All rights reserved.
6  * =================================================================================================
7  * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
8  * in compliance with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software distributed under the License
13  * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
14  * or implied. See the License for the specific language governing permissions and limitations under
15  * the License.
16  * ============LICENSE_END==========================================================================
17  */
18 package org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl;
19
20 import java.util.Collection;
21 import java.util.List;
22 import java.util.Objects;
23 import java.util.Optional;
24 import java.util.concurrent.CopyOnWriteArrayList;
25 import javax.annotation.Nullable;
26 import org.eclipse.jdt.annotation.NonNull;
27 import org.onap.ccsdk.features.sdnr.wt.common.configuration.ConfigurationFileRepresentation;
28 import org.onap.ccsdk.features.sdnr.wt.common.configuration.filechange.IConfigChangedListener;
29 import org.onap.ccsdk.features.sdnr.wt.common.threading.GenericRunnableFactory;
30 import org.onap.ccsdk.features.sdnr.wt.common.threading.KeyBasedThreadpool;
31 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.IEntityDataProvider;
32 import org.onap.ccsdk.features.sdnr.wt.dataprovider.model.StatusChangedHandler.StatusKey;
33 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.DomContext;
34 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfAccessor;
35 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeConnectListener;
36 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateListener;
37 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.NetconfNodeStateService;
38 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.VesNotificationListener;
39 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfAccessorManager;
40 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.NetconfCommunicatorManager;
41 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.access.dom.DomContextImpl;
42 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.NetconfStateConfig;
43 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.AkkaConfig;
44 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlAkka.ClusterConfig;
45 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.conf.odlGeo.GeoConfig;
46 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.NetconfnodeStateServiceRpcApiImpl;
47 import org.onap.ccsdk.features.sdnr.wt.netconfnodestateservice.impl.rpc.RpcApigetStateCallback;
48 import org.opendaylight.mdsal.binding.api.DataBroker;
49 import org.opendaylight.mdsal.binding.api.DataObjectModification;
50 import org.opendaylight.mdsal.binding.api.DataObjectModification.ModificationType;
51 import org.opendaylight.mdsal.binding.api.DataTreeChangeListener;
52 import org.opendaylight.mdsal.binding.api.DataTreeIdentifier;
53 import org.opendaylight.mdsal.binding.api.DataTreeModification;
54 import org.opendaylight.mdsal.binding.api.MountPointService;
55 import org.opendaylight.mdsal.binding.api.NotificationPublishService;
56 import org.opendaylight.mdsal.binding.api.RpcProviderService;
57 import org.opendaylight.mdsal.common.api.LogicalDatastoreType;
58 import org.opendaylight.mdsal.dom.api.DOMDataBroker;
59 import org.opendaylight.mdsal.dom.api.DOMMountPointService;
60 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev241009.ConnectionOper.ConnectionStatus;
61 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.device.rev241009.connection.oper.ClusteredConnectionStatus;
62 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev240911.NetconfNodeAugment;
63 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev240911.netconf.node.augment.NetconfNode;
64 import org.opendaylight.yang.gen.v1.urn.opendaylight.netconf.node.topology.rev240911.network.topology.topology.topology.types.TopologyNetconf;
65 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusInput;
66 import org.opendaylight.yang.gen.v1.urn.opendaylight.params.xml.ns.yang.netconfnode.state.rev191011.GetStatusOutputBuilder;
67 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NetworkTopology;
68 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.NodeId;
69 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.TopologyId;
70 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.Topology;
71 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.TopologyKey;
72 import org.opendaylight.yang.gen.v1.urn.tbd.params.xml.ns.yang.network.topology.rev131021.network.topology.topology.Node;
73 import org.opendaylight.yangtools.binding.data.codec.api.BindingNormalizedNodeSerializer;
74 import org.opendaylight.yangtools.concepts.Registration;
75 import org.opendaylight.yangtools.yang.binding.InstanceIdentifier;
76 import org.opendaylight.yangtools.yang.parser.api.YangParserException;
77 import org.opendaylight.yangtools.yang.parser.api.YangParserFactory;
78 import org.slf4j.Logger;
79 import org.slf4j.LoggerFactory;
80
81 public class NetconfNodeStateServiceImpl
82         implements NetconfNodeStateService, RpcApigetStateCallback, AutoCloseable, IConfigChangedListener {
83
84     private static final Logger LOG = LoggerFactory.getLogger(NetconfNodeStateServiceImpl.class);
85     private static final String APPLICATION_NAME = "NetconfNodeStateService";
86     private static final String CONFIGURATIONFILE = "etc/netconfnode-status-service.properties";
87
88     private static final @NonNull InstanceIdentifier<Topology> NETCONF_TOPO_IID =
89             InstanceIdentifier.create(NetworkTopology.class).child(Topology.class,
90                     new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())));
91
92     private static final @NonNull InstanceIdentifier<Node> NETCONF_NODE_TOPO_IID =
93             InstanceIdentifier.create(NetworkTopology.class)
94                     .child(Topology.class, new TopologyKey(new TopologyId(TopologyNetconf.QNAME.getLocalName())))
95                     .child(Node.class);
96
97     private static final @NonNull DataTreeIdentifier<Node> NETCONF_NODE_TOPO_TREE_ID =
98             DataTreeIdentifier.create(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
99
100     // Name of ODL controller NETCONF instance
101     private static final NodeId CONTROLLER = new NodeId("controller-config");
102     private static final int ASYNC_EXECUTION_POOLSIZE = 20;
103
104     // -- OSGi services, provided
105     private DataBroker dataBroker;
106     private DOMDataBroker domDataBroker;
107     private MountPointService mountPointService;
108     private DOMMountPointService domMountPointService;
109     private RpcProviderService rpcProviderRegistry;
110     private IEntityDataProvider iEntityDataProvider;
111     @SuppressWarnings("unused")
112     private NotificationPublishService notificationPublishService;
113     private YangParserFactory yangParserFactory;
114     private BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer;
115
116     // -- Parameter
117     private Registration listenerL1;
118
119     private NetconfnodeStateServiceRpcApiImpl rpcApiService;
120
121     /**
122      * Indication if init() function called and fully executed
123      **/
124     private Boolean initializationSuccessful;
125
126     /**
127      * Manager accessor objects for connection
128      **/
129     private NetconfAccessorManager accessorManager;
130
131     /**
132      * List of all registered listeners
133      **/
134     private final List<NetconfNodeConnectListener> netconfNodeConnectListenerList;
135
136     /**
137      * List of all registered listeners
138      **/
139     private final List<NetconfNodeStateListener> netconfNodeStateListenerList;
140
141     /**
142      * List of all registered listeners
143      **/
144     private final List<VesNotificationListener> vesNotificationListenerList;
145
146     /**
147      * Indicates if running in cluster configuration
148      **/
149     private boolean isCluster;
150
151     /**
152      * Indicates the name of the cluster
153      **/
154     private String clusterName;
155
156     /**
157      * nodeId to threadPool (size=1) for datatreechange handling)
158      **/
159     //    private final Map<String, ExecutorService> handlingPool;
160     private KeyBasedThreadpool<NodeId, NetconfChangeDataHolder> handlingPool;
161
162     private boolean handleDataTreeAsync;
163
164     private ConfigurationFileRepresentation configFileRepresentation;
165     private NetconfStateConfig config;
166     private NetconfCommunicatorManager netconfCommunicatorManager;
167     private DomContext domContext;
168
169     /**
170      * Blueprint
171      **/
172     public NetconfNodeStateServiceImpl() {
173         LOG.info("Creating provider for {}", APPLICATION_NAME);
174
175         this.dataBroker = null;
176         this.domDataBroker = null;
177         this.mountPointService = null;
178         this.domMountPointService = null;
179         this.rpcProviderRegistry = null;
180         this.notificationPublishService = null;
181         this.yangParserFactory = null;
182         this.domContext = null;
183
184         this.listenerL1 = null;
185         this.initializationSuccessful = false;
186         this.netconfNodeConnectListenerList = new CopyOnWriteArrayList<>();
187         this.netconfNodeStateListenerList = new CopyOnWriteArrayList<>();
188         this.vesNotificationListenerList = new CopyOnWriteArrayList<>();
189         this.accessorManager = null;
190         this.handlingPool = null;
191     }
192
193     public void setDataBroker(DataBroker dataBroker) {
194         this.dataBroker = dataBroker;
195     }
196
197     public void setDomDataBroker(DOMDataBroker domDataBroker) {
198         this.domDataBroker = domDataBroker;
199     }
200
201     public void setRpcProviderRegistry(RpcProviderService rpcProviderRegistry) {
202         this.rpcProviderRegistry = rpcProviderRegistry;
203     }
204
205     public void setNotificationPublishService(NotificationPublishService notificationPublishService) {
206         this.notificationPublishService = notificationPublishService;
207     }
208
209     public void setMountPointService(MountPointService mountPointService) {
210         this.mountPointService = mountPointService;
211     }
212
213     public void setDomMountPointService(DOMMountPointService domMountPointService) {
214         this.domMountPointService = domMountPointService;
215     }
216
217     public void setEntityDataProvider(IEntityDataProvider iEntityDataProvider) {
218         this.iEntityDataProvider = iEntityDataProvider;
219     }
220
221     public void setYangParserFactory(YangParserFactory yangParserFactory) {
222         this.yangParserFactory = yangParserFactory;
223     }
224
225     public void setBindingNormalizedNodeSerializer(BindingNormalizedNodeSerializer bindingNormalizedNodeSerializer) {
226         this.bindingNormalizedNodeSerializer = bindingNormalizedNodeSerializer;
227     }
228
229     /**
230      * Blueprint initialization
231      *
232      * @throws YangParserException
233      **/
234     public void init() {
235
236         LOG.info("Session Initiated start {}", APPLICATION_NAME);
237         this.domContext = new DomContextImpl(this.yangParserFactory, this.bindingNormalizedNodeSerializer);
238         this.netconfCommunicatorManager =
239                 new NetconfCommunicatorManager(mountPointService, domMountPointService, domContext);
240         this.accessorManager = new NetconfAccessorManager(netconfCommunicatorManager, domContext, this);
241         // Start RPC Service
242         this.rpcApiService = new NetconfnodeStateServiceRpcApiImpl(rpcProviderRegistry, vesNotificationListenerList);
243         // Get configuration
244         this.configFileRepresentation = new ConfigurationFileRepresentation(CONFIGURATIONFILE);
245         this.config = new NetconfStateConfig(this.configFileRepresentation);
246         this.handleDataTreeAsync = this.config.handleAsync();
247         this.configFileRepresentation.registerConfigChangedListener(this);
248
249         // Akka setup
250         AkkaConfig akkaConfig = getAkkaConfig();
251         this.isCluster = akkaConfig == null ? false : akkaConfig.isCluster();
252         this.clusterName = akkaConfig == null ? "" : akkaConfig.getClusterConfig().getClusterSeedNodeName("abc");
253
254         // Provide status information
255         ClusterConfig cc = akkaConfig == null ? null : akkaConfig.getClusterConfig();
256         this.iEntityDataProvider.setStatus(StatusKey.CLUSTER_SIZE,
257                 cc == null ? "1" : String.format("%d", cc.getClusterSize()));
258
259         // RPC Service for specific services
260         this.rpcApiService.setStatusCallback(this);
261
262         LOG.debug("start NetconfSubscriptionManager Service");
263         //this.netconfChangeListener = new NetconfChangeListener(this, dataBroker);
264         //this.netconfChangeListener.register();
265         //DataTreeIdentifier<Node> treeId = new DataTreeIdentifier<>(LogicalDatastoreType.OPERATIONAL, NETCONF_NODE_TOPO_IID);
266
267         listenerL1 = dataBroker.registerTreeChangeListener(NETCONF_NODE_TOPO_TREE_ID, new L1());
268
269         this.handlingPool = new KeyBasedThreadpool<NodeId, NetconfChangeDataHolder>(
270                 this.config.getAsyncHandlingPoolsize(), 1,
271                 new GenericRunnableFactory<>() {
272                     public Runnable create(final NodeId key, final NetconfChangeDataHolder arg) {
273                         return new Runnable() {
274
275                             @Override
276                             public void run() {
277                                 NetconfNodeStateServiceImpl.this.handleDataTreeChange(arg.root, key,
278                                         arg.modificationTyp);
279                             }
280                         };
281                     }
282
283                     ;
284                 });
285         this.initializationSuccessful = true;
286
287         LOG.info("Session Initiated end. Initialization done {}", initializationSuccessful);
288
289     }
290
291     /**
292      * Blueprint destroy-method method
293      */
294     public void destroy() {
295         close();
296     }
297
298     public DomContext getDomContext() {
299         return Objects.requireNonNull(domContext, "Initialization not completed for domContext");
300     }
301
302     public DataBroker getDataBroker() {
303         return dataBroker;
304     }
305
306     public DOMDataBroker getDOMDataBroker() {
307         return domDataBroker;
308     }
309
310     public NetconfnodeStateServiceRpcApiImpl getNetconfnodeStateServiceRpcApiImpl() {
311         return Objects.requireNonNull(rpcApiService, "Initialization not completed for rpcApiService");
312     }
313
314     @Override
315     public GetStatusOutputBuilder getStatus(GetStatusInput input) {
316         return new GetStatusOutputBuilder();
317     }
318
319     @Override
320     public <L extends NetconfNodeConnectListener> @NonNull Registration registerNetconfNodeConnectListener(
321             final @NonNull L netconfNodeConnectListener) {
322         LOG.debug("Register connect listener {}", netconfNodeConnectListener.getClass().getName());
323         netconfNodeConnectListenerList.add(netconfNodeConnectListener);
324
325         return new Registration() {
326
327             //@Override
328             public @NonNull L getInstance() {
329                 return netconfNodeConnectListener;
330             }
331
332             @Override
333             public void close() {
334                 LOG.debug("Remove connect listener {}", netconfNodeConnectListener);
335                 netconfNodeConnectListenerList.remove(netconfNodeConnectListener);
336             }
337         };
338     }
339
340     @Override
341     public <L extends NetconfNodeStateListener> @NonNull Registration registerNetconfNodeStateListener(
342             @NonNull L netconfNodeStateListener) {
343         LOG.debug("Register state listener {}", netconfNodeStateListener.getClass().getName());
344         netconfNodeStateListenerList.add(netconfNodeStateListener);
345
346         return new Registration() {
347             //@Override
348             public @NonNull L getInstance() {
349                 return netconfNodeStateListener;
350             }
351
352             @Override
353             public void close() {
354                 LOG.debug("Remove state listener {}", netconfNodeStateListener);
355                 netconfNodeStateListenerList.remove(netconfNodeStateListener);
356             }
357         };
358     }
359
360     @Override
361     public <L extends VesNotificationListener> @NonNull Registration registerVesNotifications(
362             @NonNull L vesNotificationListener) {
363         LOG.debug("Register Ves notification listener {}", vesNotificationListener.getClass().getName());
364         vesNotificationListenerList.add(vesNotificationListener);
365
366         return new Registration() {
367             //@Override
368             public @NonNull L getInstance() {
369                 return vesNotificationListener;
370             }
371
372             @Override
373             public void close() {
374                 LOG.debug("Remove Ves notification listener {}", vesNotificationListener);
375                 vesNotificationListenerList.remove(vesNotificationListener);
376             }
377         };
378     }
379
380     @Override
381     public void close() {
382         LOG.info("Closing start ...");
383         try {
384             close(rpcApiService, listenerL1);
385         } catch (Exception e) {
386             LOG.debug("Closing", e);
387         }
388         LOG.info("Closing done");
389     }
390
391     /**
392      * Used to close all Services, that should support AutoCloseable Pattern
393      *
394      * @param toCloseList
395      * @throws Exception
396      */
397     private void close(AutoCloseable... toCloseList) throws Exception {
398         for (AutoCloseable element : toCloseList) {
399             if (element != null) {
400                 element.close();
401             }
402         }
403         this.configFileRepresentation.unregisterConfigChangedListener(this);
404     }
405
406     /**
407      * Indication if init() of this bundle successfully done.
408      *
409      * @return true if init() was successful. False if not done or not successful.
410      */
411     public boolean isInitializationSuccessful() {
412         return this.initializationSuccessful;
413     }
414
415     /*-------------------------------------------------------------------------------------------
416      * Functions for interface DeviceManagerService
417      */
418
419     /**
420      * For each mounted device a mountpoint is created and this listener is called. Mountpoint was created or existing.
421      * Managed device is now fully connected to node/mountpoint.
422      *
423      * @param nNodeId     id of the mountpoint
424      * @param netconfNode mountpoint contents
425      */
426     private void enterConnectedState(NodeId nNodeId, NetconfNode netconfNode) {
427
428         String mountPointNodeName = nNodeId.getValue();
429         LOG.debug("Access connected state for mountpoint {}", mountPointNodeName);
430
431         boolean preConditionMissing = false;
432         if (mountPointService == null) {
433             preConditionMissing = true;
434             LOG.warn("No mountservice available.");
435         }
436         if (!initializationSuccessful) {
437             preConditionMissing = true;
438             LOG.warn("Devicemanager initialization still pending.");
439         }
440         if (preConditionMissing) {
441             return;
442         }
443
444         boolean isNetconfNodeMaster = isNetconfNodeMaster(netconfNode);
445         LOG.debug("isNetconfNodeMaster indication {} for mountpoint {}", isNetconfNodeMaster, mountPointNodeName);
446         if (isNetconfNodeMaster) {
447             NetconfAccessor acessor = accessorManager.getAccessor(nNodeId, netconfNode);
448             /*
449              * --> Call Listers for onConnect() Indication
450                for (all)
451              */
452             netconfNodeConnectListenerList.forEach(item -> {
453                 try {
454                     item.onEnterConnected(acessor);
455                 } catch (Exception e) {
456                     LOG.debug("Exception during onEnterConnected listener call", e);
457                 }
458             });
459
460             LOG.debug("Connect indication forwarded for {}", mountPointNodeName);
461         }
462     }
463
464     /**
465      * Leave the connected status to a non connected or removed status for master mountpoint
466      *
467      * @param nNodeId             id of the mountpoint
468      * @param optionalNetconfNode mountpoint contents or not available on remove
469      */
470     private void leaveConnectedState(NodeId nNodeId, Optional<NetconfNode> optionalNetconfNode) {
471         String mountPointNodeName = nNodeId.getValue();
472         LOG.debug("leaveConnectedState id {}", mountPointNodeName);
473
474         if (this.accessorManager.containes(nNodeId)) {
475             netconfNodeConnectListenerList.forEach(item -> {
476                 try {
477                     if (item != null) {
478                         item.onLeaveConnected(nNodeId, optionalNetconfNode);
479                     } else {
480                         LOG.warn("Unexpeced null item during onleave");
481                     }
482                 } catch (Exception e) {
483                     LOG.debug("Exception during onLeaveConnected listener call", e);
484                 }
485             });
486             LOG.debug("Remove Master mountpoint {}", mountPointNodeName);
487             this.accessorManager.removeAccessor(nNodeId);
488         } else {
489             LOG.debug("Master mountpoint already removed {}", mountPointNodeName);
490         }
491     }
492
493     // ---- onDataTreeChangedHandler
494
495     private void handleDataTreeChange(DataObjectModification<Node> root, NodeId nodeId,
496             ModificationType modificationTyp) {
497         // Move status into boolean flags for
498         boolean connectedBefore, connectedAfter, created;
499         NetconfNode nNodeAfter = getNetconfNode(root.dataAfter());
500         connectedAfter = isConnected(nNodeAfter);
501         if (root.getDataBefore() != null) {
502             // It is an update or delete
503             NetconfNode nodeBefore = getNetconfNode(root.dataBefore());
504             connectedBefore = isConnected(nodeBefore);
505             created = false;
506         } else {
507             // It is a create
508             connectedBefore = false;
509             created = true;
510         }
511         LOG.debug("L1 NETCONF id:{} t:{} created {} before:{} after:{} akkaIsCluster:{} cl stat:{}", nodeId,
512                 modificationTyp, created, connectedBefore, connectedAfter, isCluster,
513                 getClusteredConnectionStatus(nNodeAfter));
514         switch (modificationTyp) {
515             case SUBTREE_MODIFIED: // Create or modify sub level node
516             case WRITE: // Create or modify top level node
517                 // Treat an overwrite as an update
518                 // leaveConnected state.before = connected; state.after != connected
519                 // enterConnected state.after == connected
520                 // => Here create or update by checking root.getDataBefore() != null
521                 boolean handled = false;
522                 if (created) {
523                     handled = true;
524                     netconfNodeStateListenerList.forEach(item -> {
525                         try {
526                             item.onCreated(nodeId, nNodeAfter);
527                         } catch (Exception e) {
528                             LOG.info("Exception during onCreated listener call", e);
529                         }
530                     });
531                 }
532                 if (!connectedBefore && connectedAfter) {
533                     handled = true;
534                     enterConnectedState(nodeId, nNodeAfter);
535                 }
536                 if (connectedBefore && !connectedAfter) {
537                     handled = true;
538                     leaveConnectedState(nodeId, Optional.of(nNodeAfter));
539                 }
540                 if (!handled) {
541                     //Change if not handled by the messages before
542                     netconfNodeStateListenerList.forEach(item -> {
543                         try {
544                             item.onStateChange(nodeId, nNodeAfter);
545                         } catch (Exception e) {
546                             LOG.info("Exception during onStateChange listener call", e);
547                         }
548                     });
549                 }
550                 // doProcessing(update ? Action.UPDATE : Action.CREATE, nodeId, root);
551                 break;
552             case DELETE:
553                 // Node removed
554                 // leaveconnected state.before = connected;
555                 if (!connectedBefore) {
556                     leaveConnectedState(nodeId, Optional.empty());
557                 }
558                 netconfNodeStateListenerList.forEach(item -> {
559                     try {
560                         item.onRemoved(nodeId);
561                     } catch (Exception e) {
562                         LOG.info("Exception during onRemoved listener call", e);
563                     }
564                 });
565                 // doProcessing(Action.REMOVE, nodeId, root);
566                 break;
567         }
568     }
569
570     private void onDataTreeChangedHandler(@NonNull Collection<DataTreeModification<Node>> changes) {
571         for (final DataTreeModification<Node> change : changes) {
572
573             final DataObjectModification<Node> root = change.getRootNode();
574             if (LOG.isTraceEnabled()) {
575                 LOG.trace("Handle this modificationType:{} path:{} root:{}", root.getModificationType(),
576                         change.getRootPath(), root);
577             }
578
579             // Catch potential nullpointer exceptions ..
580             try {
581                 ModificationType modificationTyp = root.getModificationType();
582                 Node node = modificationTyp == ModificationType.DELETE ? root.getDataBefore() : root.getDataAfter();
583                 NodeId nodeId = node != null ? node.getNodeId() : null;
584                 if (nodeId == null) {
585                     LOG.warn("L1 without nodeid.");
586                 } else {
587                     if (nodeId.equals(CONTROLLER)) {
588                         // Do not forward any controller related events to devicemanager
589                         LOG.debug("Stop processing for [{}]", nodeId);
590                     } else {
591                         if (modificationTyp == null) {
592                             LOG.warn("L1 empty modification type");
593                         } else {
594                             LOG.trace("handle data tree change with async={}", this.handleDataTreeAsync);
595                             if (this.handleDataTreeAsync) {
596                                 this.handlingPool.execute(nodeId, new NetconfChangeDataHolder(root, modificationTyp));
597
598                             } else {
599                                 handleDataTreeChange(root, nodeId, modificationTyp);
600                             }
601                         }
602                     }
603                 }
604             } catch (NullPointerException | IllegalStateException e) {
605                 LOG.debug("Data not available at ", e);
606             }
607         } //for
608         LOG.debug("datatreechanged handler completed");
609     }
610
611     // ---- subclasses for listeners
612
613     /**
614      * Clustered listener function to select the right node from DataObjectModification. Called at all nodes.
615      */
616     private class L1 implements DataTreeChangeListener<Node> {
617
618         @Override
619         public void onDataTreeChanged(@NonNull List<DataTreeModification<Node>> changes) {
620             LOG.debug("L1 TreeChange enter changes:{}", changes.size());
621             //Debug AkkTimeout NetconfNodeStateServiceImpl.this.pool.execute(new Thread( () -> onDataTreeChangedHandler(changes)));
622             onDataTreeChangedHandler(changes);
623             LOG.debug("L1 TreeChange leave");
624         }
625     }
626
627     /* --- private helpers --- */
628     private static @Nullable NetconfNode getNetconfNode(Node node) {
629         if (node == null) {
630             return null;
631         }
632         final var aug = node.augmentation(NetconfNodeAugment.class);
633         return aug != null ? aug.getNetconfNode() : null;
634     }
635
636     private static boolean isConnected(NetconfNode nNode) {
637         return nNode != null ? ConnectionStatus.Connected.equals(nNode.getConnectionStatus()) : false;
638     }
639
640     private static @Nullable ClusteredConnectionStatus getClusteredConnectionStatus(NetconfNode node) {
641         return node != null ? node.getClusteredConnectionStatus() : null;
642     }
643
644     /* -- LOG related functions -- */
645
646     /**
647      * Analyze configuration
648      **/
649     private static @Nullable AkkaConfig getAkkaConfig() {
650         AkkaConfig akkaConfig;
651         try {
652             akkaConfig = AkkaConfig.load();
653             LOG.debug("akka.conf loaded: " + akkaConfig.toString());
654         } catch (Exception e1) {
655             akkaConfig = null;
656             LOG.warn("problem loading akka.conf: " + e1.getMessage());
657         }
658         if (akkaConfig != null && akkaConfig.isCluster()) {
659             LOG.info("cluster mode detected");
660             if (GeoConfig.fileExists()) {
661                 try {
662                     LOG.debug("try to load geoconfig");
663                     GeoConfig.load();
664                 } catch (Exception err) {
665                     LOG.warn("problem loading geoconfig: " + err.getMessage());
666                 }
667             } else {
668                 LOG.debug("no geoconfig file found");
669             }
670         } else {
671             LOG.info("single node mode detected");
672         }
673         return akkaConfig;
674     }
675
676     private boolean isNetconfNodeMaster(NetconfNode nNode) {
677         if (this.isCluster) {
678             LOG.debug("check if me is responsible for node");
679             ClusteredConnectionStatus ccs = nNode.getClusteredConnectionStatus();
680             @NonNull
681             String masterNodeName =
682                     ccs == null || ccs.getNetconfMasterNode() == null ? "null" : ccs.getNetconfMasterNode();
683             LOG.debug("sdnMasterNode=" + masterNodeName + " and sdnMyNode=" + clusterName);
684             if (!masterNodeName.equals(clusterName)) {
685                 LOG.debug("netconf change but me is not master for this node");
686                 return false;
687             }
688         }
689         return true;
690     }
691
692
693     @Override
694     public void onConfigChanged() {
695         this.handleDataTreeAsync = this.config.handleAsync();
696         //setting poolsize is not possible atm
697         //this.handlingPool.setPoolSize(this.config.getAsyncHandlingPoolsize());
698
699     }
700
701     public class NetconfChangeDataHolder {
702
703         protected final DataObjectModification<Node> root;
704         protected final ModificationType modificationTyp;
705
706         public NetconfChangeDataHolder(DataObjectModification<Node> root, ModificationType modificationTyp) {
707             this.root = root;
708             this.modificationTyp = modificationTyp;
709         }
710
711     }
712 }