X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?p=it%2Fotf.git;a=blobdiff_plain;f=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fworkflow%2Futility%2FWorkflowTask.java;fp=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fworkflow%2Futility%2FWorkflowTask.java;h=e7302e6c1553f6e1a85bd4a4129482f1bbfa7efc;hp=0000000000000000000000000000000000000000;hb=14f6f95c84a4a1fa8774190db4a03fd0214ec55f;hpb=f49bd1efeaaddd4891c1f329b18d8cfb28b3e75b diff --git a/otf-camunda/src/main/java/org/oran/otf/camunda/workflow/utility/WorkflowTask.java b/otf-camunda/src/main/java/org/oran/otf/camunda/workflow/utility/WorkflowTask.java new file mode 100644 index 0000000..e7302e6 --- /dev/null +++ b/otf-camunda/src/main/java/org/oran/otf/camunda/workflow/utility/WorkflowTask.java @@ -0,0 +1,169 @@ +/* Copyright (c) 2019 AT&T Intellectual Property. # +# # +# Licensed under the Apache License, Version 2.0 (the "License"); # +# you may not use this file except in compliance with the License. # +# You may obtain a copy of the License at # +# # +# http://www.apache.org/licenses/LICENSE-2.0 # +# # +# Unless required by applicable law or agreed to in writing, software # +# distributed under the License is distributed on an "AS IS" BASIS, # +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# See the License for the specific language governing permissions and # +# limitations under the License. # +##############################################################################*/ + + +package org.oran.otf.camunda.workflow.utility; + +import org.oran.otf.common.utility.Utility; +import com.google.common.base.Strings; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class WorkflowTask { + + private static final Logger logger = LoggerFactory.getLogger(WorkflowTask.class); + private static final String logPrefix = Utility.getLoggerPrefix(); + + public static Map> workflowTasksByExecutionId = + new ConcurrentHashMap<>(); + // The processInstanceId of the Camunda process instance the thread pool is created under. + private final String processInstanceId; + // The pool service used to create the fixed thread pool. + private final ExecutorService pool; + // Used to keep track of all the tasks to be executed, which allows tasks to easily be deleted. + private List> futures; + // Used to determine if currently running threads should be interrupted + private boolean interruptOnFailure; + + public WorkflowTask(String executionId, int threads, boolean interruptOnFailure) { + if (threads <= 0 || Strings.isNullOrEmpty(executionId)) { + this.processInstanceId = null; + this.pool = null; + return; + } + + ThreadFactory namedThreadFactory = + new ThreadFactoryBuilder().setNameFormat(executionId + "-%d").build(); + + this.processInstanceId = executionId; + this.pool = + threads == 1 + ? Executors.newSingleThreadExecutor() + : Executors.newFixedThreadPool(threads, namedThreadFactory); + this.futures = Collections.synchronizedList(new ArrayList<>()); + this.interruptOnFailure = interruptOnFailure; + + synchronized (WorkflowTask.workflowTasksByExecutionId) { + if (!WorkflowTask.workflowTasksByExecutionId.containsKey(this.processInstanceId)) { + List list = new ArrayList<>(); + list.add(this); + WorkflowTask.workflowTasksByExecutionId.put( + this.processInstanceId, Collections.synchronizedList(list)); + } else { + WorkflowTask.workflowTasksByExecutionId.get(this.processInstanceId).add(this); + } + } + } + + public void shutdown() { + this.shutdown(this.interruptOnFailure); + } + + public void shutdown(boolean interruptOnFailure) { + if (interruptOnFailure) { + // Cancel currently executing tasks, and halt any waiting tasks. + pool.shutdownNow(); + } else { + // Disable new tasks from being submitted, while allowing currently executing tasks to finish. + pool.shutdown(); + } + + try { + // Wait a while for existing tasks to terminate + if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { + for (Future f : futures) { + f.cancel(interruptOnFailure); + } + + // Wait a while for tasks to respond to being cancelled + if (!pool.awaitTermination(60, TimeUnit.SECONDS)) { + System.err.println("Pool did not terminate"); + } + } + } catch (InterruptedException ie) { + // (Re-)Cancel if current thread also interrupted + pool.shutdownNow(); + // Preserve interrupt status + // Thread.currentThread().interrupt(); + } + + workflowTasksByExecutionId.remove(this.processInstanceId); + } + + public String getProcessInstanceId() { + return processInstanceId; + } + + public ExecutorService getPool() { + return pool; + } + + public List> getFutures() { + return futures; + } + + public void setFutures(List> futures) { + this.futures = futures; + } + + public static void printWorkflowTaskResources() { + for (Map.Entry> entry : workflowTasksByExecutionId.entrySet()) { + logger.info( + "{}--------------Parent processInstanceId: {}--------------", logPrefix, entry.getKey()); + + List workflowTasks = + workflowTasksByExecutionId.getOrDefault(entry.getKey(), null); + for (WorkflowTask task : workflowTasks) { + task.print(); + } + } + } + + public static void printThreadInformation() { + Set threadSet = Thread.getAllStackTraces().keySet(); + for (Thread t : threadSet) { + if (t.getThreadGroup() == Thread.currentThread().getThreadGroup()) { + logger.info("{}{}", logPrefix, t.toString()); + } + } + } + + private void print() { + logger.info("%sWorkflowTask processInstanceId{})", this.processInstanceId); + if (this.pool instanceof ThreadPoolExecutor) { + ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool; + + logger.info("\tActive count: {}.", tpe.getActiveCount()); + logger.info("\tTask status: {}/{}.", tpe.getCompletedTaskCount(), tpe.getTaskCount()); + logger.info("\tPool size: {}.", tpe.getPoolSize()); + logger.info("\tCore pool size: {}.", tpe.getCorePoolSize()); + logger.info("\tQueue size: {}.", tpe.getQueue().size()); + } + } +}