Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: retry topic request on ControllerNotAvailable #1586

Merged
merged 1 commit into from
Jan 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 103 additions & 50 deletions admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"math/rand"
"strconv"
"sync"
"time"
)

// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
Expand Down Expand Up @@ -134,8 +135,45 @@ func (ca *clusterAdmin) Controller() (*Broker, error) {
return ca.client.Controller()
}

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
// refreshController forces fresh cluster metadata to be fetched via the
// underlying client and returns the (possibly re-elected) controller broker.
// It is the admin-side hook used after an ErrNotController response.
func (ca *clusterAdmin) refreshController() (*Broker, error) {
	return ca.client.RefreshController()
}

// isErrNoController returns `true` if the given error type unwraps to an
// `ErrNotController` response from Kafka
func isErrNoController(err error) bool {
switch e := err.(type) {
case *TopicError:
return e.Err == ErrNotController
case *TopicPartitionError:
return e.Err == ErrNotController
case KError:
return e == ErrNotController
}
return false
}

// retryOnError will repeatedly call the given (error-returning) func in the
// case that its response is non-nil and retriable (as determined by the
// provided retriable func) up to the maximum number of tries permitted by
// the admin client configuration. The last error (which may be nil) is
// returned to the caller.
func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
	var err error
	for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
		err = fn()
		if err == nil || !retriable(err) {
			return err
		}
		// Count attempts that are still left AFTER the one that just
		// failed; when none remain, return immediately rather than
		// sleeping for a backoff period that benefits nobody.
		remaining := ca.conf.Admin.Retry.Max - attempt - 1
		if remaining == 0 {
			break
		}
		Logger.Printf(
			"admin/request retrying after %dms... (%d attempts remaining)\n",
			ca.conf.Admin.Retry.Backoff/time.Millisecond, remaining)
		time.Sleep(ca.conf.Admin.Retry.Backoff)
	}
	return err
}

func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopic
}
Expand All @@ -160,26 +198,31 @@ func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateO
request.Version = 2
}

b, err := ca.Controller()
if err != nil {
return err
}
return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.CreateTopics(request)
if err != nil {
return err
}
rsp, err := b.CreateTopics(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicErrors[topic]
if !ok {
return ErrIncompleteResponse
}
topicErr, ok := rsp.TopicErrors[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr.Err != ErrNoError {
return topicErr
}
if topicErr.Err != ErrNoError {
if topicErr.Err == ErrNotController {
_, _ = ca.refreshController()
}
return topicErr
}

return nil
return nil
})
}

func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
Expand Down Expand Up @@ -320,7 +363,6 @@ func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
}

func (ca *clusterAdmin) DeleteTopic(topic string) error {

if topic == "" {
return ErrInvalidTopic
}
Expand All @@ -334,25 +376,31 @@ func (ca *clusterAdmin) DeleteTopic(topic string) error {
request.Version = 1
}

b, err := ca.Controller()
if err != nil {
return err
}
return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.DeleteTopics(request)
if err != nil {
return err
}
rsp, err := b.DeleteTopics(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicErrorCodes[topic]
if !ok {
return ErrIncompleteResponse
}
topicErr, ok := rsp.TopicErrorCodes[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr != ErrNoError {
return topicErr
}
return nil
if topicErr != ErrNoError {
if topicErr == ErrNotController {
_, _ = ca.refreshController()
}
return topicErr
}

return nil
})
}

func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
Expand All @@ -368,26 +416,31 @@ func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [
Timeout: ca.conf.Admin.Timeout,
}

b, err := ca.Controller()
if err != nil {
return err
}
return ca.retryOnError(isErrNoController, func() error {
b, err := ca.Controller()
if err != nil {
return err
}

rsp, err := b.CreatePartitions(request)
if err != nil {
return err
}
rsp, err := b.CreatePartitions(request)
if err != nil {
return err
}

topicErr, ok := rsp.TopicPartitionErrors[topic]
if !ok {
return ErrIncompleteResponse
}
topicErr, ok := rsp.TopicPartitionErrors[topic]
if !ok {
return ErrIncompleteResponse
}

if topicErr.Err != ErrNoError {
return topicErr
}
if topicErr.Err != ErrNoError {
if topicErr.Err == ErrNotController {
_, _ = ca.refreshController()
}
dnwe marked this conversation as resolved.
Show resolved Hide resolved
return topicErr
}

return nil
return nil
})
}

func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
Expand Down
48 changes: 48 additions & 0 deletions admin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1104,3 +1104,51 @@ func TestDeleteConsumerGroup(t *testing.T) {
}

}

