Skip to content

Commit

Permalink
Merge pull request #2028 from Shopify/dnwe/send-apiversionsrequest
Browse files Browse the repository at this point in the history
feat: send ApiVersionsRequest on broker open
  • Loading branch information
bai authored Sep 16, 2021
2 parents 74aaffe + 4a17d91 commit abd562c
Show file tree
Hide file tree
Showing 7 changed files with 53 additions and 4 deletions.
5 changes: 5 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -340,13 +340,15 @@ func TestClusterAdminAlterPartitionReassignments(t *testing.T) {
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"AlterPartitionReassignmentsRequest": NewMockAlterPartitionReassignmentsResponse(t),
})

Expand Down Expand Up @@ -417,13 +419,15 @@ func TestClusterAdminListPartitionReassignments(t *testing.T) {
defer secondBroker.Close()

seedBroker.SetHandlerByMap(map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"MetadataRequest": NewMockMetadataResponse(t).
SetController(secondBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()).
SetBroker(secondBroker.Addr(), secondBroker.BrokerID()),
})

secondBroker.SetHandlerByMap(map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"ListPartitionReassignmentsRequest": NewMockListPartitionReassignmentsResponse(t),
})

Expand Down Expand Up @@ -1335,6 +1339,7 @@ func TestDeleteOffset(t *testing.T) {
partition := int32(0)

handlerMap := map[string]MockResponse{
"ApiVersionsRequest": NewMockApiVersionsResponse(t),
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker.BrokerID()).
SetBroker(seedBroker.Addr(), seedBroker.BrokerID()),
Expand Down
2 changes: 1 addition & 1 deletion api_versions_request.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package sarama

// const defaultClientSoftwareName = "sarama"
const defaultClientSoftwareName = "sarama"

type ApiVersionsRequest struct {
// Version defines the protocol version to use for encode and decode
Expand Down
19 changes: 17 additions & 2 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,8 +153,23 @@ func (b *Broker) Open(conf *Config) error {
b.lock.Lock()

go withRecover(func() {
defer b.lock.Unlock()

defer func() {
b.lock.Unlock()

// Send an ApiVersionsRequest to identify the client (KIP-511).
// Ideally Sarama would use the response to control protocol versions,
// but for now just fire-and-forget just to send
if conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest {
_, err = b.ApiVersions(&ApiVersionsRequest{
Version: 3,
ClientSoftwareName: defaultClientSoftwareName,
ClientSoftwareVersion: version(),
})
if err != nil {
Logger.Printf("Error while sending ApiVersionsRequest to broker %s: %s\n", b.addr, err)
}
}
}()
dialer := conf.getDialer()
b.conn, b.connErr = dialer.Dial("tcp", b.addr)
if b.connErr != nil {
Expand Down
1 change: 1 addition & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ func TestSimpleBrokerCommunication(t *testing.T) {
// Set the broker id in order to validate local broker metrics
broker.id = 0
conf := NewTestConfig()
conf.ApiVersionsRequest = false
conf.Version = tt.version
err := broker.Open(conf)
if err != nil {
Expand Down
6 changes: 6 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,11 @@ type Config struct {
// in the background while user code is working, greatly improving throughput.
// Defaults to 256.
ChannelBufferSize int
// ApiVersionsRequest determines whether Sarama should send an
// ApiVersionsRequest message to each broker as part of its initial
// connection. This defaults to `true` to match the official Java client
// and most 3rdparty ones.
ApiVersionsRequest bool
// The version of Kafka that Sarama will assume it is running against.
// Defaults to the oldest supported stable version. Since Kafka provides
// backwards-compatibility, setting it to a version older than you have
Expand Down Expand Up @@ -492,6 +497,7 @@ func NewConfig() *Config {

c.ClientID = defaultClientID
c.ChannelBufferSize = 256
c.ApiVersionsRequest = true
c.Version = DefaultVersion
c.MetricRegistry = metrics.NewRegistry()

Expand Down
4 changes: 3 additions & 1 deletion real_decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,9 @@ func (rd *realDecoder) getCompactString() (string, error) {
}

length := int(n - 1)

if length < 0 {
return "", errInvalidByteSliceLength
}
tmpStr := string(rd.raw[rd.off : rd.off+length])
rd.off += length
return tmpStr, nil
Expand Down
20 changes: 20 additions & 0 deletions version.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package sarama

import "runtime/debug"

var v string

func version() string {
if v == "" {
bi, ok := debug.ReadBuildInfo()
if ok {
v = bi.Main.Version
} else {
// if we can't read a go module version then they're using a git
// clone or vendored module so all we can do is report "dev" for
// the version
v = "dev"
}
}
return v
}

0 comments on commit abd562c

Please sign in to comment.