Merge "VES Heartbeat and Software Management Feature"
[oam/tr069-adapter.git] / ves-agent / src / main / java / org / commscope / tr069adapter / vesagent / async / WaitForNotifications.java
diff --git a/ves-agent/src/main/java/org/commscope/tr069adapter/vesagent/async/WaitForNotifications.java b/ves-agent/src/main/java/org/commscope/tr069adapter/vesagent/async/WaitForNotifications.java
new file mode 100644 (file)
index 0000000..614ce44
--- /dev/null
@@ -0,0 +1,100 @@
+package org.commscope.tr069adapter.vesagent.async;\r
+\r
+import java.util.HashMap;\r
+import java.util.Map;\r
+import java.util.concurrent.Future;\r
+import java.util.concurrent.Semaphore;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import org.commscope.tr069adapter.acs.common.DeviceRPCResponse;\r
+import org.commscope.tr069adapter.acs.common.OperationCode;\r
+import org.commscope.tr069adapter.acs.common.OperationResponse;\r
+import org.commscope.tr069adapter.acs.common.dto.CustomOperationCode;\r
+import org.commscope.tr069adapter.mapper.model.VESNotification;\r
+import org.commscope.tr069adapter.vesagent.util.VesAgentConstants;\r
+import org.commscope.tr069adapter.vesagent.util.VesAgentUtils;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.stereotype.Component;\r
+\r
+@Component\r
+public class WaitForNotifications {\r
+\r
+  private static final Logger LOG = LoggerFactory.getLogger(WaitForNotifications.class);\r
+\r
+  private static Map<String, Future<DeviceRPCResponse>> opFutureMap = new HashMap<>();\r
+  private static Map<String, DeviceRPCResponse> opResultMap = new HashMap<>();\r
+  private static Map<String, Semaphore> semaphoreMap = new HashMap<>();\r
+\r
+  public void notifyDeviceNotification(VESNotification notification) {\r
+    String deviceOperationKey = VesAgentUtils.getDeviceOperationKey(\r
+        notification.getDevnotification().getDeviceDetails().getDeviceId(),\r
+        CustomOperationCode.CONNECT);\r
+\r
+    if (!semaphoreMap.containsKey(deviceOperationKey)) {\r
+      return;\r
+    }\r
+\r
+    DeviceRPCResponse response = new DeviceRPCResponse();\r
+    response.setDeviceDetails(notification.getDevnotification().getDeviceDetails());\r
+\r
+    OperationResponse operationResponse = new OperationResponse();\r
+    operationResponse.setStatus(VesAgentConstants.DEVICE_IS_REACHABLE);\r
+    operationResponse.setOperationCode(CustomOperationCode.CONNECT);\r
+\r
+    response.setOperationResponse(operationResponse);\r
+\r
+    opResultMap.put(deviceOperationKey, response);\r
+    Semaphore mutex = semaphoreMap.remove(deviceOperationKey);\r
+    mutex.release();\r
+  }\r
+\r
+\r
+  public void notifyResult(DeviceRPCResponse opResult) {\r
+    String deviceOperationKey =\r
+        VesAgentUtils.getDeviceOperationKey(opResult.getDeviceDetails().getDeviceId(),\r
+            opResult.getOperationResponse().getOperationCode());\r
+\r
+    if (!semaphoreMap.containsKey(deviceOperationKey)) {\r
+      return;\r
+    }\r
+\r
+    opResultMap.put(deviceOperationKey, opResult);\r
+    Semaphore mutex = semaphoreMap.remove(deviceOperationKey);\r
+    mutex.release();\r
+  }\r
+\r
+  public DeviceRPCResponse getOperationResult(String deviceId, OperationCode opCode) {\r
+    return opResultMap.remove(VesAgentUtils.getDeviceOperationKey(deviceId, opCode));\r
+  }\r
+\r
+  public boolean waitForResult(String deviceId, OperationCode opCode,\r
+      Future<DeviceRPCResponse> futureResponse, long timeout) throws InterruptedException {\r
+    LOG.debug("Waiting for operation result for device:{}, operation: {}", deviceId, opCode);\r
+\r
+    String deviceOperationKey = VesAgentUtils.getDeviceOperationKey(deviceId, opCode);\r
+    opFutureMap.put(deviceOperationKey, futureResponse);\r
+\r
+    Semaphore semaphore = new Semaphore(0);\r
+    semaphoreMap.put(deviceOperationKey, semaphore);\r
+\r
+    LOG.debug("Semaphore MAP size = {}", semaphoreMap.size());\r
+    LOG.debug("opResultMap MAP size = {}", opResultMap.size());\r
+    LOG.debug("opFutureMap MAP size = {}", opFutureMap.size());\r
+\r
+    return semaphore.tryAcquire(timeout, TimeUnit.SECONDS);\r
+  }\r
+\r
+  public void stopOperation(String deviceId, OperationCode opCode) {\r
+    LOG.debug("Stopping waiting for operation result thread for device:{}, operation: {}", deviceId,\r
+        opCode);\r
+\r
+    Future<DeviceRPCResponse> operationInstance =\r
+        opFutureMap.remove(VesAgentUtils.getDeviceOperationKey(deviceId, opCode));\r
+\r
+    if (null != operationInstance) {\r
+      LOG.info("Stopping operation result waiting thread for operation : {}", operationInstance);\r
+      operationInstance.cancel(true);\r
+    }\r
+  }\r
+}\r