1 /* Copyright (c) 2019 AT&T Intellectual Property. #
\r
3 # Licensed under the Apache License, Version 2.0 (the "License"); #
\r
4 # you may not use this file except in compliance with the License. #
\r
5 # You may obtain a copy of the License at #
\r
7 # http://www.apache.org/licenses/LICENSE-2.0 #
\r
9 # Unless required by applicable law or agreed to in writing, software #
\r
10 # distributed under the License is distributed on an "AS IS" BASIS, #
\r
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. #
\r
12 # See the License for the specific language governing permissions and #
\r
13 # limitations under the License. #
\r
14 ##############################################################################*/
\r
17 package org.oran.otf.camunda.service;
\r
19 import static org.springframework.data.mongodb.core.query.Criteria.where;
\r
21 import org.oran.otf.camunda.configuration.OtfCamundaConfiguration;
\r
22 import org.oran.otf.camunda.model.ExecutionConstants.TestResult;
\r
23 import org.oran.otf.camunda.workflow.utility.WorkflowTask;
\r
24 import org.oran.otf.common.model.TestExecution;
\r
26 import org.oran.otf.service.impl.DeveloperServiceImpl;
\r
27 import java.util.ArrayList;
\r
28 import java.util.HashSet;
\r
29 import java.util.Iterator;
\r
30 import java.util.List;
\r
31 import java.util.Set;
\r
33 import org.camunda.bpm.BpmPlatform;
\r
34 import org.camunda.bpm.engine.OptimisticLockingException;
\r
35 import org.camunda.bpm.engine.RuntimeService;
\r
36 import org.camunda.bpm.engine.runtime.ProcessInstance;
\r
37 import org.slf4j.Logger;
\r
38 import org.slf4j.LoggerFactory;
\r
39 import org.springframework.beans.factory.annotation.Autowired;
\r
40 import org.springframework.data.mongodb.core.BulkOperations;
\r
41 import org.springframework.data.mongodb.core.BulkOperations.BulkMode;
\r
42 import org.springframework.data.mongodb.core.MongoTemplate;
\r
43 import org.springframework.data.mongodb.core.query.Query;
\r
44 import org.springframework.data.mongodb.core.query.Update;
\r
45 import org.springframework.stereotype.Component;
\r
48 public class CamundaShutdown {
\r
50 private Logger logger = LoggerFactory.getLogger(DeveloperServiceImpl.class);
\r
// Template used by prepareBatchUpdate to open bulk operations on TestExecution documents.
// NOTE(review): no injection annotation or setter for this field is visible in this view -
// confirm how it gets populated before relying on it.
53 private MongoTemplate mongoTemplate;
\r
// No-argument constructor; performs no initialization of its own.
55 public CamundaShutdown(){}
\r
57 //TODO: delete unused code
\r
/**
 * Shuts down the workflows tracked in {@code WorkflowTask.workflowTasksByExecutionId}.
 *
 * When that map is not empty, its key set is passed to {@code suspendTasks}, a bulk update is
 * prepared with {@code prepareBatchUpdate}, the workflow tasks are stopped with
 * {@code shutdownAllProcessThreads}, and the resulting ids are handed to
 * {@code queryProcessInstances}. If an {@link OptimisticLockingException} is caught, a bulk
 * update for the current id set is prepared and executed, and an info message is logged.
 *
 * NOTE(review): the opening of the try block, the closing braces and the return statement are
 * not visible in this view. No execute() call on {@code updates} is visible either - confirm
 * that the first bulk update is actually run.
 */
58 public Set<String> gracefulShutdown(){
\r
59 Set<String> processIds = new HashSet<>();
\r
62 if (!WorkflowTask.workflowTasksByExecutionId.isEmpty()) {
\r
// NOTE(review): presumably workflowTasksByExecutionId is a java.util.Map, in which case
// keySet() is a live view of the shared map rather than a copy - verify against WorkflowTask.
63 processIds = WorkflowTask.workflowTasksByExecutionId.keySet();
\r
64 if (processIds != null) {
\r
65 suspendTasks(processIds);
\r
66 //1. Update processes running as TERMINATED
\r
67 BulkOperations updates = prepareBatchUpdate(processIds);
\r
70 //3.kill poolthreads
\r
// From here on processIds is the new set returned by shutdownAllProcessThreads.
71 processIds = this.shutdownAllProcessThreads(processIds);
\r
72 //this.shutdownAllProcessThreads(processIds);
\r
74 //2.look up process instances and delete the suspended processes
\r
75 processIds = queryProcessInstances(processIds);
\r
79 }catch (OptimisticLockingException e){
\r
80 //4. Update processes running as TERMINATED
\r
// processIds holds whatever was last assigned before the exception was thrown.
81 BulkOperations threadsInterrupted = prepareBatchUpdate(processIds);
\r
82 threadsInterrupted.execute();
\r
// The caught exception itself is not included in the log message.
83 logger.info("Optimistic error was caught by graceful shutdown method");
\r
87 private void suspendTasks(Set<String> processIds){
\r
88 RuntimeService runtimeService = BpmPlatform.getProcessEngineService().getProcessEngine(
\r
89 OtfCamundaConfiguration.processEngineName).getRuntimeService();
\r
90 for(String id: processIds){
\r
91 runtimeService.suspendProcessInstanceById(id);
\r
95 private Set<String> queryProcessInstances(Set<String> processIds){
\r
96 RuntimeService runtimeService = BpmPlatform.getProcessEngineService().getProcessEngine(
\r
97 OtfCamundaConfiguration.processEngineName).getRuntimeService();
\r
98 for(String id: processIds){
\r
99 ProcessInstance instance = runtimeService.createProcessInstanceQuery().processInstanceId(id).singleResult();
\r
100 if(instance == null || instance.isEnded()){
\r
101 processIds.remove(id);
\r
104 List<String> del = new ArrayList<>(processIds);
\r
105 runtimeService.deleteProcessInstances(del, "Camunda Shutting down, proccess forcefully terminated", false, false , false);
\r
110 private Set<String> shutdownAllProcessThreads(Set<String> processIds){
\r
111 Set<String> terminatedProcesses = new HashSet<>();
\r
112 Iterator processes = processIds.iterator();
\r
113 //Iterator processes = WorkflowTask.workflowTasksByExecutionId.entrySet().iterator();
\r
114 while(processes.hasNext()){
\r
115 Object processHolder = processes.next();
\r
116 List<WorkflowTask> tasks = WorkflowTask.workflowTasksByExecutionId.get(processHolder.toString());
\r
117 //List<WorkflowTask> tasks = WorkflowTask.workflowTasksByExecutionId.get(processes.next());
\r
119 terminatedProcesses.add(processHolder.toString());
\r
120 for(WorkflowTask task: tasks){
\r
121 task.shutdown(true);
\r
126 //processIds.remove(processes.next());
\r
129 return terminatedProcesses;
\r
131 private BulkOperations prepareBatchUpdate(Set<String> processIds){
\r
132 //Set<String> processInstanceIds = this.runningProcessInstanceIds();
\r
133 Iterator<String> ids = processIds.iterator();//processInstanceIds.iterator();
\r
134 BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkMode.ORDERED, TestExecution.class);
\r
135 while(ids.hasNext()){
\r
137 //Get tasks by processInstanceId
\r
138 Update update = new Update().set("testResult", TestResult.TERMINATED).set("testResultMessage", "Camunda application had to shutdown for maintenance, Test execution was TERMINATED");
\r
139 bulkOperations.updateOne(Query.query(where("processInstanceId").is(ids.next())), update);
\r
141 return bulkOperations;
\r