Skip to content

Commit

Permalink
fix(engine): prevent record corruption during message TTL checking
Browse files Browse the repository at this point in the history
When multiple message TTL checker were running concurrently, for example
because one broker is leader for multiple partitions, the scheduled
jobs tried to write the same message record concurrently. Writing
records (or really, `UnpackedObject`s) is not thread safe. Using a
static instance of the empty message record would result in
deserialization errors while trying to create a copy before writing to
the logstream.
Usually this would kill the processing actor and stop processing for one
partition. With `enableMessageTtlCheckerAsync` enabled, it would only
prevent message TTL checking until a new instance of `MessageObserver`
is started.

(cherry picked from commit f318044)
  • Loading branch information
lenaschoenburg authored and github-actions[bot] committed Apr 21, 2023
1 parent 29e82eb commit 231f1cc
Showing 1 changed file with 2 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
*/
public final class MessageTimeToLiveChecker implements Task {

private static final MessageRecord EMPTY_DELETE_MESSAGE_COMMAND =
private final MessageRecord emptyDeleteMessageCommand =
new MessageRecord().setName("").setCorrelationKey("").setTimeToLive(-1L);

/** This determines the duration that the TTL checker is idle after it completes an execution. */
Expand Down Expand Up @@ -88,7 +88,7 @@ public TaskResult execute(final TaskResultBuilder taskResultBuilder) {

final boolean stillFitsInResult =
taskResultBuilder.appendCommandRecord(
expiredMessageKey, MessageIntent.EXPIRE, EMPTY_DELETE_MESSAGE_COMMAND);
expiredMessageKey, MessageIntent.EXPIRE, emptyDeleteMessageCommand);
return stillFitsInResult && counter.incrementAndGet() < batchLimit;
});

Expand Down

0 comments on commit 231f1cc

Please sign in to comment.