added svcapi ui and camunda code
[it/otf.git] / otf-camunda / src / main / java / org / oran / otf / camunda / delegate / otf / common / RunTestInstanceDelegate.java
1 /*  Copyright (c) 2019 AT&T Intellectual Property.                             #\r
2 #                                                                              #\r
3 #   Licensed under the Apache License, Version 2.0 (the "License");            #\r
4 #   you may not use this file except in compliance with the License.           #\r
5 #   You may obtain a copy of the License at                                    #\r
6 #                                                                              #\r
7 #       http://www.apache.org/licenses/LICENSE-2.0                             #\r
8 #                                                                              #\r
9 #   Unless required by applicable law or agreed to in writing, software        #\r
10 #   distributed under the License is distributed on an "AS IS" BASIS,          #\r
11 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #\r
12 #   See the License for the specific language governing permissions and        #\r
13 #   limitations under the License.                                             #\r
14 ##############################################################################*/\r
15 \r
16 \r
17 package org.oran.otf.camunda.delegate.otf.common;\r
18 \r
19 import org.oran.otf.camunda.delegate.otf.common.runnable.SynchronousTestInstanceCallable;\r
20 import org.oran.otf.camunda.exception.TestExecutionException;\r
21 import org.oran.otf.camunda.workflow.WorkflowProcessor;\r
22 import org.oran.otf.camunda.workflow.WorkflowRequest;\r
23 import org.oran.otf.camunda.workflow.utility.WorkflowTask;\r
24 import org.oran.otf.camunda.workflow.utility.WorkflowUtility;\r
25 import org.oran.otf.common.model.TestExecution;\r
26 import org.oran.otf.common.model.local.ParallelFlowInput;\r
27 import org.oran.otf.common.repository.TestExecutionRepository;\r
28 import org.oran.otf.common.utility.Utility;\r
29 import java.util.ArrayList;\r
30 import java.util.Collections;\r
31 import java.util.List;\r
32 import java.util.Map;\r
33 import java.util.concurrent.ExecutorService;\r
34 import java.util.concurrent.TimeUnit;\r
35 import org.camunda.bpm.engine.delegate.DelegateExecution;\r
36 import org.camunda.bpm.engine.delegate.JavaDelegate;\r
37 import org.oran.otf.camunda.model.ExecutionConstants;\r
38 import org.slf4j.Logger;\r
39 import org.slf4j.LoggerFactory;\r
40 import org.springframework.beans.factory.annotation.Autowired;\r
41 import org.springframework.data.mongodb.core.MongoTemplate;\r
42 import org.springframework.stereotype.Component;\r
43 \r
44 @Component\r
45 public class RunTestInstanceDelegate implements JavaDelegate {\r
46 \r
47   private final String logPrefix = Utility.getLoggerPrefix();\r
48   private final Logger logger = LoggerFactory.getLogger(RunTestInstanceDelegate.class);\r
49   // Used to retrieve the results from test head runnables.\r
50   private final List<TestExecution> testExecutions =\r
51       Collections.synchronizedList(new ArrayList<>());\r
52 \r
53   private @Autowired\r
54   WorkflowUtility utility;\r
55   private @Autowired\r
56   TestExecutionRepository testExecutionRepository;\r
57   private @Autowired\r
58   WorkflowProcessor processor;\r
59   private @Autowired MongoTemplate mongoOperation;\r
60 \r
61   @Override\r
62   public void execute(DelegateExecution execution) throws Exception {\r
63     runTestInstance(\r
64         execution.getCurrentActivityId(),\r
65         execution.getProcessInstanceId(),\r
66         execution.getVariables());\r
67   }\r
68 \r
69   public void runTestInstance(\r
70       String currentActivityId, String processInstanceId, Map<String, Object> variables)\r
71       throws Exception {\r
72     @SuppressWarnings("unchecked")\r
73 \r
74     // Get the current test execution object to pass as an argument to the callable, and for data\r
75     // stored in the historicTestInstance\r
76     TestExecution testExecution = utility.getTestExecution(variables, logPrefix);\r
77 \r
78     // Get the parallel flow input\r
79     Map<String, ParallelFlowInput> pfloInput =\r
80         (Map<String, ParallelFlowInput>) variables.get("pfloInput");\r
81 \r
82     if (!pfloInput.containsKey(currentActivityId)) {\r
83       throw new TestExecutionException(\r
84           String.format(\r
85               "%sCould not find activityId %s in pfloInput.", logPrefix, currentActivityId));\r
86     }\r
87 \r
88     ParallelFlowInput parallelFlowInput = pfloInput.get(currentActivityId);\r
89     List<WorkflowRequest> args = parallelFlowInput.getArgs();\r
90     boolean interruptOnFailure = parallelFlowInput.isInterruptOnFailure();\r
91     int maxFailures = parallelFlowInput.getMaxFailures();\r
92     int threadPoolSize = parallelFlowInput.getThreadPoolSize();\r
93 \r
94     WorkflowTask workflowTask =\r
95         new WorkflowTask(processInstanceId, threadPoolSize, interruptOnFailure);\r
96     ExecutorService pool = workflowTask.getPool();\r
97 \r
98 //    logger.info("{}(BEFORE) PRINTING THREAD INFORMATION", logPrefix);\r
99 //    WorkflowTask.printThreadInformation();\r
100 //    logger.info("{}(BEFORE) PRINTING WORKFLOW TASKS", logPrefix);\r
101 //    WorkflowTask.printWorkflowTaskResources();\r
102 \r
103     for (WorkflowRequest request : args) {\r
104       request.setExecutorId(testExecution.getExecutorId());\r
105       // If an inner workflow calls the parent workflow, there is a cyclic dependency. To prevent\r
106       // infinite test instances from being spawned, we need to check for cycles. This is only a top\r
107       // level check, but a more thorough check needs to be implemented after 1906.\r
108       if (request.getTestInstanceId() == testExecution.getHistoricTestInstance().get_id()) {\r
109         // Prevent new tasks from being submitted\r
110         pool.shutdown();\r
111         // Shutdown the thread pool, and cleanup threads.\r
112         workflowTask.shutdown(true);\r
113         break;\r
114       }\r
115 \r
116       SynchronousTestInstanceCallable synchronousTestInstanceCallable =\r
117           new SynchronousTestInstanceCallable(\r
118               request, testExecution, testExecutionRepository, processor, mongoOperation);\r
119       workflowTask.getFutures().add(pool.submit(synchronousTestInstanceCallable));\r
120     }\r
121 \r
122     // Prevent new tasks from being submitted, and allow running tasks to finish.\r
123     pool.shutdown();\r
124 \r
125     // Wait for test instances to finish execution, and check for failures.\r
126     while (!pool.isTerminated()) {\r
127       try {\r
128         // Terminate tasks if the test execution failure limit is reached.\r
129         int numFailures;\r
130         synchronized (testExecution) {\r
131           numFailures = getNumberOfFailures(testExecution.getTestInstanceResults());\r
132         }\r
133 \r
134         if (numFailures > maxFailures) {\r
135           logger.error(\r
136               String.format(\r
137                   "[PARENT-%s] Shutting down workflow - numFailures: %s, maxFailures: %s.",\r
138                   processInstanceId, numFailures, maxFailures));\r
139           workflowTask.shutdown();\r
140         }\r
141 \r
142         pool.awaitTermination(1, TimeUnit.SECONDS);\r
143       } catch (InterruptedException e) {\r
144         throw e;\r
145       }\r
146     }\r
147 \r
148     workflowTask.shutdown(false);\r
149 \r
150 //    logger.info("{}(AFTER) PRINTING THREAD INFORMATION", logPrefix);\r
151 //    WorkflowTask.printThreadInformation();\r
152 //    logger.info("{}(AFTER) PRINTING WORKFLOW TASKS", logPrefix);\r
153 //    WorkflowTask.printWorkflowTaskResources();\r
154   }\r
155 \r
156   // Gets the total number of testExecutions that have failed.\r
157   private int getNumberOfFailures(List<TestExecution> testExecutions) {\r
158     int numFailures = 0;\r
159 \r
160     for (TestExecution testExecution : testExecutions) {\r
161       if (isTestFailed(testExecution)) {\r
162         numFailures++;\r
163       }\r
164     }\r
165 \r
166     return numFailures;\r
167   }\r
168 \r
169   // Checks if the testResult is marked as FAILED or FAILURE.\r
170   private boolean isTestFailed(TestExecution testExecution) {\r
171     String testResult = testExecution.getTestResult();\r
172     logger.debug(\r
173         String.format(\r
174             "[PARENT-%s] Test result for inner execution: %s.",\r
175             testExecution.getProcessInstanceId(), testExecution.getTestResult()));\r
176     return testResult.equalsIgnoreCase(ExecutionConstants.TestResult.FAILED)\r
177 //        || testResult.equalsIgnoreCase(TestResult.FAILED)\r
178         || testResult.equalsIgnoreCase(ExecutionConstants.TestResult.TERMINATED);\r
179   }\r
180 }\r