From 92b3ba8e1e159046188dbadf9913ef1ac15ef506 Mon Sep 17 00:00:00 2001 From: Boris Klinker Date: Tue, 5 Apr 2022 14:12:46 +0200 Subject: [PATCH 1/2] Fix: Completion of multi-instance call activities when in async execution --- .../service/event/impl/FlowableJobEventBuilder.java | 9 +++++++-- .../impl/asyncexecutor/DefaultJobManager.java | 13 +++++++++++++ 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/event/impl/FlowableJobEventBuilder.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/event/impl/FlowableJobEventBuilder.java index d6295543a6c..89ede583a7c 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/event/impl/FlowableJobEventBuilder.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/event/impl/FlowableJobEventBuilder.java @@ -67,8 +67,13 @@ protected static void populateEventWithCurrentContext(FlowableEngineEventImpl ev if (persistedObject instanceof Job) { Job jobObject = (Job) persistedObject; if (jobObject.getScopeType() == null) { - event.setExecutionId(jobObject.getExecutionId()); - event.setProcessInstanceId(jobObject.getProcessInstanceId()); + if ("async-complete-call-actiivty".equals(jobObject.getJobHandlerType())) { + event.setExecutionId(jobObject.getJobHandlerConfiguration()); + event.setProcessInstanceId(jobObject.getJobHandlerConfiguration()); + } else { + event.setExecutionId(jobObject.getExecutionId()); + event.setProcessInstanceId(jobObject.getProcessInstanceId()); + } event.setProcessDefinitionId(jobObject.getProcessDefinitionId()); } else { event.setScopeType(jobObject.getScopeType()); diff --git a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java index 2e4be056ee0..8ec85acce4e 100644 --- a/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java +++ b/modules/flowable-job-service/src/main/java/org/flowable/job/service/impl/asyncexecutor/DefaultJobManager.java @@ -735,6 +735,14 @@ public JobEntity createExecutableJobFromOtherJob(AbstractRuntimeJobEntity job) { protected JobEntity createExecutableJobFromOtherJob(AbstractRuntimeJobEntity job, boolean lockJob) { JobEntity executableJob = jobServiceConfiguration.getJobEntityManager().create(); copyJobInfo(executableJob, job); + if (job instanceof DeadLetterJobEntity && "async-complete-call-actiivty".equals(job.getJobHandlerType())) { + String[] jobHandlerConfiguration = job.getJobHandlerConfiguration().split("::"); + if (jobHandlerConfiguration.length == 2) { + executableJob.setProcessInstanceId(jobHandlerConfiguration[0]); + executableJob.setExecutionId(jobHandlerConfiguration[1]); + executableJob.setJobHandlerConfiguration(job.getExecutionId()); + } + } if (lockJob) { GregorianCalendar gregorianCalendar = new GregorianCalendar(); @@ -765,6 +773,11 @@ public SuspendedJobEntity createSuspendedJobFromOtherJob(AbstractRuntimeJobEntit public DeadLetterJobEntity createDeadLetterJobFromOtherJob(AbstractRuntimeJobEntity otherJob) { DeadLetterJobEntity deadLetterJob = jobServiceConfiguration.getDeadLetterJobEntityManager().create(); copyJobInfo(deadLetterJob, otherJob); + if ("async-complete-call-actiivty".equals(otherJob.getJobHandlerType())) { + deadLetterJob.setExecutionId(otherJob.getJobHandlerConfiguration()); + deadLetterJob.setProcessInstanceId(otherJob.getJobHandlerConfiguration()); + deadLetterJob.setJobHandlerConfiguration(otherJob.getProcessInstanceId() + "::" + otherJob.getExecutionId()); + } sendMoveToDeadletterEvent(otherJob); return deadLetterJob; } From 7ab9360f734e911dd730f980b200b975c3699950 Mon Sep 17 00:00:00 2001 From: Boris Klinker Date: Thu, 8 Sep 2022 10:10:59 +0200 Subject: [PATCH 2/2] Tests: failing subflow completion w/ completeAsync true/false --- .../test/api/event/CallActivityTest.java | 81 +++++++++++++++++++ ...eteCompletionConditionException.bpmn20.xml | 45 +++++++++++ ...ityCompletionConditionException.bpmn20.xml | 45 +++++++++++ 3 files changed, 171 insertions(+) create mode 100644 modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityAsyncCompleteCompletionConditionException.bpmn20.xml create mode 100644 modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityCompletionConditionException.bpmn20.xml diff --git a/modules/flowable-engine/src/test/java/org/flowable/engine/test/api/event/CallActivityTest.java b/modules/flowable-engine/src/test/java/org/flowable/engine/test/api/event/CallActivityTest.java index 7eaa7f16495..2a244270145 100644 --- a/modules/flowable-engine/src/test/java/org/flowable/engine/test/api/event/CallActivityTest.java +++ b/modules/flowable-engine/src/test/java/org/flowable/engine/test/api/event/CallActivityTest.java @@ -35,6 +35,7 @@ import org.flowable.engine.impl.persistence.entity.ExecutionEntity; import org.flowable.engine.impl.test.PluggableFlowableTestCase; import org.flowable.engine.repository.ProcessDefinition; +import org.flowable.engine.runtime.Execution; import org.flowable.engine.runtime.ProcessInstance; import org.flowable.engine.test.Deployment; import org.flowable.job.api.Job; @@ -706,6 +707,86 @@ public void testCallActivityAsyncCompleteRealExecutor() { assertThat(runtimeService.createProcessInstanceQuery().count()).isZero(); } + @Test + @Deployment(resources = { + "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityCompletionConditionException.bpmn20.xml", + "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityAsyncComplete_subprocess.bpmn20.xml" + }) + @DisabledIfSystemProperty(named = "disableWhen", matches = "cockroachdb") + public void testCallActivityWithCompletionConditionException() { + testCallActivityWithCompletionConditionExceptionRealExecutor(); + } + + @Test + @Deployment(resources = { + "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityAsyncCompleteCompletionConditionException.bpmn20.xml", + "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityAsyncComplete_subprocess.bpmn20.xml" + }) + @DisabledIfSystemProperty(named = "disableWhen", matches = "cockroachdb") + public void testCallActivityAsyncCompleteWithCompletionConditionException() { + testCallActivityWithCompletionConditionExceptionRealExecutor(); + } + + private void testCallActivityWithCompletionConditionExceptionRealExecutor() { + ProcessInstance processInstance = runtimeService.startProcessInstanceByKey("testCompletionConditionException"); + waitForJobExecutorOnCondition(20000L, 200L, () -> numberOfDeadLetterJobsWithException() == 3); + assertThat(runtimeService.createProcessInstanceQuery().count()).isEqualTo(4); + + verifyFailingSubProcesses(processInstance); + + // fix context + Execution multiInstanceRootExecution = runtimeService.createExecutionQuery() + .rootProcessInstanceId(processInstance.getId()) + .variableExists("nrOfCompletedInstances") + .singleResult(); + runtimeService.setVariableLocal(multiInstanceRootExecution.getId(), "approved", false); + + // and retry + if ((Integer) runtimeService.getVariable(multiInstanceRootExecution.getId(), "nrOfCompletedInstances") > 0) { + // When flowable:completeAsync="true" the nrOfCompletedInstances is increased in case of failure too. + // We have to reset the nrOfCompletedInstances to 0, so that all jobs created from moveDeadLetterJobToExecutableJob are executed. + // Without this reset the process gets completed during the execution of the first job. + // The remaining activities are not executed. One test fails because of missing end time on the historic activities. + // This behavior can be observed independent of our fix. + // It was introduced by commit 65f81864904a2aa565adfa2a350a459d3bb0a893 Improve parallel multi instance leave to avoid optimistic locking exceptions + runtimeService.setVariableLocal(multiInstanceRootExecution.getId(), "nrOfCompletedInstances", 0); + } + List deadLetterJobs = managementService.createDeadLetterJobQuery().withException().list(); + deadLetterJobs.forEach(job -> managementService.moveDeadLetterJobToExecutableJob(job.getId(), 2)); + + waitForJobExecutorToProcessAllJobsAndExecutableTimerJobs(20000L, 200L); + assertThat(managementService.createDeadLetterJobQuery().withException().count()).isZero(); + } + + private long numberOfDeadLetterJobsWithException() { + return managementService.createDeadLetterJobQuery().withException().count(); + } + + private void verifyFailingSubProcesses(ProcessInstance processInstance) { + assertThat(processInstance.isSuspended()).isFalse(); + + List deadLetterJobs = managementService.createDeadLetterJobQuery().withException().list(); + List subProcessExecutions = runtimeService.createExecutionQuery() + .rootProcessInstanceId(processInstance.getId()) + .onlySubProcessExecutions() + .list(); + assertThat(deadLetterJobs.size()).isEqualTo(subProcessExecutions.size()); + + List subProcessInstanceIds = subProcessExecutions.stream().map(Execution::getProcessInstanceId).collect(Collectors.toList()); + + deadLetterJobs.forEach(job -> { + ProcessDefinition processDefinition = repositoryService.createProcessDefinitionQuery().processDefinitionId(job.getProcessDefinitionId()) + .singleResult(); + assertThat(processDefinition.getKey()).isEqualTo("subProcess"); + + assertThat(job.getProcessInstanceId()).isNotEqualTo(processInstance.getId()); + assertThat(job.getProcessInstanceId()).isIn(subProcessInstanceIds); + + Execution subProcessChildExecution = runtimeService.createExecutionQuery().executionId(job.getExecutionId()).singleResult(); + assertThat(subProcessChildExecution.getProcessInstanceId()).isIn(subProcessInstanceIds); + }); + } + @Test @Deployment(resources = { "org/flowable/engine/test/api/event/CallActivityTest.testCallActivityWithEventSubprocessParent.bpmn20.xml", diff --git a/modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityAsyncCompleteCompletionConditionException.bpmn20.xml b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityAsyncCompleteCompletionConditionException.bpmn20.xml new file mode 100644 index 00000000000..ff1b7e1967e --- /dev/null +++ b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityAsyncCompleteCompletionConditionException.bpmn20.xml @@ -0,0 +1,45 @@ + + + + + + + + + 3 + #{execution.getVariable('approved')} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityCompletionConditionException.bpmn20.xml b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityCompletionConditionException.bpmn20.xml new file mode 100644 index 00000000000..0df07ee8bcb --- /dev/null +++ b/modules/flowable-engine/src/test/resources/org/flowable/engine/test/api/event/CallActivityTest.testCallActivityCompletionConditionException.bpmn20.xml @@ -0,0 +1,45 @@ + + + + + + + + + 3 + #{execution.getVariable('approved')} + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file