1 /* Copyright (c) 2019 AT&T Intellectual Property. #
\r
3 # Licensed under the Apache License, Version 2.0 (the "License"); #
\r
4 # you may not use this file except in compliance with the License. #
\r
5 # You may obtain a copy of the License at #
\r
7 # http://www.apache.org/licenses/LICENSE-2.0 #
\r
9 # Unless required by applicable law or agreed to in writing, software #
\r
10 # distributed under the License is distributed on an "AS IS" BASIS, #
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
\r
12 # See the License for the specific language governing permissions and #
\r
13 # limitations under the License. #
\r
14 ##############################################################################*/
\r
17 package org.oran.otf.camunda.delegate.otf.common;
\r
19 import org.oran.otf.camunda.delegate.otf.common.runnable.TestHeadCallable;
\r
20 import org.oran.otf.camunda.exception.TestExecutionException;
\r
21 import org.oran.otf.camunda.workflow.utility.WorkflowTask;
\r
22 import org.oran.otf.camunda.workflow.utility.WorkflowUtility;
\r
23 import org.oran.otf.common.model.*;
\r
24 import org.oran.otf.common.model.local.BpmnInstance;
\r
25 import org.oran.otf.common.model.local.TestHeadNode;
\r
26 import org.oran.otf.common.model.local.TestHeadResult;
\r
27 import org.oran.otf.common.repository.*;
\r
28 import org.oran.otf.common.utility.Utility;
\r
29 import org.oran.otf.common.utility.database.Generic;
\r
30 import org.oran.otf.common.utility.permissions.PermissionChecker;
\r
31 import org.oran.otf.common.utility.permissions.UserPermission;
\r
32 import com.mongodb.client.result.UpdateResult;
\r
33 import java.util.ArrayList;
\r
34 import java.util.Collections;
\r
35 import java.util.HashMap;
\r
36 import java.util.List;
\r
37 import java.util.Map;
\r
38 import java.util.concurrent.ExecutorService;
\r
39 import java.util.concurrent.TimeUnit;
\r
41 import org.camunda.bpm.engine.delegate.DelegateExecution;
\r
42 import org.camunda.bpm.engine.delegate.JavaDelegate;
\r
43 import org.oran.otf.common.model.*;
\r
44 import org.oran.otf.common.repository.*;
\r
45 import org.slf4j.Logger;
\r
46 import org.slf4j.LoggerFactory;
\r
47 import org.springframework.beans.factory.annotation.Autowired;
\r
48 import org.springframework.data.mongodb.core.MongoTemplate;
\r
49 import org.springframework.data.mongodb.core.query.Criteria;
\r
50 import org.springframework.data.mongodb.core.query.Query;
\r
51 import org.springframework.data.mongodb.core.query.Update;
\r
52 import org.springframework.stereotype.Component;
\r
55 public class CallTestHeadDelegate implements JavaDelegate {
\r
56 private static final Logger logger = LoggerFactory.getLogger(CallTestHeadDelegate.class);
\r
59 private UserRepository userRepository;
\r
61 private GroupRepository groupRepository;
\r
63 private WorkflowUtility utility;
\r
65 private TestDefinitionRepository testDefinitionRepository;
\r
67 private TestHeadRepository testHeadRepository;
\r
69 private TestInstanceRepository testInstanceRepository;
\r
71 private MongoTemplate mongoOperation;
\r
73 // Used to retrieve the results from test head runnables.
\r
74 List<TestHeadResult> testHeadResults = Collections.synchronizedList(new ArrayList<>());
\r
77 public void execute(DelegateExecution execution) throws Exception {
\r
79 execution.getCurrentActivityId(),
\r
80 execution.getProcessDefinitionId(),
\r
81 execution.getProcessInstanceId(),
\r
82 execution.getProcessBusinessKey(),
\r
83 execution.getVariables());
\r
86 public void callTestHead(
\r
87 String currentActivityId,
\r
88 String processDefinitionId,
\r
89 String processInstanceId,
\r
90 String processBusinessKey,
\r
91 Map<String, Object> variables)
\r
93 final String logPrefix = Utility.getLoggerPrefix();
\r
94 logger.info(logPrefix + "::execute()");
\r
96 // Get vthInput from the Camunda execution variable map.
\r
97 List<Map<String, Object>> activityParameters = utility.getVthInput(variables, currentActivityId, logPrefix);
\r
99 // Get the current test execution object.
\r
100 TestExecution testExecution = utility.getTestExecution(variables, logPrefix);
\r
102 // Lookup the test head before making computations in the loop, and before calling the runnable.
\r
103 // If the lookup is made inside the runnable, concurrent test head calls would bombard the db.
\r
104 TestHead testHead = getTestHead(testExecution, currentActivityId, processDefinitionId);
\r
106 WorkflowTask workflowTask = new WorkflowTask(processInstanceId, activityParameters.size(), false);
\r
107 ExecutorService pool = workflowTask.getPool();
\r
109 // Try to cast each parameter to a Map, and create runnable tasks.
\r
110 for (int i = 0; i < activityParameters.size(); i++) {
\r
111 Object oTestHeadParameter = activityParameters.get(i);
\r
112 Map<?, ?> mTestHeadParameter;
\r
114 mTestHeadParameter = Utility.toMap(oTestHeadParameter);
\r
115 verifyOtfTestHead(mTestHeadParameter, testHead, testExecution, currentActivityId);
\r
116 } catch (Exception e) {
\r
117 // TODO: Make a design decision to either stop the execution, or attempt to convert the
\r
118 // other parameters.
\r
121 "Unable to convert test head parameter at vthInput[%s][%d] to a Map.",
\r
122 currentActivityId, i));
\r
126 // Get all the arguments for the runnable.
\r
127 Object oHeaders = mTestHeadParameter.get("headers"); // optional
\r
128 Object oMethod = mTestHeadParameter.get("method"); // required
\r
129 Object oPayload = mTestHeadParameter.get("payload"); // optional
\r
130 Object oTimeoutInMillis = mTestHeadParameter.get("timeoutInMillis"); // optional
\r
132 // Target typed parameters. Convert all objects to their expected types. Throw exceptions for
\r
133 // required parameters, or for parameters that are provided but not of the expected type.
\r
134 Map<String, String> headers = new HashMap<>();
\r
135 String method = "";
\r
136 Map<String, Object> payload = new HashMap<>();
\r
137 int timeoutInMillis = 0;
\r
139 if (oHeaders != null) {
\r
141 headers = (Map<String, String>) Utility.toMap(oHeaders);
\r
142 } catch (Exception e) {
\r
145 "Unable to convert test head parameter at vthInput[%s][%d][headers] to a Map.",
\r
146 currentActivityId, i));
\r
150 if (oMethod == null) {
\r
151 throw new TestExecutionException(
\r
153 "vthInput[%s][%d][method] is a required parameter.", currentActivityId, i));
\r
156 method = (String) oMethod;
\r
157 } catch (ClassCastException cce) {
\r
158 throw new TestExecutionException(
\r
160 "Unable to read vthInput[%s][%d][method] as primitive type String.",
\r
161 processInstanceId, i));
\r
165 if (oPayload != null) {
\r
167 payload = (Map<String, Object>) Utility.toMap(oPayload);
\r
168 } catch (Exception e) {
\r
171 "Unable to convert test head parameter at vthInput[%s][%d][payload] to a Map.",
\r
172 currentActivityId, i));
\r
176 if (oTimeoutInMillis != null) {
\r
178 timeoutInMillis = (int) oTimeoutInMillis;
\r
179 } catch (ClassCastException cce) {
\r
180 throw new TestExecutionException(
\r
182 "Unable to read vthInput[%s][%d][timeoutInMillis] as primitive type int.",
\r
183 currentActivityId, i));
\r
187 // logger.info("{}(BEFORE) PRINTING THREAD INFORMATION", logPrefix);
\r
188 // WorkflowTask.printThreadInformation();
\r
189 // logger.info("{}(BEFORE) PRINTING WORKFLOW TASKS", logPrefix);
\r
190 // WorkflowTask.printWorkflowTaskResources();
\r
191 TestHeadCallable callable =
\r
192 new TestHeadCallable(
\r
202 // Submit the test head call to the executor service.
\r
203 workflowTask.getFutures().add(pool.submit(callable));
\r
206 // Prevent new tasks from being submitted, and allow running tasks to finish.
\r
209 int numResults = 0;
\r
210 while (!pool.isTerminated()) {
\r
212 pool.awaitTermination(1, TimeUnit.SECONDS);
\r
213 } catch (InterruptedException e) {
\r
214 workflowTask.shutdown(true);
\r
219 workflowTask.shutdown(false);
\r
221 // logger.info("{}(AFTER) PRINTING THREAD INFORMATION", logPrefix);
\r
222 // WorkflowTask.printThreadInformation();
\r
223 // logger.info("{}(AFTER) PRINTING WORKFLOW TASKS", logPrefix);
\r
224 // WorkflowTask.printWorkflowTaskResources();
\r
227 private void saveTestHeadResults(String businessKey) {
\r
228 Query query = new Query();
\r
229 query.addCriteria(Criteria.where("businessKey").is(businessKey));
\r
230 Update update = new Update();
\r
231 update.set("testHeadResults", testHeadResults);
\r
232 UpdateResult result = mongoOperation.updateFirst(query, update, TestExecution.class);
\r
233 // Check the status of the findAndUpdate database, and appropriately handle the errors.
\r
234 if (result.getMatchedCount() == 0) {
\r
235 throw new TestExecutionException(
\r
237 "Unable to log the test result because a testExecution associated with businessKey, %s, was not found.",
\r
239 } else if (result.getModifiedCount() == 0) {
\r
240 throw new TestExecutionException("Unable to persist the testExecution to the database.");
\r
244 private TestHead getTestHead(
\r
245 TestExecution testExecution, String currentActivityId, String processDefinitionId) {
\r
246 List<BpmnInstance> bpmnInstances = testExecution.getHistoricTestDefinition().getBpmnInstances();
\r
247 BpmnInstance bpmnInstance =
\r
248 bpmnInstances.stream()
\r
251 _bpmnInstance.getProcessDefinitionId().equalsIgnoreCase(processDefinitionId))
\r
255 if (bpmnInstance == null) {
\r
256 throw new TestExecutionException(
\r
258 "Error looking BpmnInstance with processDefinitionId %s.", processDefinitionId));
\r
261 List<TestHeadNode> testHeads = bpmnInstance.getTestHeads();
\r
262 TestHeadNode testHeadNode =
\r
264 .filter(testHead -> testHead.getBpmnVthTaskId().equals(currentActivityId))
\r
268 if (testHeadNode == null) {
\r
269 throw new TestExecutionException(
\r
271 "No test head associated with the currentActivityId %s.", currentActivityId));
\r
274 TestHead testHead = Generic.findByIdGeneric(testHeadRepository, testHeadNode.getTestHeadId());
\r
275 if (testHead == null) {
\r
276 throw new TestExecutionException(
\r
278 "The test head with id, %s, was not found in the database.",
\r
279 testHeadNode.getTestHeadId()));
\r
281 User testExecUser = userRepository.findById(testExecution.getExecutorId().toString()).orElse(null);
\r
282 Group testheadGroup = groupRepository.findById(testHead.getGroupId().toString()).orElse(null);
\r
283 if(testExecUser == null){
\r
284 throw new TestExecutionException(
\r
285 String.format("Can not find user, user id: %s",testExecution.getExecutorId().toString()));
\r
287 if(testheadGroup == null){
\r
288 throw new TestExecutionException(
\r
289 String.format("Can not find test head group, group id: %s",testHead.getGroupId().toString())
\r
293 if( (testHead.isPublic() != null && !testHead.isPublic()) &&
\r
294 !PermissionChecker.hasPermissionTo(testExecUser,testheadGroup,UserPermission.Permission.EXECUTE,groupRepository)){
\r
295 throw new TestExecutionException(
\r
297 "User(%s) does not have permission to in testHead Group(%s)",
\r
298 testExecUser.get_id().toString(),testheadGroup.get_id().toString()
\r
304 private void verifyOtfTestHead(Map activityParams, TestHead testHead, TestExecution execution, String currentActivityId){
\r
305 String testHeadName = testHead.getTestHeadName().toLowerCase();
\r
306 switch(testHeadName) {
\r
309 TestInstance testInstance = Generic.findByIdGeneric(testInstanceRepository, execution.getHistoricTestInstance().get_id());
\r
310 Map<String, Object> internalTestDataByActivity = (Map<String, Object>) testInstance.getInternalTestData().get(currentActivityId);
\r
311 String robotFileId = (String) internalTestDataByActivity.get("robotFileId");
\r
312 Map<String, Object> testData = new HashMap<>();
\r
313 Map<String, Object> vthInput = new HashMap<>();
\r
314 testData.put("robotFileId", robotFileId);
\r
315 vthInput.put("testData", testData);
\r
316 Map<String, Object> payload = (Map<String, Object>) activityParams.get("payload");
\r
317 payload.put("vthInput", vthInput);
\r
319 catch (Exception e){
\r
320 throw new TestExecutionException(
\r
322 "Robot test head needs a robot file id: %s.", e.getMessage()));
\r