X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fdelegate%2Fotf%2Fcommon%2FCallTestHeadDelegate.java;fp=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fdelegate%2Fotf%2Fcommon%2FCallTestHeadDelegate.java;h=45511b0344fa4e141757477f0b28fb04449788d1;hb=14f6f95c84a4a1fa8774190db4a03fd0214ec55f;hp=0000000000000000000000000000000000000000;hpb=f49bd1efeaaddd4891c1f329b18d8cfb28b3e75b;p=it%2Fotf.git diff --git a/otf-camunda/src/main/java/org/oran/otf/camunda/delegate/otf/common/CallTestHeadDelegate.java b/otf-camunda/src/main/java/org/oran/otf/camunda/delegate/otf/common/CallTestHeadDelegate.java new file mode 100644 index 0000000..45511b0 --- /dev/null +++ b/otf-camunda/src/main/java/org/oran/otf/camunda/delegate/otf/common/CallTestHeadDelegate.java @@ -0,0 +1,329 @@ +/* Copyright (c) 2019 AT&T Intellectual Property. # +# # +# Licensed under the Apache License, Version 2.0 (the "License"); # +# you may not use this file except in compliance with the License. # +# You may obtain a copy of the License at # +# # +# http://www.apache.org/licenses/LICENSE-2.0 # +# # +# Unless required by applicable law or agreed to in writing, software # +# distributed under the License is distributed on an "AS IS" BASIS, # +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# See the License for the specific language governing permissions and # +# limitations under the License. # +##############################################################################*/ + + +package org.oran.otf.camunda.delegate.otf.common; + +import org.oran.otf.camunda.delegate.otf.common.runnable.TestHeadCallable; +import org.oran.otf.camunda.exception.TestExecutionException; +import org.oran.otf.camunda.workflow.utility.WorkflowTask; +import org.oran.otf.camunda.workflow.utility.WorkflowUtility; +import org.oran.otf.common.model.*; +import org.oran.otf.common.model.local.BpmnInstance; +import org.oran.otf.common.model.local.TestHeadNode; +import org.oran.otf.common.model.local.TestHeadResult; +import org.oran.otf.common.repository.*; +import org.oran.otf.common.utility.Utility; +import org.oran.otf.common.utility.database.Generic; +import org.oran.otf.common.utility.permissions.PermissionChecker; +import org.oran.otf.common.utility.permissions.UserPermission; +import com.mongodb.client.result.UpdateResult; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; + +import org.camunda.bpm.engine.delegate.DelegateExecution; +import org.camunda.bpm.engine.delegate.JavaDelegate; +import org.oran.otf.common.model.*; +import org.oran.otf.common.repository.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Criteria; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.stereotype.Component; + +@Component +public class CallTestHeadDelegate implements JavaDelegate { + private static final Logger logger = LoggerFactory.getLogger(CallTestHeadDelegate.class); + + @Autowired + private UserRepository userRepository; + @Autowired + private GroupRepository groupRepository; + @Autowired + private WorkflowUtility utility; + @Autowired + private TestDefinitionRepository testDefinitionRepository; + @Autowired + private TestHeadRepository testHeadRepository; + @Autowired + private TestInstanceRepository testInstanceRepository; + @Autowired + private MongoTemplate mongoOperation; + + // Used to retrieve the results from test head runnables. + List testHeadResults = Collections.synchronizedList(new ArrayList<>()); + + @Override + public void execute(DelegateExecution execution) throws Exception { + callTestHead( + execution.getCurrentActivityId(), + execution.getProcessDefinitionId(), + execution.getProcessInstanceId(), + execution.getProcessBusinessKey(), + execution.getVariables()); + } + + public void callTestHead( + String currentActivityId, + String processDefinitionId, + String processInstanceId, + String processBusinessKey, + Map variables) + throws Exception { + final String logPrefix = Utility.getLoggerPrefix(); + logger.info(logPrefix + "::execute()"); + + // Get vthInput from the Camunda execution variable map. + List> activityParameters = utility.getVthInput(variables, currentActivityId, logPrefix); + + // Get the current test execution object. + TestExecution testExecution = utility.getTestExecution(variables, logPrefix); + + // Lookup the test head before making computations in the loop, and before calling the runnable. + // If the lookup is made inside the runnable, concurrent test head calls would bombard the db. + TestHead testHead = getTestHead(testExecution, currentActivityId, processDefinitionId); + + WorkflowTask workflowTask = new WorkflowTask(processInstanceId, activityParameters.size(), false); + ExecutorService pool = workflowTask.getPool(); + + // Try to cast each parameter to a Map, and create runnable tasks. + for (int i = 0; i < activityParameters.size(); i++) { + Object oTestHeadParameter = activityParameters.get(i); + Map mTestHeadParameter; + try { + mTestHeadParameter = Utility.toMap(oTestHeadParameter); + verifyOtfTestHead(mTestHeadParameter, testHead, testExecution, currentActivityId); + } catch (Exception e) { + // TODO: Make a design decision to either stop the execution, or attempt to convert the + // other parameters. + logger.error( + String.format( + "Unable to convert test head parameter at vthInput[%s][%d] to a Map.", + currentActivityId, i)); + continue; + } + + // Get all the arguments for the runnable. + Object oHeaders = mTestHeadParameter.get("headers"); // optional + Object oMethod = mTestHeadParameter.get("method"); // required + Object oPayload = mTestHeadParameter.get("payload"); // optional + Object oTimeoutInMillis = mTestHeadParameter.get("timeoutInMillis"); // optional + + // Target typed parameters. Convert all objects to their expected types. Throw exceptions for + // required parameters, or for parameters that are provided but not of the expected type. + Map headers = new HashMap<>(); + String method = ""; + Map payload = new HashMap<>(); + int timeoutInMillis = 0; + + if (oHeaders != null) { + try { + headers = (Map) Utility.toMap(oHeaders); + } catch (Exception e) { + logger.error( + String.format( + "Unable to convert test head parameter at vthInput[%s][%d][headers] to a Map.", + currentActivityId, i)); + } + } + + if (oMethod == null) { + throw new TestExecutionException( + String.format( + "vthInput[%s][%d][method] is a required parameter.", currentActivityId, i)); + } else { + try { + method = (String) oMethod; + } catch (ClassCastException cce) { + throw new TestExecutionException( + String.format( + "Unable to read vthInput[%s][%d][method] as primitive type String.", + processInstanceId, i)); + } + } + + if (oPayload != null) { + try { + payload = (Map) Utility.toMap(oPayload); + } catch (Exception e) { + logger.error( + String.format( + "Unable to convert test head parameter at vthInput[%s][%d][payload] to a Map.", + currentActivityId, i)); + } + } + + if (oTimeoutInMillis != null) { + try { + timeoutInMillis = (int) oTimeoutInMillis; + } catch (ClassCastException cce) { + throw new TestExecutionException( + String.format( + "Unable to read vthInput[%s][%d][timeoutInMillis] as primitive type int.", + currentActivityId, i)); + } + } + +// logger.info("{}(BEFORE) PRINTING THREAD INFORMATION", logPrefix); +// WorkflowTask.printThreadInformation(); +// logger.info("{}(BEFORE) PRINTING WORKFLOW TASKS", logPrefix); +// WorkflowTask.printWorkflowTaskResources(); + TestHeadCallable callable = + new TestHeadCallable( + timeoutInMillis, + method, + headers, + payload, + testHead, + currentActivityId, + testExecution, + mongoOperation); + + // Submit the test head call to the executor service. + workflowTask.getFutures().add(pool.submit(callable)); + } + + // Prevent new tasks from being submitted, and allow running tasks to finish. + pool.shutdown(); + + int numResults = 0; + while (!pool.isTerminated()) { + try { + pool.awaitTermination(1, TimeUnit.SECONDS); + } catch (InterruptedException e) { + workflowTask.shutdown(true); + throw e; + } + } + + workflowTask.shutdown(false); + +// logger.info("{}(AFTER) PRINTING THREAD INFORMATION", logPrefix); +// WorkflowTask.printThreadInformation(); +// logger.info("{}(AFTER) PRINTING WORKFLOW TASKS", logPrefix); +// WorkflowTask.printWorkflowTaskResources(); + } + + private void saveTestHeadResults(String businessKey) { + Query query = new Query(); + query.addCriteria(Criteria.where("businessKey").is(businessKey)); + Update update = new Update(); + update.set("testHeadResults", testHeadResults); + UpdateResult result = mongoOperation.updateFirst(query, update, TestExecution.class); + // Check the status of the findAndUpdate database, and appropriately handle the errors. + if (result.getMatchedCount() == 0) { + throw new TestExecutionException( + String.format( + "Unable to log the test result because a testExecution associated with businessKey, %s, was not found.", + businessKey)); + } else if (result.getModifiedCount() == 0) { + throw new TestExecutionException("Unable to persist the testExecution to the database."); + } + } + + private TestHead getTestHead( + TestExecution testExecution, String currentActivityId, String processDefinitionId) { + List bpmnInstances = testExecution.getHistoricTestDefinition().getBpmnInstances(); + BpmnInstance bpmnInstance = + bpmnInstances.stream() + .filter( + _bpmnInstance -> + _bpmnInstance.getProcessDefinitionId().equalsIgnoreCase(processDefinitionId)) + .findFirst() + .orElse(null); + + if (bpmnInstance == null) { + throw new TestExecutionException( + String.format( + "Error looking BpmnInstance with processDefinitionId %s.", processDefinitionId)); + } + + List testHeads = bpmnInstance.getTestHeads(); + TestHeadNode testHeadNode = + testHeads.stream() + .filter(testHead -> testHead.getBpmnVthTaskId().equals(currentActivityId)) + .findAny() + .orElse(null); + + if (testHeadNode == null) { + throw new TestExecutionException( + String.format( + "No test head associated with the currentActivityId %s.", currentActivityId)); + } + + TestHead testHead = Generic.findByIdGeneric(testHeadRepository, testHeadNode.getTestHeadId()); + if (testHead == null) { + throw new TestExecutionException( + String.format( + "The test head with id, %s, was not found in the database.", + testHeadNode.getTestHeadId())); + } + User testExecUser = userRepository.findById(testExecution.getExecutorId().toString()).orElse(null); + Group testheadGroup = groupRepository.findById(testHead.getGroupId().toString()).orElse(null); + if(testExecUser == null){ + throw new TestExecutionException( + String.format("Can not find user, user id: %s",testExecution.getExecutorId().toString())); + } + if(testheadGroup == null){ + throw new TestExecutionException( + String.format("Can not find test head group, group id: %s",testHead.getGroupId().toString()) + ); + } + + if( (testHead.isPublic() != null && !testHead.isPublic()) && + !PermissionChecker.hasPermissionTo(testExecUser,testheadGroup,UserPermission.Permission.EXECUTE,groupRepository)){ + throw new TestExecutionException( + String.format( + "User(%s) does not have permission to in testHead Group(%s)", + testExecUser.get_id().toString(),testheadGroup.get_id().toString() + )); + } + return testHead; + } + + private void verifyOtfTestHead(Map activityParams, TestHead testHead, TestExecution execution, String currentActivityId){ + String testHeadName = testHead.getTestHeadName().toLowerCase(); + switch(testHeadName) { + case "robot": + try { + TestInstance testInstance = Generic.findByIdGeneric(testInstanceRepository, execution.getHistoricTestInstance().get_id()); + Map internalTestDataByActivity = (Map) testInstance.getInternalTestData().get(currentActivityId); + String robotFileId = (String) internalTestDataByActivity.get("robotFileId"); + Map testData = new HashMap<>(); + Map vthInput = new HashMap<>(); + testData.put("robotFileId", robotFileId); + vthInput.put("testData", testData); + Map payload = (Map) activityParams.get("payload"); + payload.put("vthInput", vthInput); + } + catch (Exception e){ + throw new TestExecutionException( + String.format( + "Robot test head needs a robot file id: %s.", e.getMessage())); + } + break; + default: + break; + } + } +}