Skip to content

Commit

Permalink
fix(engine): write correct header size
Browse files Browse the repository at this point in the history
When we write the headers size we would include invalid headers. These headers would be filtered and not written to the buffer. Therefore, the size could differ from the actual amount of headers written in the buffer.

(cherry picked from commit 71dfa9b)
  • Loading branch information
remcowesterhoud authored and github-actions[bot] committed Jan 26, 2022
1 parent da9674e commit 253423f
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,20 @@
import io.camunda.zeebe.util.Either;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.stream.Collectors;
import org.agrona.DirectBuffer;
import org.agrona.ExpandableArrayBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.concurrent.UnsafeBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class BpmnJobBehavior {

private static final Logger LOGGER =
LoggerFactory.getLogger(BpmnJobBehavior.class.getPackageName());

private final JobRecord jobRecord = new JobRecord().setVariables(DocumentValue.EMPTY_DOCUMENT);
private final HeaderEncoder headerEncoder = new HeaderEncoder();

Expand Down Expand Up @@ -150,10 +157,10 @@ private DirectBuffer encodeHeaders(
final var headers = new HashMap<>(taskHeaders);
final String assignee = props.getAssignee();
final String candidateGroups = props.getCandidateGroups();
if (assignee != null) {
if (assignee != null && !assignee.isEmpty()) {
headers.put(Protocol.USER_TASK_ASSIGNEE_HEADER_NAME, assignee);
}
if (candidateGroups != null) {
if (candidateGroups != null && !candidateGroups.isEmpty()) {
headers.put(Protocol.USER_TASK_CANDIDATE_GROUPS_HEADER_NAME, candidateGroups);
}
return headerEncoder.encode(headers);
Expand Down Expand Up @@ -234,21 +241,28 @@ public DirectBuffer encode(final Map<String, String> taskHeaders) {

final MutableDirectBuffer buffer = new UnsafeBuffer(0, 0);

final var validHeaders =
taskHeaders.entrySet().stream()
.filter(entry -> isValidHeader(entry.getKey(), entry.getValue()))
.collect(Collectors.toMap(Entry::getKey, Entry::getValue));

if (validHeaders.size() != taskHeaders.size()) {
LOGGER.debug("Ignored {} invalid headers.", taskHeaders.size() - validHeaders.size());
}

final ExpandableArrayBuffer expandableBuffer =
new ExpandableArrayBuffer(INITIAL_SIZE_KEY_VALUE_PAIR * taskHeaders.size());
new ExpandableArrayBuffer(INITIAL_SIZE_KEY_VALUE_PAIR * validHeaders.size());

msgPackWriter.wrap(expandableBuffer, 0);
msgPackWriter.writeMapHeader(taskHeaders.size());
msgPackWriter.writeMapHeader(validHeaders.size());

taskHeaders.forEach(
validHeaders.forEach(
(k, v) -> {
if (isValidHeader(k, v)) {
final DirectBuffer key = wrapString(k);
msgPackWriter.writeString(key);
final DirectBuffer key = wrapString(k);
msgPackWriter.writeString(key);

final DirectBuffer value = wrapString(v);
msgPackWriter.writeString(value);
}
final DirectBuffer value = wrapString(v);
msgPackWriter.writeString(value);
});

buffer.wrap(expandableBuffer.byteArray(), 0, msgPackWriter.getOffset());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,6 +299,29 @@ public void shouldCreateJobWithEvaluatedAssigneeExpressionHeader() {
.containsEntry(Protocol.USER_TASK_ASSIGNEE_HEADER_NAME, "alice");
}

@Test
public void shouldCreateJobAndIgnoreEmptyEvaluatedAssigneeExpressionHeader() {
// given
ENGINE.deployment().withXmlResource(process(t -> t.zeebeAssigneeExpression("user"))).deploy();

// when
final long processInstanceKey =
ENGINE
.processInstance()
.ofBpmnProcessId(PROCESS_ID)
.withVariables(Map.of("user", ""))
.create();

// then
final Record<JobRecordValue> job =
RecordingExporter.jobRecords(JobIntent.CREATED)
.withProcessInstanceKey(processInstanceKey)
.getFirst();

final Map<String, String> customHeaders = job.getValue().getCustomHeaders();
assertThat(customHeaders).hasSize(0).doesNotContainKey(Protocol.USER_TASK_ASSIGNEE_HEADER_NAME);
}

@Test
public void shouldCreateJobWithCandidateGroupsHeader() {
// given
Expand Down

0 comments on commit 253423f

Please sign in to comment.