1 /* Copyright (c) 2019 AT&T Intellectual Property. #
\r
3 # Licensed under the Apache License, Version 2.0 (the "License"); #
\r
4 # you may not use this file except in compliance with the License. #
\r
5 # You may obtain a copy of the License at #
\r
7 # http://www.apache.org/licenses/LICENSE-2.0 #
\r
9 # Unless required by applicable law or agreed to in writing, software #
\r
10 # distributed under the License is distributed on an "AS IS" BASIS, #
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
\r
12 # See the License for the specific language governing permissions and #
\r
13 # limitations under the License. #
\r
14 ##############################################################################*/
\r
17 package org.oran.otf.camunda.workflow.utility;
\r
19 import org.oran.otf.common.utility.Utility;
\r
20 import com.google.common.base.Strings;
\r
21 import com.google.common.util.concurrent.ThreadFactoryBuilder;
\r
22 import java.util.ArrayList;
\r
23 import java.util.Collections;
\r
24 import java.util.List;
\r
25 import java.util.Map;
\r
26 import java.util.Set;
\r
27 import java.util.concurrent.ConcurrentHashMap;
\r
28 import java.util.concurrent.ExecutorService;
\r
29 import java.util.concurrent.Executors;
\r
30 import java.util.concurrent.Future;
\r
31 import java.util.concurrent.ThreadFactory;
\r
32 import java.util.concurrent.ThreadPoolExecutor;
\r
33 import java.util.concurrent.TimeUnit;
\r
35 import org.slf4j.Logger;
\r
36 import org.slf4j.LoggerFactory;
\r
38 public class WorkflowTask {
\r
40 private static final Logger logger = LoggerFactory.getLogger(WorkflowTask.class);
\r
41 private static final String logPrefix = Utility.getLoggerPrefix();
\r
43 public static Map<String, List<WorkflowTask>> workflowTasksByExecutionId =
\r
44 new ConcurrentHashMap<>();
\r
45 // The processInstanceId of the Camunda process instance the thread pool is created under.
\r
46 private final String processInstanceId;
\r
47 // The pool service used to create the fixed thread pool.
\r
48 private final ExecutorService pool;
\r
49 // Used to keep track of all the tasks to be executed, which allows tasks to easily be deleted.
\r
50 private List<Future<?>> futures;
\r
51 // Used to determine if currently running threads should be interrupted
\r
52 private boolean interruptOnFailure;
\r
/**
 * Creates a thread pool for the given execution and registers this instance in
 * {@link #workflowTasksByExecutionId}.
 *
 * <p>If {@code threads} is not positive, or {@code executionId} is null or empty, no pool is
 * created and nothing is registered: the instance is left with a {@code null} pool and a
 * {@code null} process instance id.
 *
 * @param executionId id used both as the registry key and as the prefix of the worker thread
 *     names ({@code executionId-N})
 * @param threads number of worker threads; a value of 1 yields a single-thread executor
 * @param interruptOnFailure default passed to {@link #shutdown(boolean)} by {@link #shutdown()}
 */
public WorkflowTask(String executionId, int threads, boolean interruptOnFailure) {
  if (threads <= 0 || Strings.isNullOrEmpty(executionId)) {
    // Invalid arguments: leave the instance inert (no pool, not registered).
    this.processInstanceId = null;
    this.pool = null;
    return;
  }

  ThreadFactory namedThreadFactory =
      new ThreadFactoryBuilder().setNameFormat(executionId + "-%d").build();

  this.processInstanceId = executionId;
  // Both pool flavours receive the named factory so a single-thread pool's worker is also
  // identifiable by execution id (previously the factory was ignored when threads == 1).
  this.pool =
      threads == 1
          ? Executors.newSingleThreadExecutor(namedThreadFactory)
          : Executors.newFixedThreadPool(threads, namedThreadFactory);
  this.futures = Collections.synchronizedList(new ArrayList<>());
  this.interruptOnFailure = interruptOnFailure;

  // The map monitor is still held so any other code that locks on the registry keeps the same
  // mutual exclusion; computeIfAbsent replaces the manual containsKey/put/get sequence.
  synchronized (WorkflowTask.workflowTasksByExecutionId) {
    WorkflowTask.workflowTasksByExecutionId
        .computeIfAbsent(
            this.processInstanceId,
            id -> Collections.synchronizedList(new ArrayList<WorkflowTask>()))
        .add(this);
  }
}
\r
84 public void shutdown() {
\r
85 this.shutdown(this.interruptOnFailure);
\r
88 public void shutdown(boolean interruptOnFailure) {
\r
89 if (interruptOnFailure) {
\r
90 // Cancel currently executing tasks, and halt any waiting tasks.
\r
93 // Disable new tasks from being submitted, while allowing currently executing tasks to finish.
\r
98 // Wait a while for existing tasks to terminate
\r
99 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
\r
100 for (Future<?> f : futures) {
\r
101 f.cancel(interruptOnFailure);
\r
104 // Wait a while for tasks to respond to being cancelled
\r
105 if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
\r
106 System.err.println("Pool did not terminate");
\r
109 } catch (InterruptedException ie) {
\r
110 // (Re-)Cancel if current thread also interrupted
\r
111 pool.shutdownNow();
\r
112 // Preserve interrupt status
\r
113 // Thread.currentThread().interrupt();
\r
116 workflowTasksByExecutionId.remove(this.processInstanceId);
\r
119 public String getProcessInstanceId() {
\r
120 return processInstanceId;
\r
123 public ExecutorService getPool() {
\r
127 public List<Future<?>> getFutures() {
\r
131 public void setFutures(List<Future<?>> futures) {
\r
132 this.futures = futures;
\r
135 public static void printWorkflowTaskResources() {
\r
136 for (Map.Entry<String, List<WorkflowTask>> entry : workflowTasksByExecutionId.entrySet()) {
\r
138 "{}--------------Parent processInstanceId: {}--------------", logPrefix, entry.getKey());
\r
140 List<WorkflowTask> workflowTasks =
\r
141 workflowTasksByExecutionId.getOrDefault(entry.getKey(), null);
\r
142 for (WorkflowTask task : workflowTasks) {
\r
148 public static void printThreadInformation() {
\r
149 Set<Thread> threadSet = Thread.getAllStackTraces().keySet();
\r
150 for (Thread t : threadSet) {
\r
151 if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) {
\r
152 logger.info("{}{}", logPrefix, t.toString());
\r
157 private void print() {
\r
158 logger.info("%sWorkflowTask processInstanceId{})", this.processInstanceId);
\r
159 if (this.pool instanceof ThreadPoolExecutor) {
\r
160 ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
\r
162 logger.info("\tActive count: {}.", tpe.getActiveCount());
\r
163 logger.info("\tTask status: {}/{}.", tpe.getCompletedTaskCount(), tpe.getTaskCount());
\r
164 logger.info("\tPool size: {}.", tpe.getPoolSize());
\r
165 logger.info("\tCore pool size: {}.", tpe.getCorePoolSize());
\r
166 logger.info("\tQueue size: {}.", tpe.getQueue().size());
\r