Skip to content

Commit

Permalink
Merge remote-tracking branch 'shopify_sarama/master'
Browse files Browse the repository at this point in the history
  • Loading branch information
antsbean committed Jan 17, 2020
2 parents fd421dc + 7a7d874 commit 0cce202
Show file tree
Hide file tree
Showing 22 changed files with 604 additions and 89 deletions.
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
dist: xenial
language: go
go:
- 1.11.x
- 1.12.x
- 1.13.x

Expand Down
38 changes: 38 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,43 @@
# Changelog

#### Version 1.25.0 (2020-01-13)

New Features:
- Support TLS protocol in kafka-producer-performance
([1538](https://github.com/Shopify/sarama/pull/1538)).
- Add support for kafka 2.4.0
([1552](https://github.com/Shopify/sarama/pull/1552)).

Improvements:
- Allow the Consumer to disable auto-commit offsets
([1164](https://github.com/Shopify/sarama/pull/1164)).
- Produce records with consistent timestamps
([1455](https://github.com/Shopify/sarama/pull/1455)).

Bug Fixes:
- Fix incorrect SetTopicMetadata name mentions
([1534](https://github.com/Shopify/sarama/pull/1534)).
- Fix client.tryRefreshMetadata Println
([1535](https://github.com/Shopify/sarama/pull/1535)).
- Fix panic on calling updateMetadata on closed client
([1531](https://github.com/Shopify/sarama/pull/1531)).
- Fix possible faulty metrics in TestFuncProducing
([1545](https://github.com/Shopify/sarama/pull/1545)).

#### Version 1.24.1 (2019-10-31)

New Features:
- Add DescribeLogDirs Request/Response pair
([1520](https://github.com/Shopify/sarama/pull/1520)).

Bug Fixes:
- Fix ClusterAdmin returning invalid controller ID on DescribeCluster
([1518](https://github.com/Shopify/sarama/pull/1518)).
- Fix issue with consumergroup not rebalancing when new partition is added
([1525](https://github.com/Shopify/sarama/pull/1525)).
- Ensure consistent use of read/write deadlines
([1529](https://github.com/Shopify/sarama/pull/1529)).

#### Version 1.24.0 (2019-10-09)

New Features:
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ You might also want to look at the [Frequently Asked Questions](https://github.c
Sarama provides a "2 releases + 2 months" compatibility guarantee: we support
the two latest stable releases of Kafka and Go, and we provide a two month
grace period for older releases. This means we currently officially support
Go 1.11 through 1.13, and Kafka 2.1 through 2.3, although older releases are
Go 1.12 through 1.13, and Kafka 2.1 through 2.4, although older releases are
still likely to work.

Sarama follows semantic versioning and provides API stability via the gopkg.in service.
Expand Down
58 changes: 54 additions & 4 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package sarama

import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
)

Expand Down Expand Up @@ -214,7 +216,7 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
Topics: []string{},
}

if ca.conf.Version.IsAtLeast(V0_11_0_0) {
if ca.conf.Version.IsAtLeast(V0_10_0_0) {
request.Version = 1
}

Expand All @@ -226,6 +228,16 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
return response.Brokers, response.ControllerID, nil
}

func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
brokers := ca.client.Brokers()
for _, b := range brokers {
if b.ID() == id {
return b, nil
}
}
return nil, fmt.Errorf("could not find broker id %d", id)
}

func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
Expand Down Expand Up @@ -432,8 +444,14 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
return nil
}

func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
// Returns a bool indicating whether the resource request needs to go to a
// specific broker
func dependsOnSpecificNode(resource ConfigResource) bool {
return (resource.Type == BrokerResource && resource.Name != "") ||
resource.Type == BrokerLoggerResource
}

func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
var entries []ConfigEntry
var resources []*ConfigResource
resources = append(resources, &resource)
Expand All @@ -442,11 +460,31 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,
Resources: resources,
}

b, err := ca.Controller()
if ca.conf.Version.IsAtLeast(V1_1_0_0) {
request.Version = 1
}

if ca.conf.Version.IsAtLeast(V2_0_0_0) {
request.Version = 2
}

var (
b *Broker
err error
)

// DescribeConfig of broker/broker logger must be sent to the broker in question
if dependsOnSpecificNode(resource) {
id, _ := strconv.Atoi(resource.Name)
b, err = ca.findBroker(int32(id))
} else {
b, err = ca.findAnyBroker()
}
if err != nil {
return nil, err
}

_ = b.Open(ca.client.Config())
rsp, err := b.DescribeConfigs(request)
if err != nil {
return nil, err
Expand Down Expand Up @@ -479,11 +517,23 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string
ValidateOnly: validateOnly,
}

b, err := ca.Controller()
var (
b *Broker
err error
)

// AlterConfig of broker/broker logger must be sent to the broker in question
if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
id, _ := strconv.Atoi(name)
b, err = ca.findBroker(int32(id))
} else {
b, err = ca.findAnyBroker()
}
if err != nil {
return err
}

_ = b.Open(ca.client.Config())
rsp, err := b.AlterConfigs(request)
if err != nil {
return err
Expand Down
154 changes: 146 additions & 8 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,105 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
})

var tests = []struct {
saramaVersion KafkaVersion
requestVersion int16
includeSynonyms bool
}{
{V1_0_0_0, 0, false},
{V1_1_0_0, 1, true},
{V1_1_1_0, 1, true},
{V2_0_0_0, 2, true},
}
for _, tt := range tests {
config := NewConfig()
config.Version = tt.saramaVersion
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
if err != nil {
t.Fatal(err)
}
defer func() {
_ = admin.Close()
}()

resource := ConfigResource{
Name: "r1",
Type: TopicResource,
ConfigNames: []string{"my_topic"},
}

entries, err := admin.DescribeConfig(resource)
if err != nil {
t.Fatal(err)
}

history := seedBroker.History()
describeReq, ok := history[len(history)-1].Request.(*DescribeConfigsRequest)
if !ok {
t.Fatal("failed to find DescribeConfigsRequest in mockBroker history")
}

if describeReq.Version != tt.requestVersion {
t.Fatalf(
"requestVersion %v did not match expected %v",
describeReq.Version, tt.requestVersion)
}

if len(entries) <= 0 {
t.Fatal(errors.New("no resource present"))
}
if tt.includeSynonyms {
if len(entries[0].Synonyms) == 0 {
t.Fatal("expected synonyms to have been included")
}
}
}
}

// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
// is sent to the broker in the resource struct, _not_ the controller
func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
controllerBroker := NewMockBroker(t, 1)
defer controllerBroker.Close()
configBroker := NewMockBroker(t, 2)
defer configBroker.Close()

controllerBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
})

configBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin([]string{seedBroker.Addr()}, config)
admin, err := NewClusterAdmin(
[]string{
controllerBroker.Addr(),
configBroker.Addr(),
}, config)
if err != nil {
t.Fatal(err)
}

resource := ConfigResource{Name: "r1", Type: TopicResource, ConfigNames: []string{"my_topic"}}
entries, err := admin.DescribeConfig(resource)
if err != nil {
t.Fatal(err)
}
for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
resource := ConfigResource{Name: "2", Type: resourceType}
entries, err := admin.DescribeConfig(resource)
if err != nil {
t.Fatal(err)
}

if len(entries) <= 0 {
t.Fatal(errors.New("no resource present"))
if len(entries) <= 0 {
t.Fatal(errors.New("no resource present"))
}
}

err = admin.Close()
Expand Down Expand Up @@ -544,6 +628,60 @@ func TestClusterAdminAlterConfig(t *testing.T) {
}
}

func TestClusterAdminAlterBrokerConfig(t *testing.T) {
controllerBroker := NewMockBroker(t, 1)
defer controllerBroker.Close()
configBroker := NewMockBroker(t, 2)
defer configBroker.Close()

controllerBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
})
configBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin(
[]string{
controllerBroker.Addr(),
configBroker.Addr(),
}, config)
if err != nil {
t.Fatal(err)
}

var value string
entries := make(map[string]*string)
value = "3"
entries["min.insync.replicas"] = &value

for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
resource := ConfigResource{Name: "2", Type: resourceType}
err = admin.AlterConfig(
resource.Type,
resource.Name,
entries,
false)
if err != nil {
t.Fatal(err)
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminCreateAcl(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
14 changes: 13 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,9 @@ func (client *client) Close() error {
}

func (client *client) Closed() bool {
client.lock.RLock()
defer client.lock.RUnlock()

return client.brokers == nil
}

Expand Down Expand Up @@ -529,6 +532,11 @@ func (client *client) RefreshCoordinator(consumerGroup string) error {
// in the brokers map. It returns the broker that is registered, which may be the provided broker,
// or a previously registered Broker instance. You must hold the write lock before calling this function.
func (client *client) registerBroker(broker *Broker) {
if client.brokers == nil {
Logger.Printf("cannot register broker #%d at %s, client already closed", broker.ID(), broker.Addr())
return
}

if client.brokers[broker.ID()] == nil {
client.brokers[broker.ID()] = broker
Logger.Printf("client/brokers registered new broker #%d at %s", broker.ID(), broker.Addr())
Expand Down Expand Up @@ -822,7 +830,7 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,
}

if broker != nil {
Logger.Println("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
return retry(ErrOutOfBrokers)
}

Expand All @@ -833,6 +841,10 @@ func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int,

// if no fatal error, returns a list of topics that need retrying due to ErrLeaderNotAvailable
func (client *client) updateMetadata(data *MetadataResponse, allKnownMetaData bool) (retry bool, err error) {
if client.Closed() {
return
}

client.lock.Lock()
defer client.lock.Unlock()

Expand Down
Loading

0 comments on commit 0cce202

Please sign in to comment.