1 /* Copyright (c) 2019 AT&T Intellectual Property. #
\r
3 # Licensed under the Apache License, Version 2.0 (the "License"); #
\r
4 # you may not use this file except in compliance with the License. #
\r
5 # You may obtain a copy of the License at #
\r
7 # http://www.apache.org/licenses/LICENSE-2.0 #
\r
9 # Unless required by applicable law or agreed to in writing, software #
\r
10 # distributed under the License is distributed on an "AS IS" BASIS, #
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
\r
12 # See the License for the specific language governing permissions and #
\r
13 # limitations under the License. #
\r
14 ##############################################################################*/
\r
17 package org.oran.otf.camunda.service;
\r
19 import org.oran.otf.camunda.configuration.OtfCamundaConfiguration;
\r
20 import org.oran.otf.camunda.delegate.otf.common.CallTestHeadDelegate;
\r
21 import org.oran.otf.camunda.delegate.otf.common.RunTestInstanceDelegate;
\r
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;
\r
23 import java.util.ArrayList;
\r
24 import java.util.List;
\r
25 import java.util.concurrent.ThreadFactory;
\r
26 import java.util.concurrent.ThreadLocalRandom;
\r
27 import org.camunda.bpm.BpmPlatform;
\r
28 import org.camunda.bpm.engine.ExternalTaskService;
\r
29 import org.camunda.bpm.engine.externaltask.LockedExternalTask;
\r
30 import org.camunda.bpm.engine.variable.VariableMap;
\r
31 import org.slf4j.Logger;
\r
32 import org.slf4j.LoggerFactory;
\r
33 import org.springframework.beans.factory.annotation.Autowired;
\r
34 import org.springframework.beans.factory.annotation.Value;
\r
35 import org.springframework.boot.context.event.ApplicationReadyEvent;
\r
36 import org.springframework.context.event.EventListener;
\r
37 import org.springframework.stereotype.Component;
\r
40 public class OtfExternalTaskService {
\r
42 private static Logger logger = LoggerFactory.getLogger(OtfExternalTaskService.class);
\r
43 public static boolean isEnabled;
\r
44 private static long pollIntervalInMillis = 1000;
\r
45 @Autowired CallTestHeadDelegate callTestHeadDelegate;
\r
46 @Autowired RunTestInstanceDelegate runTestInstanceDelegate;
\r
47 private ExternalTaskService externalTaskService;
\r
49 private List<LockedExternalTask> externalTasks;
\r
51 @Value("${otf.camunda.executors-active}")
\r
52 private boolean executorsActive;
\r
54 @EventListener(ApplicationReadyEvent.class)
\r
55 public void initialize() {
\r
56 this.externalTaskService =
\r
57 BpmPlatform.getProcessEngineService()
\r
58 .getProcessEngine(OtfCamundaConfiguration.processEngineName)
\r
59 .getExternalTaskService();
\r
61 pollIntervalInMillis = ThreadLocalRandom.current().nextLong(500, 5000);
\r
62 // this.externalTaskService =
\r
63 // BpmPlatform.getProcessEngineService()
\r
64 // .getProcessEngine(OtfCamundaConfiguration.processEngineName)
\r
65 // .getExternalTaskService();
\r
68 "Initializing external task service with poll interval at {}", pollIntervalInMillis);
\r
69 externalTasks = new ArrayList<>();
\r
70 isEnabled = this.executorsActive;
\r
71 logger.info("External Task Worker otf.camunda.executors-active set to : " + this.executorsActive);
\r
81 Thread.sleep(pollIntervalInMillis);
\r
82 } catch (Exception e) {
\r
83 logger.error(e.getMessage());
\r
91 private void acquire() {
\r
92 externalTasks.clear();
\r
93 List<LockedExternalTask> externalTasks =
\r
95 .fetchAndLock(10, "etw_" + OtfCamundaConfiguration.processEngineName)
\r
96 .topic("vth", 43200000)
\r
97 .enableCustomObjectDeserialization()
\r
98 .topic("testInstance", 43200000)
\r
99 .enableCustomObjectDeserialization()
\r
101 externalTasks.forEach(this::handleExternalTask);
\r
104 private void handleExternalTask(LockedExternalTask task) {
\r
105 logger.info("[" + task.getId() + "]: Handling external task for topic: " + task.getTopicName());
\r
106 String topicName = task.getTopicName();
\r
107 ExternalTaskCallable callable;
\r
109 // Set retries to 0 for the current task.
\r
110 // externalTaskService.setRetries(task.getId(), 0);
\r
112 switch (topicName) {
\r
114 callable = new ExternalTaskCallable(task, OtfExternalTask.VTH);
\r
116 case "testInstance":
\r
117 callable = new ExternalTaskCallable(task, OtfExternalTask.TEST_INSTANCE);
\r
120 String err = String.format("The topic name %s has no external task handler.", topicName);
\r
122 externalTaskService.handleFailure(task.getId(), task.getWorkerId(), err, 0, 0);
\r
127 ThreadFactory namedThreadFactory =
\r
128 new ThreadFactoryBuilder().setNameFormat("etw-" + task.getTopicName() + "-%d").build();
\r
129 namedThreadFactory.newThread(callable).start();
\r
130 } catch (Exception e) {
\r
131 externalTaskService.handleFailure(
\r
132 task.getId(), task.getWorkerId(), e.getMessage(), e.toString(), 0, 0);
\r
136 public enum OtfExternalTask {
\r
141 public class ExternalTaskCallable implements Runnable {
\r
143 private final LockedExternalTask task;
\r
144 private final OtfExternalTask type;
\r
146 private final String activityId;
\r
147 private final String processDefinitionId;
\r
148 private final String processInstanceId;
\r
149 private final String processBusinessKey;
\r
150 private VariableMap variables;
\r
152 private ExternalTaskCallable(LockedExternalTask lockedExternalTask, OtfExternalTask type) {
\r
153 this.task = lockedExternalTask;
\r
156 this.activityId = task.getActivityId();
\r
157 this.processDefinitionId = task.getProcessDefinitionId();
\r
158 this.processInstanceId = task.getProcessInstanceId();
\r
159 this.processBusinessKey = task.getBusinessKey();
\r
160 this.variables = task.getVariables();
\r
164 public void run() {
\r
166 if (type == OtfExternalTask.VTH) {
\r
167 callTestHeadDelegate.callTestHead(
\r
168 activityId, processDefinitionId, processInstanceId, processBusinessKey, variables);
\r
169 } else if (type == OtfExternalTask.TEST_INSTANCE) {
\r
170 runTestInstanceDelegate.runTestInstance(activityId, processInstanceId, variables);
\r
174 "Could not find the appropriate function for external task with id %s.", type));
\r
176 } catch (Exception e) {
\r
177 String err = String.format("Encountered error %s", e.getMessage());
\r
178 externalTaskService.handleFailure(
\r
179 task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);
\r
183 synchronized (externalTaskService) {
\r
185 externalTaskService.complete(task.getId(), task.getWorkerId(), variables);
\r
186 } catch (Exception e) {
\r
187 String err = String.format("Encountered error %s", e.getMessage());
\r
188 e.printStackTrace();
\r
189 externalTaskService.handleFailure(
\r
190 task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);
\r