added svcapi ui and camunda code
[it/otf.git] / otf-camunda / src / main / java / org / oran / otf / camunda / service / OtfExternalTaskService.java
diff --git a/otf-camunda/src/main/java/org/oran/otf/camunda/service/OtfExternalTaskService.java b/otf-camunda/src/main/java/org/oran/otf/camunda/service/OtfExternalTaskService.java
new file mode 100644 (file)
index 0000000..c23d1cb
--- /dev/null
@@ -0,0 +1,195 @@
+/*  Copyright (c) 2019 AT&T Intellectual Property.                             #\r
+#                                                                              #\r
+#   Licensed under the Apache License, Version 2.0 (the "License");            #\r
+#   you may not use this file except in compliance with the License.           #\r
+#   You may obtain a copy of the License at                                    #\r
+#                                                                              #\r
+#       http://www.apache.org/licenses/LICENSE-2.0                             #\r
+#                                                                              #\r
+#   Unless required by applicable law or agreed to in writing, software        #\r
+#   distributed under the License is distributed on an "AS IS" BASIS,          #\r
+#   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #\r
+#   See the License for the specific language governing permissions and        #\r
+#   limitations under the License.                                             #\r
+##############################################################################*/\r
+\r
+\r
+package org.oran.otf.camunda.service;\r
+\r
+import org.oran.otf.camunda.configuration.OtfCamundaConfiguration;\r
+import org.oran.otf.camunda.delegate.otf.common.CallTestHeadDelegate;\r
+import org.oran.otf.camunda.delegate.otf.common.RunTestInstanceDelegate;\r
+import com.google.common.util.concurrent.ThreadFactoryBuilder;\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import java.util.concurrent.ThreadFactory;\r
+import java.util.concurrent.ThreadLocalRandom;\r
+import org.camunda.bpm.BpmPlatform;\r
+import org.camunda.bpm.engine.ExternalTaskService;\r
+import org.camunda.bpm.engine.externaltask.LockedExternalTask;\r
+import org.camunda.bpm.engine.variable.VariableMap;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.beans.factory.annotation.Value;\r
+import org.springframework.boot.context.event.ApplicationReadyEvent;\r
+import org.springframework.context.event.EventListener;\r
+import org.springframework.stereotype.Component;\r
+\r
+@Component\r
+public class OtfExternalTaskService {\r
+\r
+  private static Logger logger = LoggerFactory.getLogger(OtfExternalTaskService.class);\r
+  public static boolean isEnabled;\r
+  private static long pollIntervalInMillis = 1000;\r
+  @Autowired CallTestHeadDelegate callTestHeadDelegate;\r
+  @Autowired RunTestInstanceDelegate runTestInstanceDelegate;\r
+  private ExternalTaskService externalTaskService;\r
+\r
+  private List<LockedExternalTask> externalTasks;\r
+\r
+  @Value("${otf.camunda.executors-active}")\r
+  private boolean executorsActive;\r
+\r
+  @EventListener(ApplicationReadyEvent.class)\r
+  public void initialize() {\r
+    this.externalTaskService =\r
+        BpmPlatform.getProcessEngineService()\r
+            .getProcessEngine(OtfCamundaConfiguration.processEngineName)\r
+            .getExternalTaskService();\r
+\r
+    pollIntervalInMillis = ThreadLocalRandom.current().nextLong(500, 5000);\r
+    //    this.externalTaskService =\r
+    //        BpmPlatform.getProcessEngineService()\r
+    //            .getProcessEngine(OtfCamundaConfiguration.processEngineName)\r
+    //            .getExternalTaskService();\r
+\r
+    logger.info(\r
+        "Initializing external task service with poll interval at {}", pollIntervalInMillis);\r
+    externalTasks = new ArrayList<>();\r
+    isEnabled = this.executorsActive;\r
+    logger.info("External Task Worker otf.camunda.executors-active set to : "  + this.executorsActive);\r
+    Thread t =\r
+        new Thread(\r
+            () -> {\r
+              while (true) {\r
+                try {\r
+                  if (isEnabled) {\r
+                    acquire();\r
+                  }\r
+\r
+                  Thread.sleep(pollIntervalInMillis);\r
+                } catch (Exception e) {\r
+                  logger.error(e.getMessage());\r
+                }\r
+              }\r
+            });\r
+\r
+    t.start();\r
+  }\r
+\r
+  private void acquire() {\r
+    externalTasks.clear();\r
+    List<LockedExternalTask> externalTasks =\r
+        externalTaskService\r
+            .fetchAndLock(10, "etw_" + OtfCamundaConfiguration.processEngineName)\r
+            .topic("vth", 43200000)\r
+            .enableCustomObjectDeserialization()\r
+            .topic("testInstance", 43200000)\r
+            .enableCustomObjectDeserialization()\r
+            .execute();\r
+    externalTasks.forEach(this::handleExternalTask);\r
+  }\r
+\r
+  private void handleExternalTask(LockedExternalTask task) {\r
+    logger.info("[" + task.getId() + "]: Handling external task for topic: " + task.getTopicName());\r
+    String topicName = task.getTopicName();\r
+    ExternalTaskCallable callable;\r
+\r
+    // Set retries to 0 for the current task.\r
+    // externalTaskService.setRetries(task.getId(), 0);\r
+\r
+    switch (topicName) {\r
+      case "vth":\r
+        callable = new ExternalTaskCallable(task, OtfExternalTask.VTH);\r
+        break;\r
+      case "testInstance":\r
+        callable = new ExternalTaskCallable(task, OtfExternalTask.TEST_INSTANCE);\r
+        break;\r
+      default:\r
+        String err = String.format("The topic name %s has no external task handler.", topicName);\r
+        logger.error(err);\r
+        externalTaskService.handleFailure(task.getId(), task.getWorkerId(), err, 0, 0);\r
+        return;\r
+    }\r
+\r
+    try {\r
+      ThreadFactory namedThreadFactory =\r
+          new ThreadFactoryBuilder().setNameFormat("etw-" + task.getTopicName() + "-%d").build();\r
+      namedThreadFactory.newThread(callable).start();\r
+    } catch (Exception e) {\r
+      externalTaskService.handleFailure(\r
+          task.getId(), task.getWorkerId(), e.getMessage(), e.toString(), 0, 0);\r
+    }\r
+  }\r
+\r
+  public enum OtfExternalTask {\r
+    VTH,\r
+    TEST_INSTANCE\r
+  }\r
+\r
+  public class ExternalTaskCallable implements Runnable {\r
+\r
+    private final LockedExternalTask task;\r
+    private final OtfExternalTask type;\r
+\r
+    private final String activityId;\r
+    private final String processDefinitionId;\r
+    private final String processInstanceId;\r
+    private final String processBusinessKey;\r
+    private VariableMap variables;\r
+\r
+    private ExternalTaskCallable(LockedExternalTask lockedExternalTask, OtfExternalTask type) {\r
+      this.task = lockedExternalTask;\r
+      this.type = type;\r
+\r
+      this.activityId = task.getActivityId();\r
+      this.processDefinitionId = task.getProcessDefinitionId();\r
+      this.processInstanceId = task.getProcessInstanceId();\r
+      this.processBusinessKey = task.getBusinessKey();\r
+      this.variables = task.getVariables();\r
+    }\r
+\r
+    @Override\r
+    public void run() {\r
+      try {\r
+        if (type == OtfExternalTask.VTH) {\r
+          callTestHeadDelegate.callTestHead(\r
+              activityId, processDefinitionId, processInstanceId, processBusinessKey, variables);\r
+        } else if (type == OtfExternalTask.TEST_INSTANCE) {\r
+          runTestInstanceDelegate.runTestInstance(activityId, processInstanceId, variables);\r
+        } else {\r
+          logger.error(\r
+              String.format(\r
+                  "Could not find the appropriate function for external task with id %s.", type));\r
+        }\r
+      } catch (Exception e) {\r
+        String err = String.format("Encountered error %s", e.getMessage());\r
+        externalTaskService.handleFailure(\r
+            task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);\r
+        return;\r
+      }\r
+\r
+      synchronized (externalTaskService) {\r
+        try {\r
+          externalTaskService.complete(task.getId(), task.getWorkerId(), variables);\r
+        } catch (Exception e) {\r
+          String err = String.format("Encountered error %s", e.getMessage());\r
+          e.printStackTrace();\r
+          externalTaskService.handleFailure(\r
+                  task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);\r
+        }\r
+      }\r
+    }\r
+  }\r
+}\r