X-Git-Url: https://gerrit.o-ran-sc.org/r/gitweb?p=it%2Fotf.git;a=blobdiff_plain;f=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fservice%2FOtfExternalTaskService.java;fp=otf-camunda%2Fsrc%2Fmain%2Fjava%2Forg%2Foran%2Fotf%2Fcamunda%2Fservice%2FOtfExternalTaskService.java;h=c23d1cbdcb1c26b48c52212c7e8384f49e2f4b70;hp=0000000000000000000000000000000000000000;hb=14f6f95c84a4a1fa8774190db4a03fd0214ec55f;hpb=f49bd1efeaaddd4891c1f329b18d8cfb28b3e75b diff --git a/otf-camunda/src/main/java/org/oran/otf/camunda/service/OtfExternalTaskService.java b/otf-camunda/src/main/java/org/oran/otf/camunda/service/OtfExternalTaskService.java new file mode 100644 index 0000000..c23d1cb --- /dev/null +++ b/otf-camunda/src/main/java/org/oran/otf/camunda/service/OtfExternalTaskService.java @@ -0,0 +1,195 @@ +/* Copyright (c) 2019 AT&T Intellectual Property. # +# # +# Licensed under the Apache License, Version 2.0 (the "License"); # +# you may not use this file except in compliance with the License. # +# You may obtain a copy of the License at # +# # +# http://www.apache.org/licenses/LICENSE-2.0 # +# # +# Unless required by applicable law or agreed to in writing, software # +# distributed under the License is distributed on an "AS IS" BASIS, # +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # +# See the License for the specific language governing permissions and # +# limitations under the License. # +##############################################################################*/ + + +package org.oran.otf.camunda.service; + +import org.oran.otf.camunda.configuration.OtfCamundaConfiguration; +import org.oran.otf.camunda.delegate.otf.common.CallTestHeadDelegate; +import org.oran.otf.camunda.delegate.otf.common.RunTestInstanceDelegate; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadLocalRandom; +import org.camunda.bpm.BpmPlatform; +import org.camunda.bpm.engine.ExternalTaskService; +import org.camunda.bpm.engine.externaltask.LockedExternalTask; +import org.camunda.bpm.engine.variable.VariableMap; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.boot.context.event.ApplicationReadyEvent; +import org.springframework.context.event.EventListener; +import org.springframework.stereotype.Component; + +@Component +public class OtfExternalTaskService { + + private static Logger logger = LoggerFactory.getLogger(OtfExternalTaskService.class); + public static boolean isEnabled; + private static long pollIntervalInMillis = 1000; + @Autowired CallTestHeadDelegate callTestHeadDelegate; + @Autowired RunTestInstanceDelegate runTestInstanceDelegate; + private ExternalTaskService externalTaskService; + + private List externalTasks; + + @Value("${otf.camunda.executors-active}") + private boolean executorsActive; + + @EventListener(ApplicationReadyEvent.class) + public void initialize() { + this.externalTaskService = + BpmPlatform.getProcessEngineService() + .getProcessEngine(OtfCamundaConfiguration.processEngineName) + .getExternalTaskService(); + + pollIntervalInMillis = ThreadLocalRandom.current().nextLong(500, 5000); + // this.externalTaskService = + // BpmPlatform.getProcessEngineService() + // .getProcessEngine(OtfCamundaConfiguration.processEngineName) + // .getExternalTaskService(); + + logger.info( + "Initializing external task service with poll interval at {}", pollIntervalInMillis); + externalTasks = new ArrayList<>(); + isEnabled = this.executorsActive; + logger.info("External Task Worker otf.camunda.executors-active set to : " + this.executorsActive); + Thread t = + new Thread( + () -> { + while (true) { + try { + if (isEnabled) { + acquire(); + } + + Thread.sleep(pollIntervalInMillis); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + }); + + t.start(); + } + + private void acquire() { + externalTasks.clear(); + List externalTasks = + externalTaskService + .fetchAndLock(10, "etw_" + OtfCamundaConfiguration.processEngineName) + .topic("vth", 43200000) + .enableCustomObjectDeserialization() + .topic("testInstance", 43200000) + .enableCustomObjectDeserialization() + .execute(); + externalTasks.forEach(this::handleExternalTask); + } + + private void handleExternalTask(LockedExternalTask task) { + logger.info("[" + task.getId() + "]: Handling external task for topic: " + task.getTopicName()); + String topicName = task.getTopicName(); + ExternalTaskCallable callable; + + // Set retries to 0 for the current task. + // externalTaskService.setRetries(task.getId(), 0); + + switch (topicName) { + case "vth": + callable = new ExternalTaskCallable(task, OtfExternalTask.VTH); + break; + case "testInstance": + callable = new ExternalTaskCallable(task, OtfExternalTask.TEST_INSTANCE); + break; + default: + String err = String.format("The topic name %s has no external task handler.", topicName); + logger.error(err); + externalTaskService.handleFailure(task.getId(), task.getWorkerId(), err, 0, 0); + return; + } + + try { + ThreadFactory namedThreadFactory = + new ThreadFactoryBuilder().setNameFormat("etw-" + task.getTopicName() + "-%d").build(); + namedThreadFactory.newThread(callable).start(); + } catch (Exception e) { + externalTaskService.handleFailure( + task.getId(), task.getWorkerId(), e.getMessage(), e.toString(), 0, 0); + } + } + + public enum OtfExternalTask { + VTH, + TEST_INSTANCE + } + + public class ExternalTaskCallable implements Runnable { + + private final LockedExternalTask task; + private final OtfExternalTask type; + + private final String activityId; + private final String processDefinitionId; + private final String processInstanceId; + private final String processBusinessKey; + private VariableMap variables; + + private ExternalTaskCallable(LockedExternalTask lockedExternalTask, OtfExternalTask type) { + this.task = lockedExternalTask; + this.type = type; + + this.activityId = task.getActivityId(); + this.processDefinitionId = task.getProcessDefinitionId(); + this.processInstanceId = task.getProcessInstanceId(); + this.processBusinessKey = task.getBusinessKey(); + this.variables = task.getVariables(); + } + + @Override + public void run() { + try { + if (type == OtfExternalTask.VTH) { + callTestHeadDelegate.callTestHead( + activityId, processDefinitionId, processInstanceId, processBusinessKey, variables); + } else if (type == OtfExternalTask.TEST_INSTANCE) { + runTestInstanceDelegate.runTestInstance(activityId, processInstanceId, variables); + } else { + logger.error( + String.format( + "Could not find the appropriate function for external task with id %s.", type)); + } + } catch (Exception e) { + String err = String.format("Encountered error %s", e.getMessage()); + externalTaskService.handleFailure( + task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0); + return; + } + + synchronized (externalTaskService) { + try { + externalTaskService.complete(task.getId(), task.getWorkerId(), variables); + } catch (Exception e) { + String err = String.format("Encountered error %s", e.getMessage()); + e.printStackTrace(); + externalTaskService.handleFailure( + task.getId(), task.getWorkerId(), e.getMessage(), err, 0, 0); + } + } + } + } +}