From 09ced0bb61d8c92757f75bf0bde6744471e28d38 Mon Sep 17 00:00:00 2001 From: Dominic Evans Date: Thu, 3 Aug 2023 22:55:41 +0100 Subject: [PATCH] fix(proto): use full range of MetadataRequest Even though most of these are identical, we may as well match up correctly. Signed-off-by: Dominic Evans --- async_producer_test.go | 22 +++++++++++----------- client_test.go | 2 +- metadata_request.go | 28 +++++++++++++++++----------- metadata_response.go | 24 +++++++++++++----------- sync_producer_test.go | 2 +- transaction_manager_test.go | 12 ++++++------ 6 files changed, 49 insertions(+), 41 deletions(-) diff --git a/async_producer_test.go b/async_producer_test.go index 71479c469..fc3d69fb2 100644 --- a/async_producer_test.go +++ b/async_producer_test.go @@ -1111,7 +1111,7 @@ func TestAsyncProducerIdempotentGoldenPath(t *testing.T) { broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ - Version: 1, + Version: 4, ControllerID: 1, } metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) @@ -1169,7 +1169,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch(t *testing.T) { broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ - Version: 1, + Version: 4, ControllerID: 1, } metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) @@ -1307,7 +1307,7 @@ func TestAsyncProducerIdempotentRetryCheckBatch_2378(t *testing.T) { broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ - Version: 1, + Version: 4, ControllerID: 1, } metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) @@ -1375,7 +1375,7 @@ func TestAsyncProducerIdempotentErrorOnOutOfSeq(t *testing.T) { broker := NewMockBroker(t, 1) metadataResponse := &MetadataResponse{ - Version: 1, + Version: 4, ControllerID: 1, } metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) @@ -1425,7 +1425,7 @@ func TestAsyncProducerIdempotentEpochRollover(t *testing.T) { defer broker.Close() metadataResponse := &MetadataResponse{ - Version: 1, + Version: 4, ControllerID: 1, } metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) @@ -1501,7 +1501,7 @@ func TestAsyncProducerIdempotentEpochExhaustion(t *testing.T) { ) metadataResponse := &MetadataResponse{ - Version: 1, + Version: 4, ControllerID: 1, } metadataResponse.AddBroker(broker.Addr(), broker.BrokerID()) @@ -1737,7 +1737,7 @@ func TestTxmngInitProducerId(t *testing.T) { defer broker.Close() metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) broker.Returns(metadataLeader) @@ -1879,7 +1879,7 @@ func TestTxnProduceRecordWithCommit(t *testing.T) { config.Net.MaxOpenRequests = 1 metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) @@ -1960,7 +1960,7 @@ func TestTxnProduceBatchAddPartition(t *testing.T) { config.Producer.Partitioner = NewManualPartitioner metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) @@ -2067,7 +2067,7 @@ func TestTxnProduceRecordWithAbort(t *testing.T) { config.Net.MaxOpenRequests = 1 metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) @@ -2147,7 +2147,7 @@ func TestTxnCanAbort(t *testing.T) { config.Net.MaxOpenRequests = 1 metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) diff --git a/client_test.go b/client_test.go index 90cf63547..b416fbe02 100644 --- a/client_test.go +++ b/client_test.go @@ -1071,7 +1071,7 @@ func TestClientCoordinatorConnectionRefused(t *testing.T) { func TestInitProducerIDConnectionRefused(t *testing.T) { t.Parallel() seedBroker := NewMockBroker(t, 1) - seedBroker.Returns(&MetadataResponse{Version: 1}) + seedBroker.Returns(&MetadataResponse{Version: 4}) config := NewTestConfig() config.Producer.Idempotent = true diff --git a/metadata_request.go b/metadata_request.go index 9e46eef94..9e1e61b17 100644 --- a/metadata_request.go +++ b/metadata_request.go @@ -17,6 +17,10 @@ func NewMetadataRequest(version KafkaVersion, topics []string) *MetadataRequest m.Version = 6 } else if version.IsAtLeast(V1_0_0_0) { m.Version = 5 + } else if version.IsAtLeast(V0_11_0_0) { + m.Version = 4 + } else if version.IsAtLeast(V0_10_1_0) { + m.Version = 2 } else if version.IsAtLeast(V0_10_0_0) { m.Version = 1 } @@ -94,19 +98,21 @@ func (r *MetadataRequest) isValidVersion() bool { func (r *MetadataRequest) requiredVersion() KafkaVersion { switch r.Version { - case 1: - return V0_10_0_0 - case 2: - return V0_10_1_0 - case 3, 4: - return V0_11_0_0 - case 5: - return V1_0_0_0 - case 6: - return V2_0_0_0 case 7: return V2_1_0_0 + case 6: + return V2_0_0_0 + case 5: + return V1_0_0_0 + case 3, 4: + return V0_11_0_0 + case 2: + return V0_10_1_0 + case 1: + return V0_10_0_0 + case 0: + return V0_8_2_0 default: - return MinVersion + return V2_1_0_0 } } diff --git a/metadata_response.go b/metadata_response.go index 48c8b953f..902364963 100644 --- a/metadata_response.go +++ b/metadata_response.go @@ -281,20 +281,22 @@ func (r *MetadataResponse) isValidVersion() bool { func (r *MetadataResponse) requiredVersion() KafkaVersion { switch r.Version { - case 1: - return V0_10_0_0 - case 2: - return V0_10_1_0 - case 3, 4: - return V0_11_0_0 - case 5: - return V1_0_0_0 - case 6: - return V2_0_0_0 case 7: return V2_1_0_0 + case 6: + return V2_0_0_0 + case 5: + return V1_0_0_0 + case 3, 4: + return V0_11_0_0 + case 2: + return V0_10_1_0 + case 1: + return V0_10_0_0 + case 0: + return V0_8_2_0 default: - return MinVersion + return V2_1_0_0 } } diff --git a/sync_producer_test.go b/sync_producer_test.go index 3413b9b0f..33ca66ba7 100644 --- a/sync_producer_test.go +++ b/sync_producer_test.go @@ -73,7 +73,7 @@ func TestSyncProducerTransactional(t *testing.T) { config.Net.MaxOpenRequests = 1 metadataResponse := new(MetadataResponse) - metadataResponse.Version = 1 + metadataResponse.Version = 4 metadataResponse.ControllerID = leader.BrokerID() metadataResponse.AddBroker(leader.Addr(), leader.BrokerID()) metadataResponse.AddTopic("my_topic", ErrNoError) diff --git a/transaction_manager_test.go b/transaction_manager_test.go index 9516f6dba..3100a9743 100644 --- a/transaction_manager_test.go +++ b/transaction_manager_test.go @@ -87,7 +87,7 @@ func TestTxnmgrInitProducerIdTxn(t *testing.T) { defer broker.Close() metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) broker.Returns(metadataLeader) @@ -217,7 +217,7 @@ func TestMaybeAddPartitionToCurrentTxn(t *testing.T) { defer broker.Close() metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) @@ -351,7 +351,7 @@ func TestAddOffsetsToTxn(t *testing.T) { defer broker.Close() metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) @@ -608,7 +608,7 @@ func TestTxnOffsetsCommit(t *testing.T) { config.Producer.Transaction.Retry.Backoff = 0 metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) @@ -739,7 +739,7 @@ func TestEndTxn(t *testing.T) { defer broker.Close() metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError) @@ -904,7 +904,7 @@ func TestPublishPartitionToTxn(t *testing.T) { defer broker.Close() metadataLeader := new(MetadataResponse) - metadataLeader.Version = 1 + metadataLeader.Version = 4 metadataLeader.ControllerID = broker.brokerID metadataLeader.AddBroker(broker.Addr(), broker.BrokerID()) metadataLeader.AddTopic("test-topic", ErrNoError)