--- /dev/null
+/* Copyright (c) 2019 AT&T Intellectual Property. #\r
+# #\r
+# Licensed under the Apache License, Version 2.0 (the "License"); #\r
+# you may not use this file except in compliance with the License. #\r
+# You may obtain a copy of the License at #\r
+# #\r
+# http://www.apache.org/licenses/LICENSE-2.0 #\r
+# #\r
+# Unless required by applicable law or agreed to in writing, software #\r
+# distributed under the License is distributed on an "AS IS" BASIS, #\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #\r
+# See the License for the specific language governing permissions and #\r
+# limitations under the License. #\r
+##############################################################################*/\r
+\r
+\r
+package org.oran.otf.camunda.workflow.utility;\r
+\r
+import org.oran.otf.common.utility.Utility;\r
+import com.google.common.base.Strings;\r
+import com.google.common.util.concurrent.ThreadFactoryBuilder;\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.Set;\r
+import java.util.concurrent.ConcurrentHashMap;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.Executors;\r
+import java.util.concurrent.Future;\r
+import java.util.concurrent.ThreadFactory;\r
+import java.util.concurrent.ThreadPoolExecutor;\r
+import java.util.concurrent.TimeUnit;\r
+\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+\r
+public class WorkflowTask {\r
+\r
+ private static final Logger logger = LoggerFactory.getLogger(WorkflowTask.class);\r
+ private static final String logPrefix = Utility.getLoggerPrefix();\r
+\r
+ public static Map<String, List<WorkflowTask>> workflowTasksByExecutionId =\r
+ new ConcurrentHashMap<>();\r
+ // The processInstanceId of the Camunda process instance the thread pool is created under.\r
+ private final String processInstanceId;\r
+ // The pool service used to create the fixed thread pool.\r
+ private final ExecutorService pool;\r
+ // Used to keep track of all the tasks to be executed, which allows tasks to easily be deleted.\r
+ private List<Future<?>> futures;\r
+ // Used to determine if currently running threads should be interrupted\r
+ private boolean interruptOnFailure;\r
+\r
+ public WorkflowTask(String executionId, int threads, boolean interruptOnFailure) {\r
+ if (threads <= 0 || Strings.isNullOrEmpty(executionId)) {\r
+ this.processInstanceId = null;\r
+ this.pool = null;\r
+ return;\r
+ }\r
+\r
+ ThreadFactory namedThreadFactory =\r
+ new ThreadFactoryBuilder().setNameFormat(executionId + "-%d").build();\r
+\r
+ this.processInstanceId = executionId;\r
+ this.pool =\r
+ threads == 1\r
+ ? Executors.newSingleThreadExecutor()\r
+ : Executors.newFixedThreadPool(threads, namedThreadFactory);\r
+ this.futures = Collections.synchronizedList(new ArrayList<>());\r
+ this.interruptOnFailure = interruptOnFailure;\r
+\r
+ synchronized (WorkflowTask.workflowTasksByExecutionId) {\r
+ if (!WorkflowTask.workflowTasksByExecutionId.containsKey(this.processInstanceId)) {\r
+ List<WorkflowTask> list = new ArrayList<>();\r
+ list.add(this);\r
+ WorkflowTask.workflowTasksByExecutionId.put(\r
+ this.processInstanceId, Collections.synchronizedList(list));\r
+ } else {\r
+ WorkflowTask.workflowTasksByExecutionId.get(this.processInstanceId).add(this);\r
+ }\r
+ }\r
+ }\r
+\r
+ public void shutdown() {\r
+ this.shutdown(this.interruptOnFailure);\r
+ }\r
+\r
+ public void shutdown(boolean interruptOnFailure) {\r
+ if (interruptOnFailure) {\r
+ // Cancel currently executing tasks, and halt any waiting tasks.\r
+ pool.shutdownNow();\r
+ } else {\r
+ // Disable new tasks from being submitted, while allowing currently executing tasks to finish.\r
+ pool.shutdown();\r
+ }\r
+\r
+ try {\r
+ // Wait a while for existing tasks to terminate\r
+ if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {\r
+ for (Future<?> f : futures) {\r
+ f.cancel(interruptOnFailure);\r
+ }\r
+\r
+ // Wait a while for tasks to respond to being cancelled\r
+ if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {\r
+ System.err.println("Pool did not terminate");\r
+ }\r
+ }\r
+ } catch (InterruptedException ie) {\r
+ // (Re-)Cancel if current thread also interrupted\r
+ pool.shutdownNow();\r
+ // Preserve interrupt status\r
+ // Thread.currentThread().interrupt();\r
+ }\r
+\r
+ workflowTasksByExecutionId.remove(this.processInstanceId);\r
+ }\r
+\r
+ public String getProcessInstanceId() {\r
+ return processInstanceId;\r
+ }\r
+\r
+ public ExecutorService getPool() {\r
+ return pool;\r
+ }\r
+\r
+ public List<Future<?>> getFutures() {\r
+ return futures;\r
+ }\r
+\r
+ public void setFutures(List<Future<?>> futures) {\r
+ this.futures = futures;\r
+ }\r
+\r
+ public static void printWorkflowTaskResources() {\r
+ for (Map.Entry<String, List<WorkflowTask>> entry : workflowTasksByExecutionId.entrySet()) {\r
+ logger.info(\r
+ "{}--------------Parent processInstanceId: {}--------------", logPrefix, entry.getKey());\r
+\r
+ List<WorkflowTask> workflowTasks =\r
+ workflowTasksByExecutionId.getOrDefault(entry.getKey(), null);\r
+ for (WorkflowTask task : workflowTasks) {\r
+ task.print();\r
+ }\r
+ }\r
+ }\r
+\r
+ public static void printThreadInformation() {\r
+ Set<Thread> threadSet = Thread.getAllStackTraces().keySet();\r
+ for (Thread t : threadSet) {\r
+ if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {\r
+ logger.info("{}{}", logPrefix, t.toString());\r
+ }\r
+ }\r
+ }\r
+\r
+ private void print() {\r
+ logger.info("%sWorkflowTask processInstanceId{})", this.processInstanceId);\r
+ if (this.pool instanceof ThreadPoolExecutor) {\r
+ ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;\r
+\r
+ logger.info("\tActive count: {}.", tpe.getActiveCount());\r
+ logger.info("\tTask status: {}/{}.", tpe.getCompletedTaskCount(), tpe.getTaskCount());\r
+ logger.info("\tPool size: {}.", tpe.getPoolSize());\r
+ logger.info("\tCore pool size: {}.", tpe.getCorePoolSize());\r
+ logger.info("\tQueue size: {}.", tpe.getQueue().size());\r
+ }\r
+ }\r
+}\r