1 package org.commscope.tr069adapter.vesagent.async;
\r
3 import java.util.HashMap;
\r
4 import java.util.Map;
\r
5 import java.util.concurrent.Future;
\r
6 import java.util.concurrent.Semaphore;
\r
7 import java.util.concurrent.TimeUnit;
\r
9 import org.commscope.tr069adapter.acs.common.DeviceRPCResponse;
\r
10 import org.commscope.tr069adapter.acs.common.OperationCode;
\r
11 import org.commscope.tr069adapter.acs.common.OperationResponse;
\r
12 import org.commscope.tr069adapter.acs.common.dto.CustomOperationCode;
\r
13 import org.commscope.tr069adapter.mapper.model.VESNotification;
\r
14 import org.commscope.tr069adapter.vesagent.util.VesAgentConstants;
\r
15 import org.commscope.tr069adapter.vesagent.util.VesAgentUtils;
\r
16 import org.slf4j.Logger;
\r
17 import org.slf4j.LoggerFactory;
\r
18 import org.springframework.stereotype.Component;
\r
21 public class WaitForNotifications {
\r
23 private static final Logger LOG = LoggerFactory.getLogger(WaitForNotifications.class);
\r
25 private static Map<String, Future<DeviceRPCResponse>> opFutureMap = new HashMap<>();
\r
26 private static Map<String, DeviceRPCResponse> opResultMap = new HashMap<>();
\r
27 private static Map<String, Semaphore> semaphoreMap = new HashMap<>();
\r
29 public void notifyDeviceNotification(VESNotification notification) {
\r
30 String deviceOperationKey = VesAgentUtils.getDeviceOperationKey(
\r
31 notification.getDevnotification().getDeviceDetails().getDeviceId(),
\r
32 CustomOperationCode.CONNECT);
\r
34 if (!semaphoreMap.containsKey(deviceOperationKey)) {
\r
38 DeviceRPCResponse response = new DeviceRPCResponse();
\r
39 response.setDeviceDetails(notification.getDevnotification().getDeviceDetails());
\r
41 OperationResponse operationResponse = new OperationResponse();
\r
42 operationResponse.setStatus(VesAgentConstants.DEVICE_IS_REACHABLE);
\r
43 operationResponse.setOperationCode(CustomOperationCode.CONNECT);
\r
45 response.setOperationResponse(operationResponse);
\r
47 opResultMap.put(deviceOperationKey, response);
\r
48 Semaphore mutex = semaphoreMap.remove(deviceOperationKey);
\r
53 public void notifyResult(DeviceRPCResponse opResult) {
\r
54 String deviceOperationKey =
\r
55 VesAgentUtils.getDeviceOperationKey(opResult.getDeviceDetails().getDeviceId(),
\r
56 opResult.getOperationResponse().getOperationCode());
\r
58 if (!semaphoreMap.containsKey(deviceOperationKey)) {
\r
62 opResultMap.put(deviceOperationKey, opResult);
\r
63 Semaphore mutex = semaphoreMap.remove(deviceOperationKey);
\r
67 public DeviceRPCResponse getOperationResult(String deviceId, OperationCode opCode) {
\r
68 return opResultMap.remove(VesAgentUtils.getDeviceOperationKey(deviceId, opCode));
\r
71 public boolean waitForResult(String deviceId, OperationCode opCode,
\r
72 Future<DeviceRPCResponse> futureResponse, long timeout) throws InterruptedException {
\r
73 LOG.debug("Waiting for operation result for device:{}, operation: {}", deviceId, opCode);
\r
75 String deviceOperationKey = VesAgentUtils.getDeviceOperationKey(deviceId, opCode);
\r
76 opFutureMap.put(deviceOperationKey, futureResponse);
\r
78 Semaphore semaphore = new Semaphore(0);
\r
79 semaphoreMap.put(deviceOperationKey, semaphore);
\r
81 LOG.debug("Semaphore MAP size = {}", semaphoreMap.size());
\r
82 LOG.debug("opResultMap MAP size = {}", opResultMap.size());
\r
83 LOG.debug("opFutureMap MAP size = {}", opFutureMap.size());
\r
85 return semaphore.tryAcquire(timeout, TimeUnit.SECONDS);
\r
88 public void stopOperation(String deviceId, OperationCode opCode) {
\r
89 LOG.debug("Stopping waiting for operation result thread for device:{}, operation: {}", deviceId,
\r
92 Future<DeviceRPCResponse> operationInstance =
\r
93 opFutureMap.remove(VesAgentUtils.getDeviceOperationKey(deviceId, opCode));
\r
95 if (null != operationInstance) {
\r
96 LOG.info("Stopping operation result waiting thread for operation : {}", operationInstance);
\r
97 operationInstance.cancel(true);
\r