Skip to content

Commit

Permalink
fix: use the broker for any admin on BrokerConfig
Browse files Browse the repository at this point in the history
When operating on Broker configuration via the Admin API, the request
_must_ be sent to the specific broker that the change applies to, _not_
just (as usual) to the Controller.
  • Loading branch information
dnwe committed Jan 15, 2020
1 parent a20d267 commit fb8b9b5
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 18 deletions.
47 changes: 45 additions & 2 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package sarama

import (
"errors"
"fmt"
"math/rand"
"strconv"
"sync"
)

Expand Down Expand Up @@ -226,6 +228,16 @@ func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32
return response.Brokers, response.ControllerID, nil
}

func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
brokers := ca.client.Brokers()
for _, b := range brokers {
if b.ID() == id {
return b, nil
}
}
return nil, fmt.Errorf("could not find broker id %d", id)
}

func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
Expand Down Expand Up @@ -432,6 +444,13 @@ func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]i
return nil
}

// Returns a bool indicating whether the resource request needs to go to a
// specific broker
func dependsOnSpecificNode(resource ConfigResource) bool {
return (resource.Type == BrokerResource && resource.Name != "") ||
resource.Type == BrokerLoggerResource
}

func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {

var entries []ConfigEntry
Expand All @@ -442,11 +461,23 @@ func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry,
Resources: resources,
}

b, err := ca.Controller()
var (
b *Broker
err error
)

// DescribeConfig of broker/broker logger must be sent to the broker in question
if dependsOnSpecificNode(resource) {
id, _ := strconv.Atoi(resource.Name)
b, err = ca.findBroker(int32(id))
} else {
b, err = ca.findAnyBroker()
}
if err != nil {
return nil, err
}

_ = b.Open(ca.client.Config())
rsp, err := b.DescribeConfigs(request)
if err != nil {
return nil, err
Expand Down Expand Up @@ -479,11 +510,23 @@ func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string
ValidateOnly: validateOnly,
}

b, err := ca.Controller()
var (
b *Broker
err error
)

// AlterConfig of broker/broker logger must be sent to the broker in question
if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
id, _ := strconv.Atoi(name)
b, err = ca.findBroker(int32(id))
} else {
b, err = ca.findAnyBroker()
}
if err != nil {
return err
}

_ = b.Open(ca.client.Config())
rsp, err := b.AlterConfigs(request)
if err != nil {
return err
Expand Down
113 changes: 113 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@ package sarama

import (
"errors"
"fmt"
"io/ioutil"
"log"
"os"
"strings"
"testing"
)
Expand Down Expand Up @@ -511,6 +515,61 @@ func TestClusterAdminDescribeConfig(t *testing.T) {
}
}

// TestClusterAdminDescribeBrokerConfig ensures that a describe broker config
// is sent to the broker in the resource struct, _not_ the controller
func TestClusterAdminDescribeBrokerConfig(t *testing.T) {
Logger = log.New(os.Stdout, fmt.Sprintf("[%s] ", t.Name()), log.LstdFlags)
defer func() { Logger = log.New(ioutil.Discard, "[Sarama] ", log.LstdFlags) }()

controllerBroker := NewMockBroker(t, 1)
defer controllerBroker.Close()
configBroker := NewMockBroker(t, 2)
defer configBroker.Close()

controllerBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
})

configBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
"DescribeConfigsRequest": NewMockDescribeConfigsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin(
[]string{
controllerBroker.Addr(),
configBroker.Addr(),
}, config)
if err != nil {
t.Fatal(err)
}

for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
resource := ConfigResource{Name: "2", Type: resourceType}
entries, err := admin.DescribeConfig(resource)
if err != nil {
t.Fatal(err)
}

