--- /dev/null
+/* Copyright (c) 2019 AT&T Intellectual Property. #\r
+# #\r
+# Licensed under the Apache License, Version 2.0 (the "License"); #\r
+# you may not use this file except in compliance with the License. #\r
+# You may obtain a copy of the License at #\r
+# #\r
+# http://www.apache.org/licenses/LICENSE-2.0 #\r
+# #\r
+# Unless required by applicable law or agreed to in writing, software #\r
+# distributed under the License is distributed on an "AS IS" BASIS, #\r
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #\r
+# See the License for the specific language governing permissions and #\r
+# limitations under the License. #\r
+##############################################################################*/\r
+\r
+\r
+package org.oran.otf.camunda.service;\r
+\r
+import org.oran.otf.camunda.configuration.OtfCamundaConfiguration;\r
+import org.oran.otf.camunda.delegate.otf.common.CallTestHeadDelegate;\r
+import org.oran.otf.camunda.delegate.otf.common.RunTestInstanceDelegate;\r
+import com.google.common.util.concurrent.ThreadFactoryBuilder;\r
+import java.util.ArrayList;\r
+import java.util.List;\r
+import java.util.concurrent.ThreadFactory;\r
+import java.util.concurrent.ThreadLocalRandom;\r
+import org.camunda.bpm.BpmPlatform;\r
+import org.camunda.bpm.engine.ExternalTaskService;\r
+import org.camunda.bpm.engine.externaltask.LockedExternalTask;\r
+import org.camunda.bpm.engine.variable.VariableMap;\r
+import org.slf4j.Logger;\r
+import org.slf4j.LoggerFactory;\r
+import org.springframework.beans.factory.annotation.Autowired;\r
+import org.springframework.beans.factory.annotation.Value;\r
+import org.springframework.boot.context.event.ApplicationReadyEvent;\r
+import org.springframework.context.event.EventListener;\r
+import org.springframework.stereotype.Component;\r
+\r
+@Component\r
+public class OtfExternalTaskService {\r
+\r
+ private static Logger logger = LoggerFactory.getLogger(OtfExternalTaskService.class);\r
+ public static boolean isEnabled;\r
+ private static long pollIntervalInMillis = 1000;\r
+ @Autowired CallTestHeadDelegate callTestHeadDelegate;\r
+ @Autowired RunTestInstanceDelegate runTestInstanceDelegate;\r
+ private ExternalTaskService externalTaskService;\r
+\r
+ private List<LockedExternalTask> externalTasks;\r
+\r
+ @Value("${otf.camunda.executors-active}")\r
+ private boolean executorsActive;\r
+\r
+ @EventListener(ApplicationReadyEvent.class)\r
+ public void initialize() {\r
+ this.externalTaskService =\r
+ BpmPlatform.getProcessEngineService()\r
+ .getProcessEngine(OtfCamundaConfiguration.processEngineName)\r
+ .getExternalTaskService();\r
+\r
+ pollIntervalInMillis = ThreadLocalRandom.current().nextLong(500, 5000);\r
+ // this.externalTaskService =\r
+ // BpmPlatform.getProcessEngineService()\r
+ // .getProcessEngine(OtfCamundaConfiguration.processEngineName)\r
+ // .getExternalTaskService();\r
+\r
+ logger.info(\r
+ "Initializing external task service with poll interval at {}", pollIntervalInMillis);\r
+ externalTasks = new ArrayList<>();\r
+ isEnabled = this.executorsActive;\r
+ logger.info("External Task Worker otf.camunda.executors-active set to : " + this.executorsActive);\r
+ Thread t =\r
+ new Thread(\r
+ () -> {\r
+ while (true) {\r
+ try {\r
+ if (isEnabled) {\r
+ acquire();\r
+ }\r
+\r
+ Thread.sleep(pollIntervalInMillis);\r
+ } catch (Exception e) {\r
+ logger.error(e.getMessage());\r
+ }\r
+ }\r
+ });\r
+\r
+ t.start();\r
+ }\r
+\r
+ private void acquire() {\r
+ externalTasks.clear();\r
+ List<LockedExternalTask> externalTasks =\r
+ externalTaskService\r
+ .fetchAndLock(10, "etw_" + OtfCamundaConfiguration.processEngineName)\r
+ .topic("vth", 43200000)\r
+ .enableCustomObjectDeserialization()\r
+ .topic("testInstance", 43200000)\r
+ .enableCustomObjectDeserialization()\r
+ .execute();\r
+ externalTasks.forEach(this::handleExternalTask);\r
+ }\r
+\r
+ private void handleExternalTask(LockedExternalTask task) {\r
+ logger.info("[" + task.getId() + "]: Handling external task for topic: " + task.getTopicName());\r
+ String topicName = task.getTopicName();\r
+ ExternalTaskCallable callable;\r
+\r
+ // Set retries to 0 for the current task.\r
+ // externalTaskService.setRetries(task.getId(), 0);\r
+\r
+ switch (topicName) {\r
+ case "vth":\r
+ callable = new ExternalTaskCallable(task, OtfExternalTask.VTH);\r
+ break;\r
+ case "testInstance":\r
+ callable = new ExternalTaskCallable(task, OtfExternalTask.TEST_INSTANCE);\r
+ break;\r
+ default:\r
+ String err = String.format("The topic name %s has no external task handler.", topicName);\r
+ logger.error(err);\r
+ externalTaskService.handleFailure(task.getId(), task.getWorkerId(), err, 0, 0);\r
+ return;\r
+ }\r
+\r
+ try {\r
+ ThreadFactory namedThreadFactory =\r
+ new ThreadFactoryBuilder().setNameFormat("etw-" + task.getTopicName() + "-%d").build();\r
+ namedThreadFactory.newThread(callable).start();\r
+ } catch (Exception e) {\r
+ externalTaskService.handleFailure(\r
+ task.getId(), task.getWorkerId(), e.getMessage(), e.toString(), 0, 0);\r
+ }\r
+ }\r
+\r
+ public enum OtfExternalTask {\r
+ VTH,\r
+ TEST_INSTANCE\r
+ }\r
+\r
+ public class ExternalTaskCallable implements Runnable {\r
+\r
+ private final LockedExternalTask task;\r
+ private final OtfExternalTask type;\r
+\r
+ private final String activityId;\r
+ private final String processDefinitionId;\r
+ private final String processInstanceId;\r
+ private final String processBusinessKey;\r
+ private VariableMap variables;\r
+\r
+ private ExternalTaskCallable(LockedExternalTask lockedExternalTask, OtfExternalTask type) {\r
+ this.task = lockedExternalTask;\r
+ this.type = type;\r
+\r
+ this.activityId = task.getActivityId();\r
+ this.processDefinitionId = task.getProcessDefinitionId();\r
+ this.processInstanceId = task.getProcessInstanceId();\r
+ this.processBusinessKey = task.getBusinessKey();\r
+ this.variables = task.getVariables();\r
+ }\r
+\r
+ @Override\r
+ public void run() {\r
+ try {\r
+ if (type == OtfExternalTask.VTH) {\r
+ callTestHeadDelegate.callTestHead(\r
+ activityId, processDefinitionId, processInstanceId, processBusinessKey, variables);\r
+ } else if (type == OtfExternalTask.TEST_INSTANCE) {\r
+ runTestInstanceDelegate.runTestInstance(activityId, processInstanceId, variables);\r
+ } else {\r
+ logger.error(\r
+ String.format(\r
+ "Could not find the appropriate function for external task with id %s.", type));\r
+ }\r
+ } catch (Exception e) {\r
+ String err = String.format("Encountered error %s", e.getMessage());\r
+ externalTaskService.handleFailure(\r
+ task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);\r
+ return;\r
+ }\r
+\r
+ synchronized (externalTaskService) {\r
+ try {\r
+ externalTaskService.complete(task.getId(), task.getWorkerId(), variables);\r
+ } catch (Exception e) {\r
+ String err = String.format("Encountered error %s", e.getMessage());\r
+ e.printStackTrace();\r
+ externalTaskService.handleFailure(\r
+ task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);\r
+ }\r
+ }\r
+ }\r
+ }\r
+}\r