--- /dev/null
+package org.commscope.tr069adapter.vesagent.async;\r
+\r
+import java.util.HashMap;\r
+import java.util.Map;\r
+import java.util.concurrent.Future;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import org.commscope.tr069adapter.acs.common.DeviceRPCResponse;\r
+import org.commscope.tr069adapter.acs.common.OperationCode;\r
+import org.commscope.tr069adapter.acs.common.OperationResponse;\r
+import org.commscope.tr069adapter.acs.common.dto.CustomOperationCode;\r
+import org.commscope.tr069adapter.mapper.model.VESNotification;\r
+import org.commscope.tr069adapter.vesagent.util.VesAgentConstants;\r
+import org.commscope.tr069adapter.vesagent.util.VesAgentUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.stereotype.Component;\r
+\r
+@Component\r
+public class WaitForNotifications {\r
+\r
+ private static final Logger LOG = LoggerFactory.getLogger(WaitForNotifications.class);\r
+\r
+ private static Map<String, Future<DeviceRPCResponse>> opFutureMap = new HashMap<>();\r
+ private static Map<String, DeviceRPCResponse> opResultMap = new HashMap<>();\r
+ private static Map<String, Semaphore> semaphoreMap = new HashMap<>();\r
+\r
+ public void notifyDeviceNotification(VESNotification notification) {\r
+ String deviceOperationKey = VesAgentUtils.getDeviceOperationKey(\r
+ notification.getDevnotification().getDeviceDetails().getDeviceId(),\r
+ CustomOperationCode.CONNECT);\r
+\r
+ if (!semaphoreMap.containsKey(deviceOperationKey)) {\r
+ return;\r
+ }\r
+\r
+ DeviceRPCResponse response = new DeviceRPCResponse();\r
+ response.setDeviceDetails(notification.getDevnotification().getDeviceDetails());\r
+\r
+ OperationResponse operationResponse = new OperationResponse();\r
+ operationResponse.setStatus(VesAgentConstants.DEVICE_IS_REACHABLE);\r
+ operationResponse.setOperationCode(CustomOperationCode.CONNECT);\r
+\r
+ response.setOperationResponse(operationResponse);\r
+\r
+ opResultMap.put(deviceOperationKey, response);\r
+ Semaphore mutex = semaphoreMap.remove(deviceOperationKey);\r
+ mutex.release();\r
+ }\r
+\r
+\r
+ public void notifyResult(DeviceRPCResponse opResult) {\r
+ String deviceOperationKey =\r
+ VesAgentUtils.getDeviceOperationKey(opResult.getDeviceDetails().getDeviceId(),\r
+ opResult.getOperationResponse().getOperationCode());\r
+\r
+ if (!semaphoreMap.containsKey(deviceOperationKey)) {\r
+ return;\r
+ }\r
+\r
+ opResultMap.put(deviceOperationKey, opResult);\r
+ Semaphore mutex = semaphoreMap.remove(deviceOperationKey);\r
+ mutex.release();\r
+ }\r
+\r
+ public DeviceRPCResponse getOperationResult(String deviceId, OperationCode opCode) {\r
+ return opResultMap.remove(VesAgentUtils.getDeviceOperationKey(deviceId, opCode));\r
+ }\r
+\r
+ public boolean waitForResult(String deviceId, OperationCode opCode,\r
+ Future<DeviceRPCResponse> futureResponse, long timeout) throws InterruptedException {\r
+ LOG.debug("Waiting for operation result for device:{}, operation: {}", deviceId, opCode);\r
+\r
+ String deviceOperationKey = VesAgentUtils.getDeviceOperationKey(deviceId, opCode);\r
+ opFutureMap.put(deviceOperationKey, futureResponse);\r
+\r
+ Semaphore semaphore = new Semaphore(0);\r
+ semaphoreMap.put(deviceOperationKey, semaphore);\r
+\r
+ LOG.debug("Semaphore MAP size = {}", semaphoreMap.size());\r
+ LOG.debug("opResultMap MAP size = {}", opResultMap.size());\r
+ LOG.debug("opFutureMap MAP size = {}", opFutureMap.size());\r
+\r
+ return semaphore.tryAcquire(timeout, TimeUnit.SECONDS);\r
+ }\r
+\r
+ public void stopOperation(String deviceId, OperationCode opCode) {\r
+ LOG.debug("Stopping waiting for operation result thread for device:{}, operation: {}", deviceId,\r
+ opCode);\r
+\r
+ Future<DeviceRPCResponse> operationInstance =\r
+ opFutureMap.remove(VesAgentUtils.getDeviceOperationKey(deviceId, opCode));\r
+\r
+ if (null != operationInstance) {\r
+ LOG.info("Stopping operation result waiting thread for operation : {}", operationInstance);\r
+ operationInstance.cancel(true);\r
+ }\r
+ }\r
+}\r