Skip to content

Commit

Permalink
merge: #7711
Browse files Browse the repository at this point in the history
7711: [Backport stable/1.1] Avoid multiple correlations of a message to a non-interrupting event  r=saig0 a=github-actions[bot]

# Description
Backport of #7707 to `stable/1.1`.

relates to #7619

Co-authored-by: Philipp Ossler <[email protected]>
  • Loading branch information
zeebe-bors-cloud[bot] and saig0 authored Aug 27, 2021
2 parents bbba9ef + 95e28bd commit 9bb81a0
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public void applyState(final long key, final MessageSubscriptionRecord value) {
if (value.isInterrupting()) {
messageSubscriptionState.remove(subscription);
} else {
messageSubscriptionState.resetCorrelatingState(subscription);
messageSubscriptionState.updateToCorrelatedState(subscription);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,13 @@ public ProcessMessageSubscriptionCorrelatedApplier(
@Override
public void applyState(final long key, final ProcessMessageSubscriptionRecord value) {
final var eventScopeKey = value.getElementInstanceKey();

if (value.isInterrupting()) {
subscriptionState.remove(eventScopeKey, value.getMessageNameBuffer());
} else {
// if the message subscription is created and a matching message is buffered then it writes a
// process message subscription CORRELATE instead of a CREATE command
subscriptionState.updateToOpenedState(value);
}

if (shouldCreateTemporaryVariables(value)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,8 +155,9 @@ public void updateToCorrelatingState(final MessageSubscriptionRecord record) {
}

@Override
public void resetCorrelatingState(final MessageSubscription subscription) {
public void updateToCorrelatedState(final MessageSubscription subscription) {
updateCorrelatingFlag(subscription, false);
transientState.remove(subscription.getRecord());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface MutableMessageSubscriptionState extends MessageSubscriptionStat

void updateToCorrelatingState(MessageSubscriptionRecord record);

void resetCorrelatingState(MessageSubscription subscription);
void updateToCorrelatedState(MessageSubscription subscription);

boolean remove(long elementInstanceKey, DirectBuffer messageName);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,6 +831,59 @@ public void shouldCorrelateToNonInterruptingBoundaryEvent() {
.containsExactly("0", "1", "2");
}

@Test
public void shouldCorrelateOnlyOnceToNonInterruptingBoundaryEvent() {
// given
final BpmnModelInstance process =
Bpmn.createExecutableProcess(PROCESS_ID)
.startEvent()
.serviceTask("task", b -> b.zeebeJobType("test"))
.boundaryEvent("message")
.cancelActivity(false)
.message(m -> m.name("message").zeebeCorrelationKeyExpression("key"))
.endEvent("correlated")
.moveToActivity("task")
.boundaryEvent("timer")
.timerWithDuration("PT5M")
.endEvent()
.done();

engine.deployment().withXmlResource(process).deploy();

final var processInstanceKey =
engine.processInstance().ofBpmnProcessId(PROCESS_ID).withVariable("key", "key-1").create();

// when
engine.message().withName("message").withCorrelationKey("key-1").publish();

RecordingExporter.processInstanceRecords(ProcessInstanceIntent.ELEMENT_COMPLETED)
.withProcessInstanceKey(processInstanceKey)
.withElementType(BpmnElementType.BOUNDARY_EVENT)
.await();

// complete the process instance by triggering the interrupting timer boundary event
engine.increaseTime(Duration.ofMinutes(5));

// then
assertThat(
RecordingExporter.processInstanceRecords()
.withProcessInstanceKey(processInstanceKey)
.limitToProcessInstanceCompleted()
.withIntent(ProcessInstanceIntent.ELEMENT_ACTIVATED)
.withElementType(BpmnElementType.BOUNDARY_EVENT))
.describedAs("Expected that the message boundary event is activated only once")
.hasSize(2)
.extracting(r -> r.getValue().getElementId())
.containsExactly("message", "timer");

assertThat(
RecordingExporter.records()
.limitToProcessInstance(processInstanceKey)
.onlyCommandRejections())
.describedAs("Expected no subscription command rejections")
.isEmpty();
}

@Test
public void shouldCorrelateMessageAgainAfterRejection() {
// given
Expand Down

0 comments on commit 9bb81a0

Please sign in to comment.