X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?a=blobdiff_plain;ds=sidebyside;f=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fservice%2FCamundaShutdown.java;fp=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fservice%2FCamundaShutdown.java;h=c41452831730e3ae80dcdd2e914da99279b45749;hb=14f6f95c84a4a1fa8774190db4a03fd0214ec55f;hp=0000000000000000000000000000000000000000;hpb=f49bd1efeaaddd4891c1f329b18d8cfb28b3e75b;p=it%2Fotf.git diff --git a/otf-camunda/src/main/java/org/oran/otf/camunda/service/CamundaShutdown.java b/otf-camunda/src/main/java/org/oran/otf/camunda/service/CamundaShutdown.java new file mode 100644 index 0000000..c414528 --- /dev/null +++ b/otf-camunda/src/main/java/org/oran/otf/camunda/service/CamundaShutdown.java @@ -0,0 +1,143 @@ +/* Copyright (c) 2019 AT&T Intellectual Property. # +# # +# Licensed under the Apache License, Version 2.0 (the "License"); # +# you may not use this file except in compliance with the License. # +# You may obtain a copy of the License at # +# # +# http://www.apache.org/licenses/LICENSE-2.0 # +# # +# Unless required by applicable law or agreed to in writing, software # +# distributed under the License is distributed on an "AS IS" BASIS, # +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# See the License for the specific language governing permissions and # +# limitations under the License. # +##############################################################################*/ + + +package org.oran.otf.camunda.service; + +import static org.springframework.data.mongodb.core.query.Criteria.where; + +import org.oran.otf.camunda.configuration.OtfCamundaConfiguration; +import org.oran.otf.camunda.model.ExecutionConstants.TestResult; +import org.oran.otf.camunda.workflow.utility.WorkflowTask; +import org.oran.otf.common.model.TestExecution; + +import org.oran.otf.service.impl.DeveloperServiceImpl; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.camunda.bpm.BpmPlatform; +import org.camunda.bpm.engine.OptimisticLockingException; +import org.camunda.bpm.engine.RuntimeService; +import org.camunda.bpm.engine.runtime.ProcessInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.data.mongodb.core.BulkOperations; +import org.springframework.data.mongodb.core.BulkOperations.BulkMode; +import org.springframework.data.mongodb.core.MongoTemplate; +import org.springframework.data.mongodb.core.query.Query; +import org.springframework.data.mongodb.core.query.Update; +import org.springframework.stereotype.Component; + +@Component +public class CamundaShutdown { + + private Logger logger = LoggerFactory.getLogger(DeveloperServiceImpl.class); + + @Autowired + private MongoTemplate mongoTemplate; + + public CamundaShutdown(){} + + //TODO: delete unused code + public Set gracefulShutdown(){ + Set processIds = new HashSet<>(); + + try { + if (!WorkflowTask.workflowTasksByExecutionId.isEmpty()) { + processIds = WorkflowTask.workflowTasksByExecutionId.keySet(); + if (processIds != null) { + suspendTasks(processIds); + //1. Update processes running as TERMINATED + BulkOperations updates = prepareBatchUpdate(processIds); + updates.execute(); + + //3.kill poolthreads + processIds = this.shutdownAllProcessThreads(processIds); + //this.shutdownAllProcessThreads(processIds); + + //2.look up process instances and delete the suspeded processes + processIds = queryProcessInstances(processIds); + + } + } + }catch (OptimisticLockingException e){ + //4. Update processes running as TERMINATED + BulkOperations threadsInterrupted = prepareBatchUpdate(processIds); + threadsInterrupted.execute(); + logger.info("Optimistic error was caught by graceful shutdown method"); + } + return processIds; + } + private void suspendTasks(Set processIds){ + RuntimeService runtimeService = BpmPlatform.getProcessEngineService().getProcessEngine( + OtfCamundaConfiguration.processEngineName).getRuntimeService(); + for(String id: processIds){ + runtimeService.suspendProcessInstanceById(id); + } + } + + private Set queryProcessInstances(Set processIds){ + RuntimeService runtimeService = BpmPlatform.getProcessEngineService().getProcessEngine( + OtfCamundaConfiguration.processEngineName).getRuntimeService(); + for(String id: processIds){ + ProcessInstance instance = runtimeService.createProcessInstanceQuery().processInstanceId(id).singleResult(); + if(instance == null || instance.isEnded()){ + processIds.remove(id); + } + } + List del = new ArrayList<>(processIds); + runtimeService.deleteProcessInstances(del, "Camunda Shutting down, proccess forcefully terminated", false, false , false); + return processIds; + + } + + private Set shutdownAllProcessThreads(Set processIds){ + Set terminatedProcesses = new HashSet<>(); + Iterator processes = processIds.iterator(); + //Iterator processes = WorkflowTask.workflowTasksByExecutionId.entrySet().iterator(); + while(processes.hasNext()){ + Object processHolder = processes.next(); + List tasks = WorkflowTask.workflowTasksByExecutionId.get(processHolder.toString()); + //List tasks = WorkflowTask.workflowTasksByExecutionId.get(processes.next()); + if(tasks != null){ + terminatedProcesses.add(processHolder.toString()); + for(WorkflowTask task: tasks){ + task.shutdown(true); + } + } + + else{ + //processIds.remove(processes.next()); + } + } + return terminatedProcesses; + } + private BulkOperations prepareBatchUpdate(Set processIds){ + //Set processInstanceIds = this.runningProcessInstanceIds(); + Iterator ids = processIds.iterator();//processInstanceIds.iterator(); + BulkOperations bulkOperations = mongoTemplate.bulkOps(BulkMode.ORDERED, TestExecution.class); + while(ids.hasNext()){ + ids.hasNext(); + //Get tasks by processInstanceId + Update update = new Update().set("testResult", TestResult.TERMINATED).set("testResultMessage", "Camunda application had to shutdown for maintenance, Test execution was TERMINATED"); + bulkOperations.updateOne(Query.query(where("processInstanceId").is(ids.next())), update); + } + return bulkOperations; + } +}