Skip to content

Commit

Permalink
[fix][broker] Make timestamp fields thread safe by using volatile
Browse files Browse the repository at this point in the history
- fixes issue with stats where timestamps might be inconsistent because of visibility issues
  - fields should be volatile to ensure visibility of updated values in a consistent manner

- in replication, the lastDataMessagePublishedTimestamp field in PersistentTopic might be inconsistent
  unless volatile is used
  • Loading branch information
lhotari committed Sep 28, 2022
1 parent dfd4882 commit e231f4e
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -260,11 +260,11 @@ public void triggerFailed(ManagedLedgerException exception) {
AtomicIntegerFieldUpdater.newUpdater(ManagedCursorImpl.class, "pendingMarkDeletedSubmittedCount");
@SuppressWarnings("unused")
private volatile int pendingMarkDeletedSubmittedCount = 0;
private long lastLedgerSwitchTimestamp;
private volatile long lastLedgerSwitchTimestamp;
private final Clock clock;

// The last active time (Unix time, milliseconds) of the cursor
private long lastActive;
private volatile long lastActive;

public enum State {
Uninitialized, // Cursor is being initialized
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,13 +215,13 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private volatile LedgerHandle currentLedger;
private long currentLedgerEntries = 0;
private long currentLedgerSize = 0;
private long lastLedgerCreatedTimestamp = 0;
private long lastLedgerCreationFailureTimestamp = 0;
private volatile long lastLedgerCreatedTimestamp = 0;
private volatile long lastLedgerCreationFailureTimestamp = 0;
private long lastLedgerCreationInitiationTimestamp = 0;

private long lastOffloadLedgerId = 0;
private long lastOffloadSuccessTimestamp = 0;
private long lastOffloadFailureTimestamp = 0;
private volatile long lastOffloadSuccessTimestamp = 0;
private volatile long lastOffloadFailureTimestamp = 0;

private int minBacklogCursorsForCaching = 0;
private int minBacklogEntriesForCaching = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ public class Consumer {
private final LongAdder bytesOutCounter;
private final Rate messageAckRate;

private long lastConsumedTimestamp;
private long lastAckedTimestamp;
private long lastConsumedFlowTimestamp;
private volatile long lastConsumedTimestamp;
private volatile long lastAckedTimestamp;
private volatile long lastConsumedFlowTimestamp;
private Rate chunkedMessageRate;

// Represents how many messages we can safely send to the consumer without
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,9 +107,9 @@ public class PersistentSubscription extends AbstractSubscription implements Subs
private volatile int isFenced = FALSE;
private PersistentMessageExpiryMonitor expiryMonitor;

private long lastExpireTimestamp = 0L;
private long lastConsumedFlowTimestamp = 0L;
private long lastMarkDeleteAdvancedTimestamp = 0L;
private volatile long lastExpireTimestamp = 0L;
private volatile long lastConsumedFlowTimestamp = 0L;
private volatile long lastMarkDeleteAdvancedTimestamp = 0L;

// for connected subscriptions, message expiry will be checked if the backlog is greater than this threshold
private static final int MINIMUM_BACKLOG_FOR_EXPIRY_CHECK = 1000;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ protected TopicStatsHelper initialValue() {
protected final TransactionBuffer transactionBuffer;

// Record the last time a data message (ie: not an internal Pulsar marker) is published on the topic
private long lastDataMessagePublishedTimestamp = 0;
private volatile long lastDataMessagePublishedTimestamp = 0;

private static class TopicStatsHelper {
public double averageMsgSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.P
private final String localCluster;

// The timestamp of when the last snapshot was initiated
private long lastCompletedSnapshotStartTime = 0;
private volatile long lastCompletedSnapshotStartTime = 0;

private String lastCompletedSnapshotId;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,16 +30,16 @@ public class CompactionRecord {
200_000, 1000_000 };

@Getter
private long lastCompactionRemovedEventCount = 0L;
private volatile long lastCompactionRemovedEventCount = 0L;
@Getter
private long lastCompactionSucceedTimestamp = 0L;
private volatile long lastCompactionSucceedTimestamp = 0L;
@Getter
private long lastCompactionFailedTimestamp = 0L;
private volatile long lastCompactionFailedTimestamp = 0L;
@Getter
private long lastCompactionDurationTimeInMills = 0L;
private volatile long lastCompactionDurationTimeInMills = 0L;

private LongAdder lastCompactionRemovedEventCountOp = new LongAdder();
private long lastCompactionStartTimeOp;
private volatile long lastCompactionStartTimeOp;

private final LongAdder compactionRemovedEventCount = new LongAdder();
private final LongAdder compactionSucceedCount = new LongAdder();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class AuthenticationAthenz implements Authentication, EncodedAuthenticati
// ZTSClient.cancelPrefetch() is called.
// cf. https://github.com/AthenZ/athenz/issues/544
private boolean autoPrefetchEnabled = false;
private long cachedRoleTokenTimestamp;
private volatile long cachedRoleTokenTimestamp;
private String roleToken;
// athenz will only give this token if it's at least valid for 2hrs
private static final int minValidity = 2 * 60 * 60;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ public class AutoClusterFailover implements ServiceUrlProvider {
private final long failoverDelayNs;
private final long switchBackDelayNs;
private final ScheduledExecutorService executor;
private long recoverTimestamp;
private long failedTimestamp;
private volatile long recoverTimestamp;
private volatile long failedTimestamp;
private final long intervalMs;
private static final int TIMEOUT = 30_000;
private final PulsarServiceNameResolver resolver;
Expand Down

0 comments on commit e231f4e

Please sign in to comment.