1 /* Copyright (c) 2019 AT&T Intellectual Property. #
\r
3 # Licensed under the Apache License, Version 2.0 (the "License"); #
\r
4 # you may not use this file except in compliance with the License. #
\r
5 # You may obtain a copy of the License at #
\r
7 # http://www.apache.org/licenses/LICENSE-2.0 #
\r
9 # Unless required by applicable law or agreed to in writing, software #
\r
10 # distributed under the License is distributed on an "AS IS" BASIS, #
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
\r
12 # See the License for the specific language governing permissions and #
\r
13 # limitations under the License. #
\r
14 ##############################################################################*/
\r
17 package org.oran.otf.camunda.delegate.otf.common;
\r
19 import org.oran.otf.camunda.delegate.otf.common.runnable.SynchronousTestInstanceCallable;
\r
20 import org.oran.otf.camunda.exception.TestExecutionException;
\r
21 import org.oran.otf.camunda.workflow.WorkflowProcessor;
\r
22 import org.oran.otf.camunda.workflow.WorkflowRequest;
\r
23 import org.oran.otf.camunda.workflow.utility.WorkflowTask;
\r
24 import org.oran.otf.camunda.workflow.utility.WorkflowUtility;
\r
25 import org.oran.otf.common.model.TestExecution;
\r
26 import org.oran.otf.common.model.local.ParallelFlowInput;
\r
27 import org.oran.otf.common.repository.TestExecutionRepository;
\r
28 import org.oran.otf.common.utility.Utility;
\r
29 import java.util.ArrayList;
\r
30 import java.util.Collections;
\r
31 import java.util.List;
\r
32 import java.util.Map;
\r
33 import java.util.concurrent.ExecutorService;
\r
34 import java.util.concurrent.TimeUnit;
\r
35 import org.camunda.bpm.engine.delegate.DelegateExecution;
\r
36 import org.camunda.bpm.engine.delegate.JavaDelegate;
\r
37 import org.oran.otf.camunda.model.ExecutionConstants;
\r
38 import org.slf4j.Logger;
\r
39 import org.slf4j.LoggerFactory;
\r
40 import org.springframework.beans.factory.annotation.Autowired;
\r
41 import org.springframework.data.mongodb.core.MongoTemplate;
\r
42 import org.springframework.stereotype.Component;
\r
45 public class RunTestInstanceDelegate implements JavaDelegate {
\r
47 private final String logPrefix = Utility.getLoggerPrefix();
\r
48 private final Logger logger = LoggerFactory.getLogger(RunTestInstanceDelegate.class);
\r
49 // Used to retrieve the results from test head runnables.
\r
50 private final List<TestExecution> testExecutions =
\r
51 Collections.synchronizedList(new ArrayList<>());
\r
54 WorkflowUtility utility;
\r
56 TestExecutionRepository testExecutionRepository;
\r
58 WorkflowProcessor processor;
\r
59 private @Autowired MongoTemplate mongoOperation;
\r
62 public void execute(DelegateExecution execution) throws Exception {
\r
64 execution.getCurrentActivityId(),
\r
65 execution.getProcessInstanceId(),
\r
66 execution.getVariables());
\r
69 public void runTestInstance(
\r
70 String currentActivityId, String processInstanceId, Map<String, Object> variables)
\r
72 @SuppressWarnings("unchecked")
\r
74 // Get the current test execution object to pass as an argument to the callable, and for data
\r
75 // stored in the historicTestInstance
\r
76 TestExecution testExecution = utility.getTestExecution(variables, logPrefix);
\r
78 // Get the parallel flow input
\r
79 Map<String, ParallelFlowInput> pfloInput =
\r
80 (Map<String, ParallelFlowInput>) variables.get("pfloInput");
\r
82 if (!pfloInput.containsKey(currentActivityId)) {
\r
83 throw new TestExecutionException(
\r
85 "%sCould not find activityId %s in pfloInput.", logPrefix, currentActivityId));
\r
88 ParallelFlowInput parallelFlowInput = pfloInput.get(currentActivityId);
\r
89 List<WorkflowRequest> args = parallelFlowInput.getArgs();
\r
90 boolean interruptOnFailure = parallelFlowInput.isInterruptOnFailure();
\r
91 int maxFailures = parallelFlowInput.getMaxFailures();
\r
92 int threadPoolSize = parallelFlowInput.getThreadPoolSize();
\r
94 WorkflowTask workflowTask =
\r
95 new WorkflowTask(processInstanceId, threadPoolSize, interruptOnFailure);
\r
96 ExecutorService pool = workflowTask.getPool();
\r
98 // logger.info("{}(BEFORE) PRINTING THREAD INFORMATION", logPrefix);
\r
99 // WorkflowTask.printThreadInformation();
\r
100 // logger.info("{}(BEFORE) PRINTING WORKFLOW TASKS", logPrefix);
\r
101 // WorkflowTask.printWorkflowTaskResources();
\r
103 for (WorkflowRequest request : args) {
\r
104 request.setExecutorId(testExecution.getExecutorId());
\r
105 // If an inner workflow calls the parent workflow, there is a cyclic dependency. To prevent
\r
106 // infinite test instances from being spawned, we need to check for cycles. This is only a top
\r
107 // level check, but a more thorough check needs to be implemented after 1906.
\r
108 if (request.getTestInstanceId() == testExecution.getHistoricTestInstance().get_id()) {
\r
109 // Prevent new tasks from being submitted
\r
111 // Shutdown the thread pool, and cleanup threads.
\r
112 workflowTask.shutdown(true);
\r
116 SynchronousTestInstanceCallable synchronousTestInstanceCallable =
\r
117 new SynchronousTestInstanceCallable(
\r
118 request, testExecution, testExecutionRepository, processor, mongoOperation);
\r
119 workflowTask.getFutures().add(pool.submit(synchronousTestInstanceCallable));
\r
122 // Prevent new tasks from being submitted, and allow running tasks to finish.
\r
125 // Wait for test instances to finish execution, and check for failures.
\r
126 while (!pool.isTerminated()) {
\r
128 // Terminate tasks if the test execution failure limit is reached.
\r
130 synchronized (testExecution) {
\r
131 numFailures = getNumberOfFailures(testExecution.getTestInstanceResults());
\r
134 if (numFailures > maxFailures) {
\r
137 "[PARENT-%s] Shutting down workflow - numFailures: %s, maxFailures: %s.",
\r
138 processInstanceId, numFailures, maxFailures));
\r
139 workflowTask.shutdown();
\r
142 pool.awaitTermination(1, TimeUnit.SECONDS);
\r
143 } catch (InterruptedException e) {
\r
148 workflowTask.shutdown(false);
\r
150 // logger.info("{}(AFTER) PRINTING THREAD INFORMATION", logPrefix);
\r
151 // WorkflowTask.printThreadInformation();
\r
152 // logger.info("{}(AFTER) PRINTING WORKFLOW TASKS", logPrefix);
\r
153 // WorkflowTask.printWorkflowTaskResources();
\r
156 // Gets the total number of testExecutions that have failed.
\r
157 private int getNumberOfFailures(List<TestExecution> testExecutions) {
\r
158 int numFailures = 0;
\r
160 for (TestExecution testExecution : testExecutions) {
\r
161 if (isTestFailed(testExecution)) {
\r
166 return numFailures;
\r
169 // Checks if the testResult is marked as FAILED or FAILURE.
\r
170 private boolean isTestFailed(TestExecution testExecution) {
\r
171 String testResult = testExecution.getTestResult();
\r
174 "[PARENT-%s] Test result for inner execution: %s.",
\r
175 testExecution.getProcessInstanceId(), testExecution.getTestResult()));
\r
176 return testResult.equalsIgnoreCase(ExecutionConstants.TestResult.FAILED)
\r
177 // || testResult.equalsIgnoreCase(TestResult.FAILED)
\r
178 || testResult.equalsIgnoreCase(ExecutionConstants.TestResult.TERMINATED);
\r