Skip to content

Commit

Permalink
fix(dmn): prevent duplicate key insertion for dmn
Browse files Browse the repository at this point in the history
To make sure we keep our data consistent we should make sure we don't store duplicate values into the state. The DMN resources were missing the required checks to prevent this. We would always try to insert the resources, disregarding if it is a duplicate. This change filters out the duplicate records and guarantees we only store the non-duplicates.

(cherry picked from commit 96810c8)
  • Loading branch information
remcowesterhoud authored and github-actions[bot] committed Apr 13, 2022
1 parent 4bf32f2 commit 15ea241
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@
package io.camunda.zeebe.engine.state.appliers;

import static io.camunda.zeebe.util.buffer.BufferUtil.wrapArray;
import static java.util.function.Predicate.not;

import io.camunda.zeebe.engine.state.TypedEventApplier;
import io.camunda.zeebe.engine.state.mutable.MutableDecisionState;
import io.camunda.zeebe.engine.state.mutable.MutableProcessState;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsMetadataRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DecisionRequirementsRecord;
import io.camunda.zeebe.protocol.impl.record.value.deployment.DeploymentRecord;
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
Expand Down Expand Up @@ -40,8 +43,8 @@ public void applyState(final long key, final DeploymentRecord value) {
}

private void putDmnResourcesInState(final DeploymentRecord value) {
value
.decisionRequirementsMetadata()
value.decisionRequirementsMetadata().stream()
.filter(not(DecisionRequirementsMetadataRecord::isDuplicate))
.forEach(
drg -> {
final var resource = getResourceByName(value, drg.getResourceName());
Expand All @@ -50,7 +53,9 @@ private void putDmnResourcesInState(final DeploymentRecord value) {
decisionState.storeDecisionRequirements(decisionRequirementsRecord);
});

value.decisionsMetadata().forEach(decisionState::storeDecisionRecord);
value.decisionsMetadata().stream()
.filter(not(DecisionRecord::isDuplicate))
.forEach(decisionState::storeDecisionRecord);
}

private DirectBuffer getResourceByName(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import io.camunda.zeebe.protocol.record.intent.DeploymentIntent;
import io.camunda.zeebe.protocol.record.value.DeploymentDistributionRecordValue;
import io.camunda.zeebe.protocol.record.value.DeploymentRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRecordValue;
import io.camunda.zeebe.protocol.record.value.deployment.DecisionRequirementsMetadataValue;
import io.camunda.zeebe.protocol.record.value.deployment.DeploymentResource;
import io.camunda.zeebe.protocol.record.value.deployment.ProcessMetadataValue;
import io.camunda.zeebe.test.util.record.RecordingExporter;
Expand All @@ -42,6 +44,8 @@ public final class CreateDeploymentMultiplePartitionsTest {
Bpmn.createExecutableProcess(PROCESS_ID).startEvent().endEvent().done();
private static final BpmnModelInstance PROCESS_2 =
Bpmn.createExecutableProcess("process2").startEvent().endEvent().done();
private static final String DMN_DECISION_TABLE = "/dmn/decision-table.dmn";
private static final String DMN_DECISION_TABLE_V2 = "/dmn/decision-table_v2.dmn";

@Rule
public final RecordingExporterTestWatcher recordingExporterTestWatcher =
Expand Down Expand Up @@ -300,7 +304,7 @@ public void shouldFilterDuplicateProcess() {
.collect(Collectors.toList());

assertThat(repeatedWfs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedWfs.forEach(repeatedWf -> assertSameResource(originalProcesses.get(0), repeatedWf));
repeatedWfs.forEach(repeatedWf -> assertSameProcess(originalProcesses.get(0), repeatedWf));
}

@Test
Expand All @@ -320,7 +324,7 @@ public void shouldNotFilterDifferentProcesses() {
final var repeatedProcesses = repeated.getValue().getProcessesMetadata();
assertThat(repeatedProcesses.size()).isEqualTo(originalProcesses.size()).isOne();

assertDifferentResources(originalProcesses.get(0), repeatedProcesses.get(0));
assertDifferentProcesses(originalProcesses.get(0), repeatedProcesses.get(0));

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
Expand All @@ -338,10 +342,106 @@ public void shouldNotFilterDifferentProcesses() {

assertThat(repeatedWfs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedWfs.forEach(
repeatedWf -> assertDifferentResources(originalProcesses.get(0), repeatedWf));
repeatedWf -> assertDifferentProcesses(originalProcesses.get(0), repeatedWf));
}

private void assertSameResource(
@Test
public void shouldFilterDuplicateDmnResource() {
// given
final Record<DeploymentRecordValue> original =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// when
final Record<DeploymentRecordValue> repeated =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// then
assertThat(repeated.getKey()).isGreaterThan(original.getKey());

final var originalDecision = original.getValue().getDecisionsMetadata();
final var originalDrg = original.getValue().getDecisionRequirementsMetadata();
final var repeatedDecision = repeated.getValue().getDecisionsMetadata();
final var repeatedDrg = repeated.getValue().getDecisionRequirementsMetadata();
assertThat(repeatedDecision.size()).isEqualTo(originalDecision.size()).isOne();
assertThat(repeatedDrg.size()).isEqualTo(originalDrg.size()).isOne();

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.count())
.isEqualTo(PARTITION_COUNT - 1);

final var repeatedDecisions =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDecisions.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDecisions.forEach(r -> assertSameDecision(originalDecision.get(0), r));

final var repeatedDrgs =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionRequirementsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDrgs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDrgs.forEach(r -> assertSameDrg(originalDrg.get(0), r));
}

@Test
public void shouldNotFilterDifferentDmnResource() {
// given
final Record<DeploymentRecordValue> original =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE).deploy();

// when
final Record<DeploymentRecordValue> repeated =
ENGINE.deployment().withXmlClasspathResource(DMN_DECISION_TABLE_V2).deploy();

// then
assertThat(repeated.getKey()).isGreaterThan(original.getKey());

final var originalDecision = original.getValue().getDecisionsMetadata();
final var originalDrg = original.getValue().getDecisionRequirementsMetadata();
final var repeatedDecision = repeated.getValue().getDecisionsMetadata();
final var repeatedDrg = repeated.getValue().getDecisionRequirementsMetadata();
assertThat(repeatedDecision.size()).isEqualTo(originalDecision.size()).isOne();
assertThat(repeatedDrg.size()).isEqualTo(originalDrg.size()).isOne();

assertThat(
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTE)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.count())
.isEqualTo(PARTITION_COUNT - 1);

final var repeatedDecisions =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDecisions.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDecisions.forEach(r -> assertDifferentDecision(originalDecision.get(0), r));

final var repeatedDrgs =
RecordingExporter.deploymentRecords(DeploymentIntent.DISTRIBUTED)
.withRecordKey(repeated.getKey())
.limit(PARTITION_COUNT - 1)
.map(r -> r.getValue().getDecisionRequirementsMetadata().get(0))
.collect(Collectors.toList());

assertThat(repeatedDrgs.size()).isEqualTo(PARTITION_COUNT - 1);
repeatedDrgs.forEach(r -> assertDifferentDrg(originalDrg.get(0), r));
}

private void assertSameProcess(
final ProcessMetadataValue original, final ProcessMetadataValue repeated) {
Assertions.assertThat(repeated)
.hasVersion(original.getVersion())
Expand All @@ -350,12 +450,53 @@ private void assertSameResource(
.hasBpmnProcessId(original.getBpmnProcessId());
}

private void assertDifferentResources(
private void assertDifferentProcesses(
final ProcessMetadataValue original, final ProcessMetadataValue repeated) {
assertThat(original.getProcessDefinitionKey()).isLessThan(repeated.getProcessDefinitionKey());
assertThat(original.getVersion()).isLessThan(repeated.getVersion());
}

private void assertSameDecision(
final DecisionRecordValue original, final DecisionRecordValue repeated) {
Assertions.assertThat(repeated)
.hasDecisionId(original.getDecisionId())
.hasDecisionName(original.getDecisionName())
.hasVersion(original.getVersion())
.hasDecisionKey(original.getDecisionKey())
.hasDecisionRequirementsId(original.getDecisionRequirementsId())
.hasDecisionRequirementsKey(original.getDecisionRequirementsKey());
}

private void assertDifferentDecision(
final DecisionRecordValue original, final DecisionRecordValue repeated) {
assertThat(original.getVersion()).isLessThan(repeated.getVersion());
assertThat(original.getDecisionKey()).isLessThan(repeated.getDecisionKey());
assertThat(original.getDecisionRequirementsKey())
.isLessThan(repeated.getDecisionRequirementsKey());
}

private void assertSameDrg(
final DecisionRequirementsMetadataValue original,
final DecisionRequirementsMetadataValue repeated) {
Assertions.assertThat(repeated)
.hasDecisionRequirementsId(original.getDecisionRequirementsId())
.hasDecisionRequirementsName(original.getDecisionRequirementsName())
.hasDecisionRequirementsVersion(original.getDecisionRequirementsVersion())
.hasDecisionRequirementsKey(original.getDecisionRequirementsKey())
.hasNamespace(original.getNamespace())
.hasResourceName(original.getResourceName())
.hasChecksum(original.getChecksum());
}

private void assertDifferentDrg(
final DecisionRequirementsMetadataValue original,
final DecisionRequirementsMetadataValue repeated) {
assertThat(original.getDecisionRequirementsVersion())
.isLessThan(repeated.getDecisionRequirementsVersion());
assertThat(original.getDecisionRequirementsKey())
.isLessThan(repeated.getDecisionRequirementsKey());
}

private byte[] bpmnXml(final BpmnModelInstance definition) {
final ByteArrayOutputStream outStream = new ByteArrayOutputStream();
Bpmn.writeModelToStream(outStream, definition);
Expand Down

0 comments on commit 15ea241

Please sign in to comment.