--- /dev/null
+/* Copyright (c) 2019 AT&T Intellectual Property. #\r
+# #\r
+# Licensed under the Apache License, Version 2.0 (the "License"); #\r
+# you may not use this file except in compliance with the License. #\r
+# You may obtain a copy of the License at #\r
+# #\r
+# http://www.apache.org/licenses/LICENSE-2.0 #\r
+# #\r
+# Unless required by applicable law or agreed to in writing, software #\r
+# distributed under the License is distributed on an "AS IS" BASIS, #\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #\r
+# See the License for the specific language governing permissions and #\r
+# limitations under the License. #\r
+##############################################################################*/\r
+\r
+\r
+package org.oran.otf.camunda.delegate.otf.common;\r
+\r
+import org.oran.otf.camunda.delegate.otf.common.runnable.SynchronousTestInstanceCallable;\r
+import org.oran.otf.camunda.exception.TestExecutionException;\r
+import org.oran.otf.camunda.workflow.WorkflowProcessor;\r
+import org.oran.otf.camunda.workflow.WorkflowRequest;\r
+import org.oran.otf.camunda.workflow.utility.WorkflowTask;\r
+import org.oran.otf.camunda.workflow.utility.WorkflowUtility;\r
+import org.oran.otf.common.model.TestExecution;\r
+import org.oran.otf.common.model.local.ParallelFlowInput;\r
+import org.oran.otf.common.repository.TestExecutionRepository;\r
+import org.oran.otf.common.utility.Utility;\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.TimeUnit;\r
+import org.camunda.bpm.engine.delegate.DelegateExecution;\r
+import org.camunda.bpm.engine.delegate.JavaDelegate;\r
+import org.oran.otf.camunda.model.ExecutionConstants;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.data.mongodb.core.MongoTemplate;\r
+import org.springframework.stereotype.Component;\r
+\r
+@Component\r
+public class RunTestInstanceDelegate implements JavaDelegate {\r
+\r
+ private final String logPrefix = Utility.getLoggerPrefix();\r
+ private final Logger logger = LoggerFactory.getLogger(RunTestInstanceDelegate.class);\r
+ // Used to retrieve the results from test head runnables.\r
+ private final List<TestExecution> testExecutions =\r
+ Collections.synchronizedList(new ArrayList<>());\r
+\r
+ private @Autowired\r
+ WorkflowUtility utility;\r
+ private @Autowired\r
+ TestExecutionRepository testExecutionRepository;\r
+ private @Autowired\r
+ WorkflowProcessor processor;\r
+ private @Autowired MongoTemplate mongoOperation;\r
+\r
+ @Override\r
+ public void execute(DelegateExecution execution) throws Exception {\r
+ runTestInstance(\r
+ execution.getCurrentActivityId(),\r
+ execution.getProcessInstanceId(),\r
+ execution.getVariables());\r
+ }\r
+\r
+ public void runTestInstance(\r
+ String currentActivityId, String processInstanceId, Map<String, Object> variables)\r
+ throws Exception {\r
+ @SuppressWarnings("unchecked")\r
+\r
+ // Get the current test execution object to pass as an argument to the callable, and for data\r
+ // stored in the historicTestInstance\r
+ TestExecution testExecution = utility.getTestExecution(variables, logPrefix);\r
+\r
+ // Get the parallel flow input\r
+ Map<String, ParallelFlowInput> pfloInput =\r
+ (Map<String, ParallelFlowInput>) variables.get("pfloInput");\r
+\r
+ if (!pfloInput.containsKey(currentActivityId)) {\r
+ throw new TestExecutionException(\r
+ String.format(\r
+ "%sCould not find activityId %s in pfloInput.", logPrefix, currentActivityId));\r
+ }\r
+\r
+ ParallelFlowInput parallelFlowInput = pfloInput.get(currentActivityId);\r
+ List<WorkflowRequest> args = parallelFlowInput.getArgs();\r
+ boolean interruptOnFailure = parallelFlowInput.isInterruptOnFailure();\r
+ int maxFailures = parallelFlowInput.getMaxFailures();\r
+ int threadPoolSize = parallelFlowInput.getThreadPoolSize();\r
+\r
+ WorkflowTask workflowTask =\r
+ new WorkflowTask(processInstanceId, threadPoolSize, interruptOnFailure);\r
+ ExecutorService pool = workflowTask.getPool();\r
+\r
+// logger.info("{}(BEFORE) PRINTING THREAD INFORMATION", logPrefix);\r
+// WorkflowTask.printThreadInformation();\r
+// logger.info("{}(BEFORE) PRINTING WORKFLOW TASKS", logPrefix);\r
+// WorkflowTask.printWorkflowTaskResources();\r
+\r
+ for (WorkflowRequest request : args) {\r
+ request.setExecutorId(testExecution.getExecutorId());\r
+ // If an inner workflow calls the parent workflow, there is a cyclic dependency. To prevent\r
+ // infinite test instances from being spawned, we need to check for cycles. This is only a top\r
+ // level check, but a more thorough check needs to be implemented after 1906.\r
+ if (request.getTestInstanceId() == testExecution.getHistoricTestInstance().get_id()) {\r
+ // Prevent new tasks from being submitted\r
+ pool.shutdown();\r
+ // Shutdown the thread pool, and cleanup threads.\r
+ workflowTask.shutdown(true);\r
+ break;\r
+ }\r
+\r
+ SynchronousTestInstanceCallable synchronousTestInstanceCallable =\r
+ new SynchronousTestInstanceCallable(\r
+ request, testExecution, testExecutionRepository, processor, mongoOperation);\r
+ workflowTask.getFutures().add(pool.submit(synchronousTestInstanceCallable));\r
+ }\r
+\r
+ // Prevent new tasks from being submitted, and allow running tasks to finish.\r
+ pool.shutdown();\r
+\r
+ // Wait for test instances to finish execution, and check for failures.\r
+ while (!pool.isTerminated()) {\r
+ try {\r
+ // Terminate tasks if the test execution failure limit is reached.\r
+ int numFailures;\r
+ synchronized (testExecution) {\r
+ numFailures = getNumberOfFailures(testExecution.getTestInstanceResults());\r
+ }\r
+\r
+ if (numFailures > maxFailures) {\r
+ logger.error(\r
+ String.format(\r
+ "[PARENT-%s] Shutting down workflow - numFailures: %s, maxFailures: %s.",\r
+ processInstanceId, numFailures, maxFailures));\r
+ workflowTask.shutdown();\r
+ }\r
+\r
+ pool.awaitTermination(1, TimeUnit.SECONDS);\r
+ } catch (InterruptedException e) {\r
+ throw e;\r
+ }\r
+ }\r
+\r
+ workflowTask.shutdown(false);\r
+\r
+// logger.info("{}(AFTER) PRINTING THREAD INFORMATION", logPrefix);\r
+// WorkflowTask.printThreadInformation();\r
+// logger.info("{}(AFTER) PRINTING WORKFLOW TASKS", logPrefix);\r
+// WorkflowTask.printWorkflowTaskResources();\r
+ }\r
+\r
+ // Gets the total number of testExecutions that have failed.\r
+ private int getNumberOfFailures(List<TestExecution> testExecutions) {\r
+ int numFailures = 0;\r
+\r
+ for (TestExecution testExecution : testExecutions) {\r
+ if (isTestFailed(testExecution)) {\r
+ numFailures++;\r
+ }\r
+ }\r
+\r
+ return numFailures;\r
+ }\r
+\r
+ // Checks if the testResult is marked as FAILED or FAILURE.\r
+ private boolean isTestFailed(TestExecution testExecution) {\r
+ String testResult = testExecution.getTestResult();\r
+ logger.debug(\r
+ String.format(\r
+ "[PARENT-%s] Test result for inner execution: %s.",\r
+ testExecution.getProcessInstanceId(), testExecution.getTestResult()));\r
+ return testResult.equalsIgnoreCase(ExecutionConstants.TestResult.FAILED)\r
+// || testResult.equalsIgnoreCase(TestResult.FAILED)\r
+ || testResult.equalsIgnoreCase(ExecutionConstants.TestResult.TERMINATED);\r
+ }\r
+}\r