// TestRefreshMetaDataWithDifferentController ensures that the cached
// controller can be forcibly updated from Metadata by the admin client.
func TestRefreshMetaDataWithDifferentController(t *testing.T) {
	seedBroker1 := NewMockBroker(t, 1)
	seedBroker2 := NewMockBroker(t, 2)
	defer seedBroker1.Close()
	defer seedBroker2.Close()

	// Initial metadata reports seedBroker1 as the controller.
	seedBroker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker1.BrokerID()).
			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
	})

	config := NewConfig()
	config.Version = V1_1_0_0

	client, err := NewClient([]string{seedBroker1.Addr()}, config)
	if err != nil {
		t.Fatal(err)
	}
	// Close the client so its broker connections don't leak out of the test.
	defer client.Close()

	ca := clusterAdmin{client: client, conf: config}

	if b, _ := ca.Controller(); seedBroker1.BrokerID() != b.ID() {
		t.Fatalf("expected cached controller to be %d rather than %d",
			seedBroker1.BrokerID(), b.ID())
	}

	// Simulate a controller change: fresh metadata now names seedBroker2.
	seedBroker1.SetHandlerByMap(map[string]MockResponse{
		"MetadataRequest": NewMockMetadataResponse(t).
			SetController(seedBroker2.BrokerID()).
			SetBroker(seedBroker1.Addr(), seedBroker1.BrokerID()).
			SetBroker(seedBroker2.Addr(), seedBroker2.BrokerID()),
	})

	if b, _ := ca.refreshController(); seedBroker2.BrokerID() != b.ID() {
		t.Fatalf("expected refreshed controller to be %d rather than %d",
			seedBroker2.BrokerID(), b.ID())
	}

	// The refresh must also have updated the cached value.
	if b, _ := ca.Controller(); seedBroker2.BrokerID() != b.ID() {
		t.Fatalf("expected cached controller to be %d rather than %d",
			seedBroker2.BrokerID(), b.ID())
	}
}
37 changes: 36 additions & 1 deletion client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,15 @@ type Client interface {
// altered after it has been created.
Config() *Config

// Controller returns the cluster controller broker. Requires Kafka 0.10 or higher.
// Controller returns the cluster controller broker. It will return a
// locally cached value if it's available. You can call RefreshController
// to update the cached value. Requires Kafka 0.10 or higher.
Controller() (*Broker, error)

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
RefreshController() (*Broker, error)

// Brokers returns the current set of active brokers as retrieved from cluster metadata.
Brokers() []*Broker

Expand Down Expand Up @@ -487,6 +493,35 @@ func (client *client) Controller() (*Broker, error) {
return controller, nil
}

// deregisterController removes the cached controller broker from the
// client's broker map, closing its connection first so that the socket is
// not leaked when the controller entry is dropped.
func (client *client) deregisterController() {
	client.lock.Lock()
	defer client.lock.Unlock()
	if controller, ok := client.brokers[client.controllerID]; ok {
		// Close errors are deliberately ignored: the broker is being
		// discarded and will be re-resolved from fresh metadata.
		_ = controller.Close()
		delete(client.brokers, client.controllerID)
	}
}

// RefreshController retrieves the cluster controller from fresh metadata
// and stores it in the local cache. Requires Kafka 0.10 or higher.
func (client *client) RefreshController() (*Broker, error) {
	if client.Closed() {
		return nil, ErrClosedClient
	}

	// Drop the cached controller first so the metadata refresh below
	// re-resolves it instead of returning the stale entry.
	client.deregisterController()

	if err := client.refreshMetadata(); err != nil {
		return nil, err
	}

	controller := client.cachedController()
	if controller == nil {
		return nil, ErrControllerNotAvailable
	}

	// Open error deliberately ignored — presumably a best-effort connect
	// whose failures surface on first use of the broker; NOTE(review):
	// confirm Open is a safe no-op when already connected.
	_ = controller.Open(client.conf)
	return controller, nil
}

func (client *client) Coordinator(consumerGroup string) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
Expand Down
9 changes: 9 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,13 @@ var validID = regexp.MustCompile(`\A[A-Za-z0-9._-]+\z`)
type Config struct {
// Admin is the namespace for ClusterAdmin properties used by the administrative Kafka client.
Admin struct {
Retry struct {
// The total number of times to retry sending (retriable) admin requests (default 5).
// Similar to the `retries` setting of the JVM AdminClientConfig.
Max int
// Backoff time between retries of a failed request (default 100ms)
Backoff time.Duration
}
// The maximum duration the administrative Kafka client will wait for ClusterAdmin operations,
// including topics, brokers, configurations and ACLs (defaults to 3 seconds).
Timeout time.Duration
Expand Down Expand Up @@ -408,6 +415,8 @@ type Config struct {
func NewConfig() *Config {
c := &Config{}

c.Admin.Retry.Max = 5
c.Admin.Retry.Backoff = 100 * time.Millisecond
c.Admin.Timeout = 3 * time.Second

c.Net.MaxOpenRequests = 5
Expand Down