added svcapi ui and camunda code
[it/otf.git] / otf-camunda / src / main / java / org / oran / otf / camunda / delegate / otf / common / RunTestInstanceDelegate.java
diff --git a/otf-camunda/src/main/java/org/oran/otf/camunda/delegate/otf/common/RunTestInstanceDelegate.java b/otf-camunda/src/main/java/org/oran/otf/camunda/delegate/otf/common/RunTestInstanceDelegate.java
new file mode 100644 (file)
index 0000000..7b90e7e
--- /dev/null
@@ -0,0 +1,180 @@
+/*  Copyright (c) 2019 AT&T Intellectual Property.                             #\r
+#                                                                              #\r
+#   Licensed under the Apache License, Version 2.0 (the "License");            #\r
+#   you may not use this file except in compliance with the License.           #\r
+#   You may obtain a copy of the License at                                    #\r
+#                                                                              #\r
+#       http://www.apache.org/licenses/LICENSE-2.0                             #\r
+#                                                                              #\r
+#   Unless required by applicable law or agreed to in writing, software        #\r
+#   distributed under the License is distributed on an "AS IS" BASIS,          #\r
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #\r
+#   See the License for the specific language governing permissions and        #\r
+#   limitations under the License.                                             #\r
+##############################################################################*/\r
+\r
+\r
+package org.oran.otf.camunda.delegate.otf.common;\r
+\r
+import org.oran.otf.camunda.delegate.otf.common.runnable.SynchronousTestInstanceCallable;\r
+import org.oran.otf.camunda.exception.TestExecutionException;\r
+import org.oran.otf.camunda.workflow.WorkflowProcessor;\r
+import org.oran.otf.camunda.workflow.WorkflowRequest;\r
+import org.oran.otf.camunda.workflow.utility.WorkflowTask;\r
+import org.oran.otf.camunda.workflow.utility.WorkflowUtility;\r
+import org.oran.otf.common.model.TestExecution;\r
+import org.oran.otf.common.model.local.ParallelFlowInput;\r
+import org.oran.otf.common.repository.TestExecutionRepository;\r
+import org.oran.otf.common.utility.Utility;\r
+import java.util.ArrayList;\r
+import java.util.Collections;\r
+import java.util.List;\r
+import java.util.Map;\r
+import java.util.concurrent.ExecutorService;\r
+import java.util.concurrent.TimeUnit;\r
+import org.camunda.bpm.engine.delegate.DelegateExecution;\r
+import org.camunda.bpm.engine.delegate.JavaDelegate;\r
+import org.oran.otf.camunda.model.ExecutionConstants;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.data.mongodb.core.MongoTemplate;\r
+import org.springframework.stereotype.Component;\r
+\r
+@Component\r
+public class RunTestInstanceDelegate implements JavaDelegate {\r
+\r
+  private final String logPrefix = Utility.getLoggerPrefix();\r
+  private final Logger logger = LoggerFactory.getLogger(RunTestInstanceDelegate.class);\r
+  // Used to retrieve the results from test head runnables.\r
+  private final List<TestExecution> testExecutions =\r
+      Collections.synchronizedList(new ArrayList<>());\r
+\r
+  private @Autowired\r
+  WorkflowUtility utility;\r
+  private @Autowired\r
+  TestExecutionRepository testExecutionRepository;\r
+  private @Autowired\r
+  WorkflowProcessor processor;\r
+  private @Autowired MongoTemplate mongoOperation;\r
+\r
+  @Override\r
+  public void execute(DelegateExecution execution) throws Exception {\r
+    runTestInstance(\r
+        execution.getCurrentActivityId(),\r
+        execution.getProcessInstanceId(),\r
+        execution.getVariables());\r
+  }\r
+\r
+  public void runTestInstance(\r
+      String currentActivityId, String processInstanceId, Map<String, Object> variables)\r
+      throws Exception {\r
+    @SuppressWarnings("unchecked")\r
+\r
+    // Get the current test execution object to pass as an argument to the callable, and for data\r
+    // stored in the historicTestInstance\r
+    TestExecution testExecution = utility.getTestExecution(variables, logPrefix);\r
+\r
+    // Get the parallel flow input\r
+    Map<String, ParallelFlowInput> pfloInput =\r
+        (Map<String, ParallelFlowInput>) variables.get("pfloInput");\r
+\r
+    if (!pfloInput.containsKey(currentActivityId)) {\r
+      throw new TestExecutionException(\r
+          String.format(\r
+              "%sCould not find activityId %s in pfloInput.", logPrefix, currentActivityId));\r
+    }\r
+\r
+    ParallelFlowInput parallelFlowInput = pfloInput.get(currentActivityId);\r
+    List<WorkflowRequest> args = parallelFlowInput.getArgs();\r
+    boolean interruptOnFailure = parallelFlowInput.isInterruptOnFailure();\r
+    int maxFailures = parallelFlowInput.getMaxFailures();\r
+    int threadPoolSize = parallelFlowInput.getThreadPoolSize();\r
+\r
+    WorkflowTask workflowTask =\r
+        new WorkflowTask(processInstanceId, threadPoolSize, interruptOnFailure);\r
+    ExecutorService pool = workflowTask.getPool();\r
+\r
+//    logger.info("{}(BEFORE) PRINTING THREAD INFORMATION", logPrefix);\r
+//    WorkflowTask.printThreadInformation();\r
+//    logger.info("{}(BEFORE) PRINTING WORKFLOW TASKS", logPrefix);\r
+//    WorkflowTask.printWorkflowTaskResources();\r
+\r
+    for (WorkflowRequest request : args) {\r
+      request.setExecutorId(testExecution.getExecutorId());\r
+      // If an inner workflow calls the parent workflow, there is a cyclic dependency. To prevent\r
+      // infinite test instances from being spawned, we need to check for cycles. This is only a top\r
+      // level check, but a more thorough check needs to be implemented after 1906.\r
+      if (request.getTestInstanceId() == testExecution.getHistoricTestInstance().get_id()) {\r
+        // Prevent new tasks from being submitted\r
+        pool.shutdown();\r
+        // Shutdown the thread pool, and cleanup threads.\r
+        workflowTask.shutdown(true);\r
+        break;\r
+      }\r
+\r
+      SynchronousTestInstanceCallable synchronousTestInstanceCallable =\r
+          new SynchronousTestInstanceCallable(\r
+              request, testExecution, testExecutionRepository, processor, mongoOperation);\r
+      workflowTask.getFutures().add(pool.submit(synchronousTestInstanceCallable));\r
+    }\r
+\r
+    // Prevent new tasks from being submitted, and allow running tasks to finish.\r
+    pool.shutdown();\r
+\r
+    // Wait for test instances to finish execution, and check for failures.\r
+    while (!pool.isTerminated()) {\r
+      try {\r
+        // Terminate tasks if the test execution failure limit is reached.\r
+        int numFailures;\r
+        synchronized (testExecution) {\r
+          numFailures = getNumberOfFailures(testExecution.getTestInstanceResults());\r
+        }\r
+\r
+        if (numFailures > maxFailures) {\r
+          logger.error(\r
+              String.format(\r
+                  "[PARENT-%s] Shutting down workflow - numFailures: %s, maxFailures: %s.",\r
+                  processInstanceId, numFailures, maxFailures));\r
+          workflowTask.shutdown();\r
+        }\r
+\r
+        pool.awaitTermination(1, TimeUnit.SECONDS);\r
+      } catch (InterruptedException e) {\r
+        throw e;\r
+      }\r
+    }\r
+\r
+    workflowTask.shutdown(false);\r
+\r
+//    logger.info("{}(AFTER) PRINTING THREAD INFORMATION", logPrefix);\r
+//    WorkflowTask.printThreadInformation();\r
+//    logger.info("{}(AFTER) PRINTING WORKFLOW TASKS", logPrefix);\r
+//    WorkflowTask.printWorkflowTaskResources();\r
+  }\r
+\r
+  // Gets the total number of testExecutions that have failed.\r
+  private int getNumberOfFailures(List<TestExecution> testExecutions) {\r
+    int numFailures = 0;\r
+\r
+    for (TestExecution testExecution : testExecutions) {\r
+      if (isTestFailed(testExecution)) {\r
+        numFailures++;\r
+      }\r
+    }\r
+\r
+    return numFailures;\r
+  }\r
+\r
+  // Checks if the testResult is marked as FAILED or FAILURE.\r
+  private boolean isTestFailed(TestExecution testExecution) {\r
+    String testResult = testExecution.getTestResult();\r
+    logger.debug(\r
+        String.format(\r
+            "[PARENT-%s] Test result for inner execution: %s.",\r
+            testExecution.getProcessInstanceId(), testExecution.getTestResult()));\r
+    return testResult.equalsIgnoreCase(ExecutionConstants.TestResult.FAILED)\r
+//        || testResult.equalsIgnoreCase(TestResult.FAILED)\r
+        || testResult.equalsIgnoreCase(ExecutionConstants.TestResult.TERMINATED);\r
+  }\r
+}\r