Skip to content

Commit

Permalink
Fixed missing subscriptions after migration of an active embedded non…
Browse files Browse the repository at this point in the history
… interrupting event sub process (#3928) (#3929)

* Fixed missing subscriptions after migration of an active non interrupting event sub process (#3928)

* Added missing package import

* In case of a timer event, only process event sub process when no timer job is present

* Added testcases and fix when migrating an event sub process with two started sub processes

* Fixed imports
  • Loading branch information
basclaessen authored Sep 2, 2024
1 parent 600d013 commit b89cad9
Show file tree
Hide file tree
Showing 6 changed files with 1,119 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -654,11 +654,24 @@ protected void doMoveExecutionState(ProcessInstanceChangeState processInstanceCh
}

protected void processPendingEventSubProcessesStartEvents(ProcessInstanceChangeState processInstanceChangeState, CommandContext commandContext) {
ProcessInstanceHelper processInstanceHelper = CommandContextUtil.getProcessEngineConfiguration(commandContext).getProcessInstanceHelper();
ProcessEngineConfigurationImpl processEngineConfiguration = CommandContextUtil.getProcessEngineConfiguration(commandContext);
ProcessInstanceHelper processInstanceHelper = processEngineConfiguration.getProcessInstanceHelper();
EventSubscriptionService eventSubscriptionService = processEngineConfiguration.getEventSubscriptionServiceConfiguration().getEventSubscriptionService();
ManagementService managementService = processEngineConfiguration.getManagementService();

for (Map.Entry<? extends StartEvent, ExecutionEntity> pendingStartEventEntry : processInstanceChangeState.getPendingEventSubProcessesStartEvents().entrySet()) {
StartEvent startEvent = pendingStartEventEntry.getKey();
ExecutionEntity parentExecution = pendingStartEventEntry.getValue();
if (!processInstanceChangeState.getCreatedEmbeddedSubProcesses().containsKey(startEvent.getSubProcess().getId())) {
EventDefinition eventDefinition = startEvent.getEventDefinitions().isEmpty() ? null : startEvent.getEventDefinitions().get(0);

//Process event sub process when no subscriptions/timer jobs are found
boolean processEventSubProcess = false;
if (eventDefinition instanceof TimerEventDefinition) {
processEventSubProcess = managementService.createTimerJobQuery().executionId(parentExecution.getId()).list().isEmpty();
} else {
processEventSubProcess = eventSubscriptionService.findEventSubscriptionsByExecution(parentExecution.getId()).isEmpty();
}
if (processEventSubProcess) {
processInstanceHelper.processEventSubProcess(parentExecution, (EventSubProcess) startEvent.getSubProcess(), commandContext);
}
}
Expand Down Expand Up @@ -809,7 +822,10 @@ protected List<ExecutionEntity> createEmbeddedSubProcessAndExecutions(Collection

// Build the subProcess hierarchy
for (SubProcess subProcess : subProcessesToCreate.values()) {
if (!processInstanceChangeState.getCreatedEmbeddedSubProcesses().containsKey(subProcess.getId())) {
if (subProcess instanceof EventSubProcess) {
ExecutionEntity embeddedSubProcess = createEmbeddedSubProcessHierarchy(subProcess, defaultContinueParentExecution, subProcessesToCreate, movingExecutionIds, processInstanceChangeState, commandContext);
moveExecutionEntityContainer.addCreatedEventSubProcess(subProcess.getId(), embeddedSubProcess);
} else if (!processInstanceChangeState.getCreatedEmbeddedSubProcesses().containsKey(subProcess.getId())) {
ExecutionEntity embeddedSubProcess = createEmbeddedSubProcessHierarchy(subProcess, defaultContinueParentExecution, subProcessesToCreate, movingExecutionIds, processInstanceChangeState, commandContext);
processInstanceChangeState.addCreatedEmbeddedSubProcess(subProcess.getId(), embeddedSubProcess);
}
Expand All @@ -820,7 +836,9 @@ protected List<ExecutionEntity> createEmbeddedSubProcessAndExecutions(Collection
for (FlowElementMoveEntry flowElementMoveEntry : moveToFlowElements) {
FlowElement newFlowElement = flowElementMoveEntry.getNewFlowElement();
ExecutionEntity parentExecution;
if (newFlowElement.getSubProcess() != null && processInstanceChangeState.getCreatedEmbeddedSubProcesses().containsKey(newFlowElement.getSubProcess().getId())) {
if (newFlowElement.getSubProcess() != null && moveExecutionEntityContainer.getCreatedEventSubProcess(newFlowElement.getSubProcess().getId()) != null) {
parentExecution = moveExecutionEntityContainer.getCreatedEventSubProcess(newFlowElement.getSubProcess().getId());
} else if (newFlowElement.getSubProcess() != null && processInstanceChangeState.getCreatedEmbeddedSubProcesses().containsKey(newFlowElement.getSubProcess().getId())) {
parentExecution = processInstanceChangeState.getCreatedEmbeddedSubProcesses().get(newFlowElement.getSubProcess().getId());

} else if ((newFlowElement instanceof Task || newFlowElement instanceof CallActivity) && isFlowElementMultiInstance(newFlowElement) && !movingExecutions.get(0).isMultiInstanceRoot() &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ public class MoveExecutionEntityContainer {
protected Map<String, FlowElementMoveEntry> currentActivityToNewElementMap = new LinkedHashMap<>();
protected Map<String, Map<String, Object>> flowElementLocalVariableMap = new HashMap<>();
protected List<String> newExecutionIds = new ArrayList<>();
protected Map<String, ExecutionEntity> createdEventSubProcesses = new HashMap<>();

public MoveExecutionEntityContainer(List<ExecutionEntity> executions, List<String> moveToActivityIds) {
this.executions = executions;
Expand Down Expand Up @@ -216,6 +217,14 @@ public void addNewExecutionId(String executionId) {
this.newExecutionIds.add(executionId);
}

public ExecutionEntity getCreatedEventSubProcess(String processDefinitionId) {
return createdEventSubProcesses.get(processDefinitionId);
}

public void addCreatedEventSubProcess(String processDefinitionId, ExecutionEntity executionEntity) {
createdEventSubProcesses.put(processDefinitionId, executionEntity);
}

public Map<String, Map<String, Object>> getFlowElementLocalVariableMap() {
return flowElementLocalVariableMap;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.flowable.engine.test.api.runtime.migration;

import java.util.List;
import java.util.Map;

import org.flowable.common.engine.impl.interceptor.EngineConfigurationConstants;
import org.flowable.eventregistry.api.EventDeployment;
import org.flowable.eventregistry.api.EventRegistry;
import org.flowable.eventregistry.api.EventRepositoryService;
import org.flowable.eventregistry.api.InboundEventChannelAdapter;
import org.flowable.eventregistry.impl.EventRegistryEngineConfiguration;
import org.flowable.eventregistry.model.InboundChannelModel;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

/**
* Provides a test channel and test events.
*
* @author Bas Claessen
*/
public abstract class AbstractProcessInstanceMigrationEventRegistryConsumerTest extends AbstractProcessInstanceMigrationTest {

protected TestInboundEventChannelAdapter inboundEventChannelAdapter;

@BeforeEach
public void setUp() throws Exception {
inboundEventChannelAdapter = setupTestChannel();

getEventRepositoryService().createEventModelBuilder()
.key("myEvent")
.resourceName("myEvent.event")
.deploy();
}

@AfterEach
public void tearDown() throws Exception {
EventRepositoryService eventRepositoryService = getEventRepositoryService();
List<EventDeployment> deployments = eventRepositoryService.createDeploymentQuery().list();
for (EventDeployment eventDeployment : deployments) {
eventRepositoryService.deleteDeployment(eventDeployment.getId());
}
deleteDeployments();
}

protected TestInboundEventChannelAdapter setupTestChannel() {
TestInboundEventChannelAdapter inboundEventChannelAdapter = new TestInboundEventChannelAdapter();
Map<Object, Object> beans = getEventRegistryEngineConfiguration().getExpressionManager().getBeans();
beans.put("inboundEventChannelAdapter", inboundEventChannelAdapter);

getEventRepositoryService().createInboundChannelModelBuilder()
.key("test-channel")
.resourceName("testChannel.channel")
.channelAdapter("${inboundEventChannelAdapter}")
.jsonDeserializer()
.detectEventKeyUsingJsonField("type")
.jsonFieldsMapDirectlyToPayload()
.deploy();

return inboundEventChannelAdapter;
}

protected EventRepositoryService getEventRepositoryService() {
return getEventRegistryEngineConfiguration().getEventRepositoryService();
}

protected EventRegistryEngineConfiguration getEventRegistryEngineConfiguration() {
return (EventRegistryEngineConfiguration) processEngineConfiguration.getEngineConfigurations()
.get(EngineConfigurationConstants.KEY_EVENT_REGISTRY_CONFIG);
}

protected static class TestInboundEventChannelAdapter implements InboundEventChannelAdapter {

public InboundChannelModel inboundChannelModel;
public EventRegistry eventRegistry;
protected ObjectMapper objectMapper = new ObjectMapper();

@Override
public void setInboundChannelModel(InboundChannelModel inboundChannelModel) {
this.inboundChannelModel = inboundChannelModel;
}

@Override
public void setEventRegistry(EventRegistry eventRegistry) {
this.eventRegistry = eventRegistry;
}

public void triggerTestEvent() {
ObjectNode eventNode = createTestEventNode();
triggerTestEvent(eventNode);
}

public void triggerTestEvent(ObjectNode eventNode) {
try {
eventRegistry.eventReceived(inboundChannelModel, objectMapper.writeValueAsString(eventNode));
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}

protected ObjectNode createTestEventNode() {
ObjectNode json = objectMapper.createObjectNode();
json.put("type", "myEvent");
return json;
}
}
}
Loading

0 comments on commit b89cad9

Please sign in to comment.