added svcapi ui and camunda code
diff --git a/otf-camunda/src/main/java/org/oran/otf/camunda/workflow/utility/WorkflowTask.java b/otf-camunda/src/main/java/org/oran/otf/camunda/workflow/utility/WorkflowTask.java
new file mode 100644 (file)
index 0000000..e7302e6
--- /dev/null
+++ b/otf-camunda/src/main/java/org/oran/otf/camunda/workflow/utility/WorkflowTask.java
@@ -0,0 +1,173 @@
+/*  Copyright (c) 2019 AT&T Intellectual Property.                             #
+#                                                                              #
+#   Licensed under the Apache License, Version 2.0 (the "License");            #
+#   you may not use this file except in compliance with the License.           #
+#   You may obtain a copy of the License at                                    #
+#                                                                              #
+#       http://www.apache.org/licenses/LICENSE-2.0                             #
+#                                                                              #
+#   Unless required by applicable law or agreed to in writing, software        #
+#   distributed under the License is distributed on an "AS IS" BASIS,          #
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #
+#   See the License for the specific language governing permissions and        #
+#   limitations under the License.                                             #
+##############################################################################*/
+
+
+package org.oran.otf.camunda.workflow.utility;
+
+import org.oran.otf.common.utility.Utility;
+import com.google.common.base.Strings;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class WorkflowTask {
+
+  private static final Logger logger = LoggerFactory.getLogger(WorkflowTask.class);
+  private static final String logPrefix = Utility.getLoggerPrefix();
+
+  public static Map<String, List<WorkflowTask>> workflowTasksByExecutionId =
+      new ConcurrentHashMap<>();
+  // The processInstanceId of the Camunda process instance the thread pool is created under.
+  private final String processInstanceId;
+  // The executor service backing this task's fixed thread pool.
+  private final ExecutorService pool;
+  // Tracks the submitted tasks so outstanding work can be cancelled on shutdown.
+  private List<Future<?>> futures;
+  // Whether currently running threads should be interrupted when the pool is shut down.
+  private boolean interruptOnFailure;
+
+  public WorkflowTask(String executionId, int threads, boolean interruptOnFailure) {
+    if (threads <= 0 || Strings.isNullOrEmpty(executionId)) {
+      this.processInstanceId = null;
+      this.pool = null;
+      return;
+    }
+
+    ThreadFactory namedThreadFactory =
+        new ThreadFactoryBuilder().setNameFormat(executionId + "-%d").build();
+
+    this.processInstanceId = executionId;
+    this.pool =
+        threads == 1
+            ? Executors.newSingleThreadExecutor(namedThreadFactory)
+            : Executors.newFixedThreadPool(threads, namedThreadFactory);
+    this.futures = Collections.synchronizedList(new ArrayList<>());
+    this.interruptOnFailure = interruptOnFailure;
+
+    synchronized (WorkflowTask.workflowTasksByExecutionId) {
+      if (!WorkflowTask.workflowTasksByExecutionId.containsKey(this.processInstanceId)) {
+        List<WorkflowTask> list = new ArrayList<>();
+        list.add(this);
+        WorkflowTask.workflowTasksByExecutionId.put(
+            this.processInstanceId, Collections.synchronizedList(list));
+      } else {
+        WorkflowTask.workflowTasksByExecutionId.get(this.processInstanceId).add(this);
+      }
+    }
+  }
+
+  public void shutdown() {
+    this.shutdown(this.interruptOnFailure);
+  }
+
+  public void shutdown(boolean interruptOnFailure) {
+    // Nothing to shut down if the constructor was given invalid arguments and never built a pool.
+    if (pool == null) {
+      return;
+    }
+
+    if (interruptOnFailure) {
+      // Cancel currently executing tasks, and halt any waiting tasks.
+      pool.shutdownNow();
+    } else {
+      // Disable new tasks from being submitted, while allowing currently executing tasks to finish.
+      pool.shutdown();
+    }
+
+    try {
+      // Wait a while for existing tasks to terminate.
+      if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+        for (Future<?> f : futures) {
+          f.cancel(interruptOnFailure);
+        }
+
+        // Wait a while for tasks to respond to being cancelled.
+        if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
+          logger.error("{}Pool did not terminate.", logPrefix);
+        }
+      }
+    } catch (InterruptedException ie) {
+      // (Re-)Cancel if the current thread was also interrupted.
+      pool.shutdownNow();
+      // Preserve the interrupt status.
+      Thread.currentThread().interrupt();
+    }
+
+    workflowTasksByExecutionId.remove(this.processInstanceId);
+  }
+
+  public String getProcessInstanceId() {
+    return processInstanceId;
+  }
+
+  public ExecutorService getPool() {
+    return pool;
+  }
+
+  public List<Future<?>> getFutures() {
+    return futures;
+  }
+
+  public void setFutures(List<Future<?>> futures) {
+    this.futures = futures;
+  }
+
+  public static void printWorkflowTaskResources() {
+    for (Map.Entry<String, List<WorkflowTask>> entry : workflowTasksByExecutionId.entrySet()) {
+      logger.info(
+          "{}--------------Parent processInstanceId: {}--------------", logPrefix, entry.getKey());
+
+      List<WorkflowTask> workflowTasks = entry.getValue();
+      for (WorkflowTask task : workflowTasks) {
+        task.print();
+      }
+    }
+  }
+
+  public static void printThreadInformation() {
+    Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
+    for (Thread t : threadSet) {
+      if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
+        logger.info("{}{}", logPrefix, t.toString());
+      }
+    }
+  }
+
+  private void print() {
+    logger.info("{}WorkflowTask processInstanceId: {}.", logPrefix, this.processInstanceId);
+    if (this.pool instanceof ThreadPoolExecutor) {
+      ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
+
+      logger.info("\tActive count: {}.", tpe.getActiveCount());
+      logger.info("\tTask status: {}/{}.", tpe.getCompletedTaskCount(), tpe.getTaskCount());
+      logger.info("\tPool size: {}.", tpe.getPoolSize());
+      logger.info("\tCore pool size: {}.", tpe.getCorePoolSize());
+      logger.info("\tQueue size: {}.", tpe.getQueue().size());
+    }
+  }
+}
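
For reviewers unfamiliar with how this class is driven, here is a minimal usage sketch. It is not part of this change: it assumes a caller (for example, a Camunda delegate) that already has a process-instance id from the engine, submits its own work to the pool, and later tears the pool down through the static registry. The executionId value, the sleeping Callable, the class name WorkflowTaskUsageSketch, and the thread counts are illustrative assumptions, not code from this commit.

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;

import org.oran.otf.camunda.workflow.utility.WorkflowTask;

public class WorkflowTaskUsageSketch {

  public static void main(String[] args) {
    // Hypothetical process-instance id; in a real delegate this would come from the engine,
    // e.g. DelegateExecution#getProcessInstanceId().
    String executionId = "example-process-instance-1";

    // Register a 4-thread pool under the execution id; do not interrupt running work
    // when shutdown() is later called without arguments.
    WorkflowTask workflowTask = new WorkflowTask(executionId, 4, false);

    // Submit some placeholder work and record the futures so shutdown() can cancel
    // anything still outstanding after its first 60-second wait.
    List<Future<?>> futures = workflowTask.getFutures();
    for (int i = 0; i < 8; i++) {
      final int taskNumber = i;
      Callable<Integer> work = () -> {
        Thread.sleep(250); // stand-in for a call to a test head or other remote system
        return taskNumber;
      };
      futures.add(workflowTask.getPool().submit(work));
    }

    // Optional diagnostics on what is registered and running.
    WorkflowTask.printWorkflowTaskResources();
    WorkflowTask.printThreadInformation();

    // A cancellation path elsewhere can find every pool created for the same
    // process instance through the static registry and shut it down.
    List<WorkflowTask> tasks = WorkflowTask.workflowTasksByExecutionId.get(executionId);
    if (tasks != null) {
      for (WorkflowTask task : tasks) {
        task.shutdown();
      }
    }
  }
}

Calling task.shutdown(true) instead would interrupt in-flight work via shutdownNow(), which is the path intended for aborting a test execution rather than letting it drain.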