if len(entries) <= 0 {
t.Fatal(errors.New("no resource present"))
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminAlterConfig(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down Expand Up @@ -544,6 +603,60 @@ func TestClusterAdminAlterConfig(t *testing.T) {
}
}

func TestClusterAdminAlterBrokerConfig(t *testing.T) {
controllerBroker := NewMockBroker(t, 1)
defer controllerBroker.Close()
configBroker := NewMockBroker(t, 2)
defer configBroker.Close()

controllerBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
})
configBroker.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(controllerBroker.BrokerID()).
SetBroker(controllerBroker.Addr(), controllerBroker.BrokerID()).
SetBroker(configBroker.Addr(), configBroker.BrokerID()),
"AlterConfigsRequest": NewMockAlterConfigsResponse(t),
})

config := NewConfig()
config.Version = V1_0_0_0
admin, err := NewClusterAdmin(
[]string{
controllerBroker.Addr(),
configBroker.Addr(),
}, config)
if err != nil {
t.Fatal(err)
}

var value string
entries := make(map[string]*string)
value = "3"
entries["min.insync.replicas"] = &value

for _, resourceType := range []ConfigResourceType{BrokerResource, BrokerLoggerResource} {
resource := ConfigResource{Name: "2", Type: resourceType}
err = admin.AlterConfig(
resource.Type,
resource.Name,
entries,
false)
if err != nil {
t.Fatal(err)
}
}

err = admin.Close()
if err != nil {
t.Fatal(err)
}
}

func TestClusterAdminCreateAcl(t *testing.T) {
seedBroker := NewMockBroker(t, 1)
defer seedBroker.Close()
Expand Down
26 changes: 11 additions & 15 deletions config_resource_type.go
Original file line number Diff line number Diff line change
@@ -1,22 +1,18 @@
package sarama

//ConfigResourceType is a type for config resource
// ConfigResourceType is a type for resources that have configs.
type ConfigResourceType int8

// Taken from :
// https://cwiki.apache.org/confluence/display/KAFKA/KIP-133%3A+Describe+and+Alter+Configs+Admin+APIs#KIP-133:DescribeandAlterConfigsAdminAPIs-WireFormattypes
// Taken from:
// https://github.com/apache/kafka/blob/ed7c071e07f1f90e4c2895582f61ca090ced3c42/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java#L32-L55

const (
//UnknownResource constant type
UnknownResource ConfigResourceType = iota
//AnyResource constant type
AnyResource
//TopicResource constant type
TopicResource
//GroupResource constant type
GroupResource
//ClusterResource constant type
ClusterResource
//BrokerResource constant type
BrokerResource
// UnknownResource constant type
UnknownResource ConfigResourceType = 0
// TopicResource constant type
TopicResource ConfigResourceType = 2
// BrokerResource constant type
BrokerResource ConfigResourceType = 4
// BrokerLoggerResource constant type
BrokerLoggerResource ConfigResourceType = 8
)
28 changes: 27 additions & 1 deletion mockresponses.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,6 +736,32 @@ func (mr *MockDescribeConfigsResponse) For(reqBody versionedDecoder) encoder {
for _, r := range req.Resources {
var configEntries []*ConfigEntry
switch r.Type {
case BrokerResource:
configEntries = append(configEntries,
&ConfigEntry{
Name: "min.insync.replicas",
Value: "2",
ReadOnly: false,
Default: false,
},
)
res.Resources = append(res.Resources, &ResourceResponse{
Name: r.Name,
Configs: configEntries,
})
case BrokerLoggerResource:
configEntries = append(configEntries,
&ConfigEntry{
Name: "kafka.controller.KafkaController",
Value: "DEBUG",
ReadOnly: false,
Default: false,
},
)
res.Resources = append(res.Resources, &ResourceResponse{
Name: r.Name,
Configs: configEntries,
})
case TopicResource:
configEntries = append(configEntries,
&ConfigEntry{Name: "max.message.bytes",
Expand Down Expand Up @@ -777,7 +803,7 @@ func (mr *MockAlterConfigsResponse) For(reqBody versionedDecoder) encoder {

for _, r := range req.Resources {
res.Resources = append(res.Resources, &AlterConfigsResourceResponse{Name: r.Name,
Type: TopicResource,
Type: r.Type,
ErrorMsg: "",
})
}
Expand Down

0 comments on commit fb8b9b5

Please sign in to comment.