Fix strimzi#204: Have the consuming client refresh its metadata if the partition leadership changes

This minimises the time the canary spends consuming from followers after the partition leadership changes on the broker, and so avoids the end-to-end latency measurement being skewed.

Signed-off-by: kwall <[email protected]>
k-wall committed Nov 25, 2022
1 parent e4a6201 commit 37861d2
Showing 4 changed files with 78 additions and 14 deletions.
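The core of the change is to compare the partition-to-leader map reported by the topic metadata with the leaders the consumer's Sarama client currently knows about, and to refresh the client's metadata when the two differ. Below is a minimal, self-contained sketch of that check against Sarama's public Client API; the helper name and wiring are illustrative, not the canary's actual code.

package example

import (
	"reflect"

	"github.com/Shopify/sarama"
)

// refreshIfLeadersChanged is an illustrative helper: it rebuilds the client's
// view of partition leaders for a topic and forces a metadata refresh when that
// view no longer matches the expected leaders (for example, those reported by
// the topic reconciliation).
func refreshIfLeadersChanged(client sarama.Client, topic string, expected map[int32]int32) error {
	partitions, err := client.Partitions(topic)
	if err != nil {
		return err
	}
	current := make(map[int32]int32, len(partitions))
	for _, p := range partitions {
		leader, err := client.Leader(topic, p)
		if err != nil {
			return err
		}
		current[p] = leader.ID()
	}
	if !reflect.DeepEqual(expected, current) {
		// Stale leadership view: re-fetch metadata so fetch requests go to the
		// new leaders instead of lingering on followers.
		return client.RefreshMetadata(topic)
	}
	return nil
}

In the actual diff below, TopicService.Reconcile returns the leaders it observed, CanaryManager.reconcile compares them with ConsumerService.Leaders(), and ConsumerService.Refresh() performs the RefreshMetadata call.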
34 changes: 34 additions & 0 deletions internal/services/consumer.go
@@ -57,6 +57,12 @@ var (
Namespace: "strimzi_canary",
Help: "The total number of consumers not joining the group within the timeout",
}, []string{"clientid"})

refreshConsumerMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "consumer_refresh_metadata_error_total",
Namespace: "strimzi_canary",
Help: "Total number of errors while refreshing consumer metadata",
}, []string{"clientid"})
)

// ConsumerService defines the service for consuming messages
@@ -230,3 +236,31 @@ func (cgh *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSessio
}
return nil
}

// Refresh refreshes the metadata on the underlying Sarama client
func (cs *ConsumerService) Refresh() {
glog.Infof("Consumer refreshing metadata")
if err := cs.client.RefreshMetadata(cs.canaryConfig.Topic); err != nil {
labels := prometheus.Labels{
"clientid": cs.canaryConfig.ClientID,
}
refreshConsumerMetadataError.With(labels).Inc()
glog.Errorf("Error refreshing metadata in consumer: %v", err)
}
}

func (cs *ConsumerService) Leaders() (map[int32]int32, error) {
partitions, err := cs.client.Partitions(cs.canaryConfig.Topic)
if err != nil {
return nil, err
}
leaders := make(map[int32]int32, len(partitions))
for _, p := range partitions {
leader, err := cs.client.Leader(cs.canaryConfig.Topic, p)
if err != nil {
return nil, err
}
leaders[p] = leader.ID()
}
return leaders, err
}
4 changes: 2 additions & 2 deletions internal/services/producer.go
@@ -40,7 +40,7 @@ var (
// it's defined when the service is created because buckets are configurable
recordsProducedLatency *prometheus.HistogramVec

refreshMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
refreshProducerMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
Name: "producer_refresh_metadata_error_total",
Namespace: "strimzi_canary",
Help: "Total number of errors while refreshing producer metadata",
@@ -118,7 +118,7 @@ func (ps *ProducerService) Refresh() {
labels := prometheus.Labels{
"clientid": ps.canaryConfig.ClientID,
}
refreshMetadataError.With(labels).Inc()
refreshProducerMetadataError.With(labels).Inc()
glog.Errorf("Error refreshing metadata in producer: %v", err)
}
}
46 changes: 35 additions & 11 deletions internal/services/topic.go
@@ -24,8 +24,10 @@ import (
type TopicReconcileResult struct {
// new partitions assignments across brokers
Assignments map[int32][]int32
// partition to leader assignments
Leaders map[int32]int32
// if a refresh metadata is needed
RefreshMetadata bool
RefreshProducerMetadata bool
}

// TopicService defines the service for canary topic management
@@ -106,14 +108,14 @@ func (ts *TopicService) Reconcile() (TopicReconcileResult, error) {
if err != nil && util.IsDisconnection(err) {
// Kafka brokers close the connection to the topic service admin client, which is not able to recover
// Sarama issues: https://github.com/Shopify/sarama/issues/2042, https://github.com/Shopify/sarama/issues/1796
// Workaround closing the topic service with its admin client and the reopen on next reconcile
// Workaround closing the topic service with its admin client and then reopen on next reconcile
ts.Close()
}
return result, err
}

func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
result := TopicReconcileResult{nil, false}
result := TopicReconcileResult{}

if ts.admin == nil {
glog.Infof("Creating Sarama cluster admin")
@@ -134,16 +136,10 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
return result, err
}

metadata, err := ts.admin.DescribeTopics([]string{ts.canaryConfig.Topic})
topicMetadata, err := ts.describeCanaryTopic()
if err != nil {
labels := prometheus.Labels{
"topic": ts.canaryConfig.Topic,
}
describeTopicError.With(labels).Inc()
glog.Errorf("Error retrieving metadata for topic %s: %v", ts.canaryConfig.Topic, err)
return result, err
}
topicMetadata := metadata[0]

if errors.Is(topicMetadata.Err, sarama.ErrUnknownTopicOrPartition) {

@@ -161,6 +157,11 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
return result, err
}
glog.Infof("The canary topic %s was created", topicMetadata.Name)

topicMetadata, err = ts.describeCanaryTopic()
if err != nil {
return result, err
}
} else {
glog.Warningf("The canary topic %s wasn't created. Expected brokers %d, Actual brokers %d",
topicMetadata.Name, ts.canaryConfig.ExpectedClusterSize, len(brokers))
@@ -189,7 +190,7 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
if ts.isDynamicReassignmentEnabled() || (!ts.initialized && ts.canaryConfig.ExpectedClusterSize == len(brokers)) {

glog.Infof("Going to reassign topic partitions if needed")
result.RefreshMetadata = len(brokers) != len(topicMetadata.Partitions)
result.RefreshProducerMetadata = len(brokers) != len(topicMetadata.Partitions)
if result.Assignments, err = ts.alterTopicAssignments(len(topicMetadata.Partitions), brokers); err != nil {
labels := prometheus.Labels{
"topic": topicMetadata.Name,
@@ -212,10 +213,25 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
return result, topicMetadata.Err
}

result.Leaders = ts.currentLeaders(topicMetadata)
ts.initialized = true
return result, err
}

func (ts *TopicService) describeCanaryTopic() (*sarama.TopicMetadata, error) {
metadata, err := ts.admin.DescribeTopics([]string{ts.canaryConfig.Topic})
if err != nil {
labels := prometheus.Labels{
"topic": ts.canaryConfig.Topic,
}
describeTopicError.With(labels).Inc()
glog.Errorf("Error retrieving metadata for topic %s: %v", ts.canaryConfig.Topic, err)
return nil, err
}
topicMetadata := metadata[0]
return topicMetadata, nil
}

// Close closes the underneath Sarama admin instance
func (ts *TopicService) Close() {
glog.Infof("Closing topic service")
@@ -392,6 +408,14 @@ func (ts *TopicService) currentAssignments(topicMetadata *sarama.TopicMetadata)
return assignments
}

func (ts *TopicService) currentLeaders(topicMetadata *sarama.TopicMetadata) map[int32]int32 {
leaders := make(map[int32]int32, len(topicMetadata.Partitions))
for _, p := range topicMetadata.Partitions {
leaders[p.ID] = p.Leader
}
return leaders
}

// Alter the replica assignment for the partitions
//
// After the request for the replica assignment, it runs a loop checking if the reassignment is still ongoing
8 changes: 7 additions & 1 deletion internal/workers/canary_manager.go
@@ -7,6 +7,7 @@
package workers

import (
"reflect"
"sync"
"time"

@@ -124,9 +125,14 @@ func (cm *CanaryManager) reconcile() {
glog.Infof("Canary manager reconcile ...")

if result, err := cm.topicService.Reconcile(); err == nil {
if result.RefreshMetadata {
if result.RefreshProducerMetadata {
cm.producerService.Refresh()
}

leaders, err := cm.consumerService.Leaders()
if err != nil || !reflect.DeepEqual(result.Leaders, leaders) {
cm.consumerService.Refresh()
}
// producer has to send to partitions assigned to brokers
cm.producerService.Send(result.Assignments)
}