Skip to content

Commit

Permalink
KAFKA-18635: reenable the unclean shutdown detection (#18277)
Browse files Browse the repository at this point in the history
Unclean shutdown detection was inadvertently removed during the development process; this change re-enables it for clusters running in ELR mode.

Reviewers: David Mao <[email protected]>,  Jun Rao <[email protected]>
  • Loading branch information
CalvinConfluent authored Feb 4, 2025
1 parent 7719b5f commit ad031b9
Show file tree
Hide file tree
Showing 8 changed files with 613 additions and 69 deletions.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ static class Builder {
private long sessionTimeoutNs = DEFAULT_SESSION_TIMEOUT_NS;
private ReplicaPlacer replicaPlacer = null;
private FeatureControlManager featureControl = null;
private BrokerUncleanShutdownHandler brokerUncleanShutdownHandler = null;
private BrokerShutdownHandler brokerShutdownHandler = null;
private String interBrokerListenerName = "PLAINTEXT";

Builder setLogContext(LogContext logContext) {
Expand Down Expand Up @@ -128,8 +128,8 @@ Builder setFeatureControlManager(FeatureControlManager featureControl) {
return this;
}

/**
 * Sets the handler that ClusterControlManager calls to append shutdown-related records
 * for a broker. The handler is mandatory: build() throws a RuntimeException when it is
 * still null.
 *
 * NOTE(review): this span is a diff rendering without +/- markers, so two signatures
 * appear back to back; the BrokerUncleanShutdownHandler variant looks like the
 * pre-change version and the BrokerShutdownHandler variant the post-change one -- confirm.
 *
 * @return this builder, so calls can be chained.
 */
Builder setBrokerUncleanShutdownHandler(BrokerUncleanShutdownHandler brokerUncleanShutdownHandler) {
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
Builder setBrokerShutdownHandler(BrokerShutdownHandler brokerShutdownHandler) {
this.brokerShutdownHandler = brokerShutdownHandler;
return this;
}

Expand All @@ -154,8 +154,8 @@ ClusterControlManager build() {
if (featureControl == null) {
throw new RuntimeException("You must specify FeatureControlManager");
}
if (brokerUncleanShutdownHandler == null) {
throw new RuntimeException("You must specify BrokerUncleanShutdownHandler");
if (brokerShutdownHandler == null) {
throw new RuntimeException("You must specify BrokerShutdownHandler");
}
return new ClusterControlManager(logContext,
clusterId,
Expand All @@ -164,7 +164,7 @@ ClusterControlManager build() {
sessionTimeoutNs,
replicaPlacer,
featureControl,
brokerUncleanShutdownHandler,
brokerShutdownHandler,
interBrokerListenerName
);
}
Expand Down Expand Up @@ -252,7 +252,7 @@ boolean check() {
*/
private final FeatureControlManager featureControl;

private final BrokerUncleanShutdownHandler brokerUncleanShutdownHandler;
private final BrokerShutdownHandler brokerShutdownHandler;

/**
* The statically configured inter-broker listener name.
Expand All @@ -277,7 +277,7 @@ private ClusterControlManager(
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
FeatureControlManager featureControl,
BrokerUncleanShutdownHandler brokerUncleanShutdownHandler,
BrokerShutdownHandler brokerShutdownHandler,
String interBrokerListenerName
) {
this.logContext = logContext;
Expand All @@ -293,7 +293,7 @@ private ClusterControlManager(
this.featureControl = featureControl;
this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokerUncleanShutdownHandler = brokerUncleanShutdownHandler;
this.brokerShutdownHandler = brokerShutdownHandler;
this.interBrokerListenerName = interBrokerListenerName;
}

Expand Down Expand Up @@ -335,7 +335,8 @@ Map<Integer, BrokerRegistration> brokerRegistrations() {
public ControllerResult<BrokerRegistrationReply> registerBroker(
BrokerRegistrationRequestData request,
long newBrokerEpoch,
FinalizedControllerFeatures finalizedFeatures
FinalizedControllerFeatures finalizedFeatures,
boolean cleanShutdownDetectionEnabled
) {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
Expand All @@ -348,8 +349,10 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(
List<ApiMessageAndVersion> records = new ArrayList<>();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
Uuid prevIncarnationId = null;
long storedBrokerEpoch = -2; // BrokerRegistration.previousBrokerEpoch default value is -1
if (existing != null) {
prevIncarnationId = existing.incarnationId();
storedBrokerEpoch = existing.epoch();
if (heartbeatManager.hasValidSession(brokerId, existing.epoch())) {
if (!request.incarnationId().equals(prevIncarnationId)) {
throw new DuplicateBrokerRegistrationException("Another broker is " +
Expand Down Expand Up @@ -424,7 +427,9 @@ public ControllerResult<BrokerRegistrationReply> registerBroker(

if (!request.incarnationId().equals(prevIncarnationId)) {
int prevNumRecords = records.size();
brokerUncleanShutdownHandler.addRecordsForShutdown(request.brokerId(), records);
boolean isCleanShutdown = cleanShutdownDetectionEnabled ?
storedBrokerEpoch == request.previousBrokerEpoch() : false;
brokerShutdownHandler.addRecordsForShutdown(request.brokerId(), isCleanShutdown, records);
int numRecordsAdded = records.size() - prevNumRecords;
if (existing == null) {
log.info("No previous registration found for broker {}. New incarnation ID is " +
Expand Down Expand Up @@ -847,7 +852,7 @@ public Entry<Integer, Map<String, VersionRange>> next() {
}

/**
 * Callback used by registerBroker when a registration request carries an incarnation ID
 * that differs from the previously stored one. The implementation appends whatever
 * records are needed to account for the shutdown of the previous broker incarnation.
 *
 * In the three-argument form, isCleanShutdown is computed by the caller: it is true only
 * when clean shutdown detection is enabled for the request and the stored broker epoch
 * equals the request's previousBrokerEpoch.
 *
 * NOTE(review): this span is a diff rendering without +/- markers; the
 * BrokerUncleanShutdownHandler declaration looks like the pre-change version and
 * BrokerShutdownHandler the post-change one -- confirm.
 */
@FunctionalInterface
interface BrokerUncleanShutdownHandler {
void addRecordsForShutdown(int brokerId, List<ApiMessageAndVersion> records);
interface BrokerShutdownHandler {
void addRecordsForShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1555,7 +1555,7 @@ private QuorumController(
setSessionTimeoutNs(sessionTimeoutNs).
setReplicaPlacer(replicaPlacer).
setFeatureControlManager(featureControl).
setBrokerUncleanShutdownHandler(this::handleUncleanBrokerShutdown).
setBrokerShutdownHandler(this::handleBrokerShutdown).
setInterBrokerListenerName(interBrokerListenerName).
build();
this.configurationControl = new ConfigurationControlManager.Builder().
Expand Down Expand Up @@ -2025,7 +2025,8 @@ public CompletableFuture<BrokerRegistrationReply> registerBroker(
return appendWriteEvent("registerBroker", context.deadlineNs(),
() -> clusterControl.
registerBroker(request, offsetControl.nextWriteOffset(),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE)),
new FinalizedControllerFeatures(controllerFeatures, Long.MAX_VALUE),
context.requestHeader().requestApiVersion() >= 3),
EnumSet.noneOf(ControllerOperationFlag.class));
}

Expand Down Expand Up @@ -2203,7 +2204,7 @@ QuorumControllerMetrics controllerMetrics() {
return controllerMetrics;
}

/**
 * Forwards a detected broker shutdown to the replication control manager, which appends
 * the resulting records to the supplied list. This method is handed to
 * ClusterControlManager.Builder as a method reference when the controller is constructed.
 *
 * NOTE(review): this span is a diff rendering without +/- markers; the two-argument
 * handleUncleanBrokerShutdown lines look like the pre-change version and the
 * three-argument handleBrokerShutdown lines the post-change one -- confirm.
 *
 * @param brokerId          The id of the broker whose shutdown was detected.
 * @param isCleanShutdown   Whether the shutdown was determined to be clean.
 * @param records           The record list to append to.
 */
void handleUncleanBrokerShutdown(int brokerId, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerUncleanShutdown(brokerId, records);
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
replicationControl.handleBrokerShutdown(brokerId, isCleanShutdown, records);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1461,21 +1461,22 @@ void handleBrokerInControlledShutdown(int brokerId, long brokerEpoch, List<ApiMe
}

/**
* Create partition change records to remove replicas from any ISR or ELR for brokers doing unclean shutdown.
* Create partition change records to remove the broker's replicas from any ISR or ELR once its shutdown has been detected.
*
* @param brokerId The broker id.
* @param records The record list to append to.
* @param brokerId The id of the broker whose shutdown was detected.
* @param isCleanShutdown Whether the broker shut down cleanly.
* @param records The record list to append to.
*/
// NOTE(review): this span is a diff rendering without +/- markers; the handleBrokerUncleanShutdown
// signature and the single-condition `if` look like the pre-change lines -- confirm.
void handleBrokerUncleanShutdown(int brokerId, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isElrSupported()) {
void handleBrokerShutdown(int brokerId, boolean isCleanShutdown, List<ApiMessageAndVersion> records) {
if (featureControl.metadataVersion().isElrSupported() && !isCleanShutdown) {
// ELR is enabled, generate unclean shutdown partition change records
// First pass: partitions that have this broker in their ISR.
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
// Second pass: partitions that have this broker in their ELR.
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", NO_LEADER, NO_LEADER, brokerId, records,
brokersToElrs.partitionsWithBrokerInElr(brokerId));
} else {
// ELR is not enabled, handle the unclean shutdown as if the broker was fenced
generateLeaderAndIsrUpdates("handleBrokerUncleanShutdown", brokerId, NO_LEADER, NO_LEADER, records,
// ELR is not enabled, or the shutdown was clean: handle the shutdown as if the broker was fenced
generateLeaderAndIsrUpdates("handleBrokerShutdown", brokerId, NO_LEADER, NO_LEADER, records,
brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
}
}
Expand Down
Loading

0 comments on commit ad031b9

Please sign in to comment.