Skip to content

Commit

Permalink
Merge pull request #1586 from dnwe/refresh-controller-on-change
Browse files Browse the repository at this point in the history
fix: retry topic request on ControllerNotAvailable
  • Loading branch information
d1egoaz authored Jan 24, 2020
2 parents 30e7094 + 5160d7d commit e4d725d
Show file tree
Hide file tree
Showing 4 changed files with 196 additions and 51 deletions.
153 changes: 103 additions & 50 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"strconv"
"sync"
"time"
)

// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
Expand Down Expand Up @@ -134,8 +135,45 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
return ca.client.Controller()
}

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
func (ca *clusterAdmin) refreshController() (*Broker, error) {
return ca.client.RefreshController()
}

// isErrNoController returns `true` if the given error type unwraps to an
// `ErrNotController` response from Kafka
func isErrNoController(err error) bool {
switch e := err.(type) {
case *TopicError:
return e.Err == ErrNotController
case *TopicPartitionError:
return e.Err == ErrNotController
case KError:
return e == ErrNotController
}
return false
}

// retryOnError will repeatedly call the given (error-returning) func in the
// case that its response is non-nil and retriable (as determined by the
// provided retriable func) up to the maximum number of tries permitted by
// the admin client configuration
func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
var err error
for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
err = fn()
if err == nil || !retriable(err) {
return err
}
Logger.Printf(
"admin/request retrying after %dms... (%d attempts remaining)\n",
ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
time.Sleep(ca.conf.Admin.Retry.Backoff)
continue
}
return err
}

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopic
}
Expand All @@ -160,26 +198,31 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
request.Version = 2
}

b, err := ca.Controller()
if err != nil {
return err
}
return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.CreateTopics(request)
if err != nil {
return err
}
rsp, err := b.CreateTopics(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicErrors[topic]
if !ok {
return ErrIncompleteResponse
}
topicErr, ok := rsp.TopicErrors[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr.Err != ErrNoError {
return topicErr
}
if topicErr.Err != ErrNoError {
if topicErr.Err == ErrNotController {
_, _ = ca.refreshController()
}
return topicErr
}

return nil
return nil
})
}

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
Expand Down Expand Up @@ -320,7 +363,6 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
}

func (ca *clusterAdmin) DeleteTopic(topic string) error {

if topic == "" {
return ErrInvalidTopic
}
Expand All @@ -334,25 +376,31 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
request.Version = 1
}

b, err := ca.Controller()
if err != nil {
return err
}
return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.DeleteTopics(request)
if err != nil {
return err
}
rsp, err := b.DeleteTopics(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicErrorCodes[topic]
if !ok {
return ErrIncompleteResponse
}
topicErr, ok := rsp.TopicErrorCodes[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr != ErrNoError {
return topicErr
}
return nil
if topicErr != ErrNoError {
if topicErr == ErrNotController {
_, _ = ca.refreshController()
}
return topicErr
}

return nil
})
}

func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
Expand All @@ -368,26 +416,31 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
Timeout: ca.conf.Admin.Timeout,
}

b, err := ca.Controller()
if err != nil {
return err
}
return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.CreatePartitions(request)
if err != nil {
return err
}
rsp, err := b.CreatePartitions(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicPartitionErrors[topic]
if !ok {
return ErrIncompleteResponse
}
topicErr, ok := rsp.TopicPartitionErrors[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr.Err != ErrNoError {
return topicErr
}
if topicErr.Err != ErrNoError {
if topicErr.Err == ErrNotController {
_, _ = ca.refreshController()
}
return topicErr
}

return nil
return nil
})
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
Expand Down
48 changes: 48 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,3 +1104,51 @@ func TestDeleteConsumerGroup(t *testing.T) {
}

}

// TestRefreshMetaDataWithDifferentController ensures that the cached
// controller can be forcibly updated from Metadata by the admin client
func TestRefreshMetaDataWithDifferentController(t *testing.T) {
seedBroker1 := NewMockBroker(t, 1)
seedBroker2 := NewMockBroker(t, 2)
defer seedBroker1.Close()
defer seedBroker2.Close()

seedBroker1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker1.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
})

config := NewConfig()
config.Version = V1_1_0_0

client, err := NewClient([]string{seedBroker1.Addr()}, config)
if err != nil {
t.Fatal(err)
}

ca := clusterAdmin{client: client, conf: config}

if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() {
t.Fatalf("expected cached controller to be %d rather than %d",
seedBroker1.BrokerID(), b.ID())
}

seedBroker1.SetHandlerByMap(map[string]MockResponse{
"MetadataRequest": NewMockMetadataResponse(t).
SetController(seedBroker2.BrokerID()).
SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
})

if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
t.Fatalf("expected refreshed controller to be %d rather than %d",
seedBroker2.BrokerID(), b.ID())
}

if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() {
t.Fatalf("expected cached controller to be %d rather than %d",
seedBroker2.BrokerID(), b.ID())
}
}
37 changes: 36 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ type Client interface {
// altered after it has been created.
Config() *Config

// Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
// Controller returns the cluster controller broker. It will return a
// locally cached value if it's available. You can call RefreshController
// to update the cached value. Requires Kafka 0.10 or higher.
Controller() (*Broker, error)

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
RefreshController() (*Broker, error)

// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

Expand Down Expand Up @@ -487,6 +493,35 @@ func (client *client) Controller() (*Broker, error) {
return controller, nil
}

// deregisterController removes the cached controllerID
func (client *client) deregisterController() {
client.lock.Lock()
defer client.lock.Unlock()
delete(client.brokers, client.controllerID)
}

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
func (client *client) RefreshController() (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
}

client.deregisterController()

if err := client.refreshMetadata(); err != nil {
return nil, err
}

controller := client.cachedController()
if controller == nil {
return nil, ErrControllerNotAvailable
}

_ = controller.Open(client.conf)
return controller, nil
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
type Config struct {
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
Admin struct {
Retry struct {
// The total number of times to retry sending (retriable) admin requests (default 5).
// Similar to the `retries` setting of the JVM AdminClientConfig.
Max int
// Backoff time between retries of a failed request (default 100ms)
Backoff time.Duration
}
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
Timeout time.Duration
Expand Down Expand Up @@ -408,6 +415,8 @@ type Config struct {
func NewConfig() *Config {
c := &Config{}

c.Admin.Retry.Max = 5
c.Admin.Retry.Backoff = 100 * time.Millisecond
c.Admin.Timeout = 3 * time.Second

c.Net.MaxOpenRequests = 5
Expand Down

0 comments on commit e4d725d

Please sign in to comment.