Skip to content

Commit

Permalink
[fix][broker] Make timestamp fields thread safe by using volatile
Browse files Browse the repository at this point in the history
- fixes issue with stats where timestamps might be inconsistent because of visibility issues
  - fields should be volatile to ensure visibility of updated values in a consistent manner
  • Loading branch information
lhotari committed Aug 24, 2022
1 parent 3160edc commit eea55c4
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,11 +255,11 @@ public void triggerFailed(ManagedLedgerException exception) {
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
@SuppressWarnings("unused")
private volatile int pendingMarkDeletedSubmittedCount = 0;
private long lastLedgerSwitchTimestamp;
private volatile long lastLedgerSwitchTimestamp;
private final Clock clock;

// The last active time (Unix time, milliseconds) of the cursor
private long lastActive;
private volatile long lastActive;

public enum State {
Uninitialized, // Cursor is being initialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,9 +87,9 @@ public class Consumer {
private final LongAdder bytesOutCounter;
private final Rate messageAckRate;

private long lastConsumedTimestamp;
private long lastAckedTimestamp;
private long lastConsumedFlowTimestamp;
private volatile long lastConsumedTimestamp;
private volatile long lastAckedTimestamp;
private volatile long lastConsumedFlowTimestamp;
private Rate chunkedMessageRate;

// Represents how many messages we can safely send to the consumer without
Expand Down Expand Up @@ -129,7 +129,7 @@ public class Consumer {

private static final double avgPercent = 0.9;
private boolean preciseDispatcherFlowControl;
private PositionImpl readPositionWhenJoining;
private volatile PositionImpl readPositionWhenJoining;
private final String clientAddress; // IP address only, no port number included
private final MessageId startMessageId;
private final boolean isAcknowledgmentAtBatchIndexLevelEnabled;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile int isFenced = FALSE;
private PersistentMessageExpiryMonitor expiryMonitor;

private long lastExpireTimestamp = 0L;
private long lastConsumedFlowTimestamp = 0L;
private long lastMarkDeleteAdvancedTimestamp = 0L;
private volatile long lastExpireTimestamp = 0L;
private volatile long lastConsumedFlowTimestamp = 0L;
private volatile long lastMarkDeleteAdvancedTimestamp = 0L;

// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
Expand Down

0 comments on commit eea55c4

Please sign in to comment.