Skip to content

Commit

Permalink
KAFKA-18001: Support UpdateRaftVoterRequest in KafkaNetworkChannel (#…
Browse files Browse the repository at this point in the history
…17773)

Adds support for UpdateRaftVoterRequest in KafkaNetworkChannel. This addresses the following scenario:

* Bootstrap a KRaft Controller quorum in dynamic mode
* Start additional controllers (as observers)
* Update kraft.version feature from 0 to 1
* Use kafka-metadata-quorum add-controller to promote an observer controller to a follower

Reviewers: Colin Patrick McCabe <[email protected]>, Alyssa Huang <[email protected]>
  • Loading branch information
justinrlee authored Nov 15, 2024
1 parent 5725a51 commit a8f84ca
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 2 deletions.
2 changes: 1 addition & 1 deletion checkstyle/suppressions.xml
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@
files="ClientUtils.java"/>

<suppress checks="ClassDataAbstractionCoupling"
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest).java"/>
files="(KafkaConsumer|ConsumerCoordinator|AbstractFetch|KafkaProducer|AbstractRequest|AbstractResponse|TransactionManager|Admin|KafkaAdminClient|MockAdminClient|KafkaRaftClient|KafkaRaftClientTest|KafkaNetworkChannelTest).java"/>
<suppress checks="ClassDataAbstractionCoupling"
files="(Errors|SaslAuthenticatorTest|AgentTest|CoordinatorTest|NetworkClientTest).java"/>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.kafka.common.message.EndQuorumEpochRequestData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchSnapshotRequestData;
import org.apache.kafka.common.message.UpdateRaftVoterRequestData;
import org.apache.kafka.common.message.VoteRequestData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -35,6 +36,7 @@
import org.apache.kafka.common.requests.EndQuorumEpochRequest;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchSnapshotRequest;
import org.apache.kafka.common.requests.UpdateRaftVoterRequest;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.util.InterBrokerSendThread;
Expand Down Expand Up @@ -187,6 +189,8 @@ static AbstractRequest.Builder<? extends AbstractRequest> buildRequest(ApiMessag
return new FetchRequest.SimpleBuilder((FetchRequestData) requestData);
if (requestData instanceof FetchSnapshotRequestData)
return new FetchSnapshotRequest.Builder((FetchSnapshotRequestData) requestData);
if (requestData instanceof UpdateRaftVoterRequestData)
return new UpdateRaftVoterRequest.Builder((UpdateRaftVoterRequestData) requestData);
if (requestData instanceof ApiVersionsRequestData)
return new ApiVersionsRequest.Builder((ApiVersionsRequestData) requestData,
ApiKeys.API_VERSIONS.oldestVersion(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,14 @@
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.feature.SupportedVersionRange;
import org.apache.kafka.common.message.ApiVersionsResponseData;
import org.apache.kafka.common.message.BeginQuorumEpochResponseData;
import org.apache.kafka.common.message.EndQuorumEpochResponseData;
import org.apache.kafka.common.message.FetchRequestData;
import org.apache.kafka.common.message.FetchResponseData;
import org.apache.kafka.common.message.FetchSnapshotResponseData;
import org.apache.kafka.common.message.UpdateRaftVoterResponseData;
import org.apache.kafka.common.message.VoteResponseData;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.protocol.ApiKeys;
Expand All @@ -42,6 +44,7 @@
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.FetchSnapshotResponse;
import org.apache.kafka.common.requests.UpdateRaftVoterResponse;
import org.apache.kafka.common.requests.VoteRequest;
import org.apache.kafka.common.requests.VoteResponse;
import org.apache.kafka.common.utils.MockTime;
Expand Down Expand Up @@ -85,7 +88,8 @@ public void update(Time time, MockClient.MetadataUpdate update) { }
ApiKeys.BEGIN_QUORUM_EPOCH,
ApiKeys.END_QUORUM_EPOCH,
ApiKeys.FETCH,
ApiKeys.FETCH_SNAPSHOT
ApiKeys.FETCH_SNAPSHOT,
ApiKeys.UPDATE_RAFT_VOTER
);

private final int requestTimeoutMs = 30000;
Expand Down Expand Up @@ -304,6 +308,15 @@ private ApiMessage buildTestRequest(ApiKeys key) {
10
);

case UPDATE_RAFT_VOTER:
return RaftUtil.updateVoterRequest(
clusterId,
ReplicaKey.of(1, ReplicaKey.NO_DIRECTORY_ID),
5,
new SupportedVersionRange((short) 1, (short) 1),
Endpoints.empty()
);

default:
throw new AssertionError("Unexpected api " + key);
}
Expand Down Expand Up @@ -337,6 +350,8 @@ private ApiMessage buildTestErrorResponse(ApiKeys key, Errors error) {
return new FetchResponseData().setErrorCode(error.code());
case FETCH_SNAPSHOT:
return new FetchSnapshotResponseData().setErrorCode(error.code());
case UPDATE_RAFT_VOTER:
return new UpdateRaftVoterResponseData().setErrorCode(error.code());
default:
throw new AssertionError("Unexpected api " + key);
}
Expand All @@ -354,6 +369,8 @@ private Errors extractError(ApiMessage response) {
code = ((VoteResponseData) response).errorCode();
} else if (response instanceof FetchSnapshotResponseData) {
code = ((FetchSnapshotResponseData) response).errorCode();
} else if (response instanceof UpdateRaftVoterResponseData) {
code = ((UpdateRaftVoterResponseData) response).errorCode();
} else {
throw new IllegalArgumentException("Unexpected type for responseData: " + response);
}
Expand All @@ -372,6 +389,8 @@ private AbstractResponse buildResponse(ApiMessage responseData) {
return new FetchResponse((FetchResponseData) responseData);
} else if (responseData instanceof FetchSnapshotResponseData) {
return new FetchSnapshotResponse((FetchSnapshotResponseData) responseData);
} else if (responseData instanceof UpdateRaftVoterResponseData) {
return new UpdateRaftVoterResponse((UpdateRaftVoterResponseData) responseData);
} else {
throw new IllegalArgumentException("Unexpected type for responseData: " + responseData);
}
Expand Down

0 comments on commit a8f84ca

Please sign in to comment.