added svcapi ui and camunda code
[it/otf.git] / otf-camunda / src / main / java / org / oran / otf / camunda / service / OtfExternalTaskService.java
1 /*  Copyright (c) 2019 AT&T Intellectual Property.                             #\r
2 #                                                                              #\r
3 #   Licensed under the Apache License, Version 2.0 (the "License");            #\r
4 #   you may not use this file except in compliance with the License.           #\r
5 #   You may obtain a copy of the License at                                    #\r
6 #                                                                              #\r
7 #       http://www.apache.org/licenses/LICENSE-2.0                             #\r
8 #                                                                              #\r
9 #   Unless required by applicable law or agreed to in writing, software        #\r
10 #   distributed under the License is distributed on an "AS IS" BASIS,          #\r
11 #   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.   #\r
12 #   See the License for the specific language governing permissions and        #\r
13 #   limitations under the License.                                             #\r
14 ##############################################################################*/\r
15 \r
16 \r
17 package org.oran.otf.camunda.service;\r
18 \r
19 import org.oran.otf.camunda.configuration.OtfCamundaConfiguration;\r
20 import org.oran.otf.camunda.delegate.otf.common.CallTestHeadDelegate;\r
21 import org.oran.otf.camunda.delegate.otf.common.RunTestInstanceDelegate;\r
22 import com.google.common.util.concurrent.ThreadFactoryBuilder;\r
23 import java.util.ArrayList;\r
24 import java.util.List;\r
25 import java.util.concurrent.ThreadFactory;\r
26 import java.util.concurrent.ThreadLocalRandom;\r
27 import org.camunda.bpm.BpmPlatform;\r
28 import org.camunda.bpm.engine.ExternalTaskService;\r
29 import org.camunda.bpm.engine.externaltask.LockedExternalTask;\r
30 import org.camunda.bpm.engine.variable.VariableMap;\r
31 import org.slf4j.Logger;\r
32 import org.slf4j.LoggerFactory;\r
33 import org.springframework.beans.factory.annotation.Autowired;\r
34 import org.springframework.beans.factory.annotation.Value;\r
35 import org.springframework.boot.context.event.ApplicationReadyEvent;\r
36 import org.springframework.context.event.EventListener;\r
37 import org.springframework.stereotype.Component;\r
38 \r
39 @Component\r
40 public class OtfExternalTaskService {\r
41 \r
42   private static Logger logger = LoggerFactory.getLogger(OtfExternalTaskService.class);\r
43   public static boolean isEnabled;\r
44   private static long pollIntervalInMillis = 1000;\r
45   @Autowired CallTestHeadDelegate callTestHeadDelegate;\r
46   @Autowired RunTestInstanceDelegate runTestInstanceDelegate;\r
47   private ExternalTaskService externalTaskService;\r
48 \r
49   private List<LockedExternalTask> externalTasks;\r
50 \r
51   @Value("${otf.camunda.executors-active}")\r
52   private boolean executorsActive;\r
53 \r
54   @EventListener(ApplicationReadyEvent.class)\r
55   public void initialize() {\r
56     this.externalTaskService =\r
57         BpmPlatform.getProcessEngineService()\r
58             .getProcessEngine(OtfCamundaConfiguration.processEngineName)\r
59             .getExternalTaskService();\r
60 \r
61     pollIntervalInMillis = ThreadLocalRandom.current().nextLong(500, 5000);\r
62     //    this.externalTaskService =\r
63     //        BpmPlatform.getProcessEngineService()\r
64     //            .getProcessEngine(OtfCamundaConfiguration.processEngineName)\r
65     //            .getExternalTaskService();\r
66 \r
67     logger.info(\r
68         "Initializing external task service with poll interval at {}", pollIntervalInMillis);\r
69     externalTasks = new ArrayList<>();\r
70     isEnabled = this.executorsActive;\r
71     logger.info("External Task Worker otf.camunda.executors-active set to : "  + this.executorsActive);\r
72     Thread t =\r
73         new Thread(\r
74             () -> {\r
75               while (true) {\r
76                 try {\r
77                   if (isEnabled) {\r
78                     acquire();\r
79                   }\r
80 \r
81                   Thread.sleep(pollIntervalInMillis);\r
82                 } catch (Exception e) {\r
83                   logger.error(e.getMessage());\r
84                 }\r
85               }\r
86             });\r
87 \r
88     t.start();\r
89   }\r
90 \r
91   private void acquire() {\r
92     externalTasks.clear();\r
93     List<LockedExternalTask> externalTasks =\r
94         externalTaskService\r
95             .fetchAndLock(10, "etw_" + OtfCamundaConfiguration.processEngineName)\r
96             .topic("vth", 43200000)\r
97             .enableCustomObjectDeserialization()\r
98             .topic("testInstance", 43200000)\r
99             .enableCustomObjectDeserialization()\r
100             .execute();\r
101     externalTasks.forEach(this::handleExternalTask);\r
102   }\r
103 \r
104   private void handleExternalTask(LockedExternalTask task) {\r
105     logger.info("[" + task.getId() + "]: Handling external task for topic: " + task.getTopicName());\r
106     String topicName = task.getTopicName();\r
107     ExternalTaskCallable callable;\r
108 \r
109     // Set retries to 0 for the current task.\r
110     // externalTaskService.setRetries(task.getId(), 0);\r
111 \r
112     switch (topicName) {\r
113       case "vth":\r
114         callable = new ExternalTaskCallable(task, OtfExternalTask.VTH);\r
115         break;\r
116       case "testInstance":\r
117         callable = new ExternalTaskCallable(task, OtfExternalTask.TEST_INSTANCE);\r
118         break;\r
119       default:\r
120         String err = String.format("The topic name %s has no external task handler.", topicName);\r
121         logger.error(err);\r
122         externalTaskService.handleFailure(task.getId(), task.getWorkerId(), err, 0, 0);\r
123         return;\r
124     }\r
125 \r
126     try {\r
127       ThreadFactory namedThreadFactory =\r
128           new ThreadFactoryBuilder().setNameFormat("etw-" + task.getTopicName() + "-%d").build();\r
129       namedThreadFactory.newThread(callable).start();\r
130     } catch (Exception e) {\r
131       externalTaskService.handleFailure(\r
132           task.getId(), task.getWorkerId(), e.getMessage(), e.toString(), 0, 0);\r
133     }\r
134   }\r
135 \r
136   public enum OtfExternalTask {\r
137     VTH,\r
138     TEST_INSTANCE\r
139   }\r
140 \r
141   public class ExternalTaskCallable implements Runnable {\r
142 \r
143     private final LockedExternalTask task;\r
144     private final OtfExternalTask type;\r
145 \r
146     private final String activityId;\r
147     private final String processDefinitionId;\r
148     private final String processInstanceId;\r
149     private final String processBusinessKey;\r
150     private VariableMap variables;\r
151 \r
152     private ExternalTaskCallable(LockedExternalTask lockedExternalTask, OtfExternalTask type) {\r
153       this.task = lockedExternalTask;\r
154       this.type = type;\r
155 \r
156       this.activityId = task.getActivityId();\r
157       this.processDefinitionId = task.getProcessDefinitionId();\r
158       this.processInstanceId = task.getProcessInstanceId();\r
159       this.processBusinessKey = task.getBusinessKey();\r
160       this.variables = task.getVariables();\r
161     }\r
162 \r
163     @Override\r
164     public void run() {\r
165       try {\r
166         if (type == OtfExternalTask.VTH) {\r
167           callTestHeadDelegate.callTestHead(\r
168               activityId, processDefinitionId, processInstanceId, processBusinessKey, variables);\r
169         } else if (type == OtfExternalTask.TEST_INSTANCE) {\r
170           runTestInstanceDelegate.runTestInstance(activityId, processInstanceId, variables);\r
171         } else {\r
172           logger.error(\r
173               String.format(\r
174                   "Could not find the appropriate function for external task with id %s.", type));\r
175         }\r
176       } catch (Exception e) {\r
177         String err = String.format("Encountered error %s", e.getMessage());\r
178         externalTaskService.handleFailure(\r
179             task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);\r
180         return;\r
181       }\r
182 \r
183       synchronized (externalTaskService) {\r
184         try {\r
185           externalTaskService.complete(task.getId(), task.getWorkerId(), variables);\r
186         } catch (Exception e) {\r
187           String err = String.format("Encountered error %s", e.getMessage());\r
188           e.printStackTrace();\r
189           externalTaskService.handleFailure(\r
190                   task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0);\r
191         }\r
192       }\r
193     }\r
194   }\r
195 }\r