From 37861d2b821ae0069e3bc4c34bc6789e5fb37527 Mon Sep 17 00:00:00 2001
From: kwall
Date: Fri, 25 Nov 2022 15:56:15 +0000
Subject: [PATCH] Fix #204: Have the consuming client refresh its metadata if
 the partition leadership changes

This minimises the time the canary spends consuming from followers after the
partition leadership changes on the broker, and so avoids the end-to-end
latency measure being skewed.

Signed-off-by: kwall
---
 internal/services/consumer.go      | 34 ++++++++++++++++++++++
 internal/services/producer.go      |  4 +--
 internal/services/topic.go         | 46 +++++++++++++++++++++++-------
 internal/workers/canary_manager.go |  8 +++++-
 4 files changed, 78 insertions(+), 14 deletions(-)

diff --git a/internal/services/consumer.go b/internal/services/consumer.go
index 037cb8b..e068633 100644
--- a/internal/services/consumer.go
+++ b/internal/services/consumer.go
@@ -57,6 +57,12 @@ var (
         Namespace: "strimzi_canary",
         Help:      "The total number of consumers not joining the group within the timeout",
     }, []string{"clientid"})
+
+    refreshConsumerMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
+        Name:      "consumer_refresh_metadata_error_total",
+        Namespace: "strimzi_canary",
+        Help:      "Total number of errors while refreshing consumer metadata",
+    }, []string{"clientid"})
 )
 
 // ConsumerService defines the service for consuming messages
@@ -230,3 +236,31 @@ func (cgh *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSessio
     }
     return nil
 }
+
+// Refresh refreshes the metadata of the underlying Sarama client
+func (cs *ConsumerService) Refresh() {
+    glog.Infof("Consumer refreshing metadata")
+    if err := cs.client.RefreshMetadata(cs.canaryConfig.Topic); err != nil {
+        labels := prometheus.Labels{
+            "clientid": cs.canaryConfig.ClientID,
+        }
+        refreshConsumerMetadataError.With(labels).Inc()
+        glog.Errorf("Error refreshing metadata in consumer: %v", err)
+    }
+}
+
+func (cs *ConsumerService) Leaders() (map[int32]int32, error) {
+    partitions, err := cs.client.Partitions(cs.canaryConfig.Topic)
+    if err != nil {
+        return nil, err
+    }
+    leaders := make(map[int32]int32, len(partitions))
+    for _, p := range partitions {
+        leader, err := cs.client.Leader(cs.canaryConfig.Topic, p)
+        if err != nil {
+            return nil, err
+        }
+        leaders[p] = leader.ID()
+    }
+    return leaders, nil
+}
diff --git a/internal/services/producer.go b/internal/services/producer.go
index e34663d..f4dae0e 100644
--- a/internal/services/producer.go
+++ b/internal/services/producer.go
@@ -40,7 +40,7 @@ var (
     // it's defined when the service is created because buckets are configurable
     recordsProducedLatency *prometheus.HistogramVec
 
-    refreshMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
+    refreshProducerMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
         Name:      "producer_refresh_metadata_error_total",
         Namespace: "strimzi_canary",
         Help:      "Total number of errors while refreshing producer metadata",
@@ -118,7 +118,7 @@ func (ps *ProducerService) Refresh() {
         labels := prometheus.Labels{
             "clientid": ps.canaryConfig.ClientID,
         }
-        refreshMetadataError.With(labels).Inc()
+        refreshProducerMetadataError.With(labels).Inc()
         glog.Errorf("Error refreshing metadata in producer: %v", err)
     }
 }
diff --git a/internal/services/topic.go b/internal/services/topic.go
index 313e8cb..dddac5a 100644
--- a/internal/services/topic.go
+++ b/internal/services/topic.go
@@ -24,8 +24,10 @@ import (
 type TopicReconcileResult struct {
     // new partitions assignments across brokers
     Assignments map[int32][]int32
+    // partition to leader assignments
+    Leaders map[int32]int32
     // if a refresh metadata is needed
-    RefreshMetadata bool
+    RefreshProducerMetadata bool
 }
 
 // TopicService defines the service for canary topic management
@@ -106,14 +108,14 @@ func (ts *TopicService) Reconcile() (TopicReconcileResult, error) {
     if err != nil && util.IsDisconnection(err) {
         // Kafka brokers close connection to the topic service admin client not able to recover
         // Sarama issues: https://github.com/Shopify/sarama/issues/2042, https://github.com/Shopify/sarama/issues/1796
-        // Workaround closing the topic service with its admin client and the reopen on next reconcile
+        // Workaround closing the topic service with its admin client and then reopen on next reconcile
         ts.Close()
     }
     return result, err
 }
 
 func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
-    result := TopicReconcileResult{nil, false}
+    result := TopicReconcileResult{}
 
     if ts.admin == nil {
         glog.Infof("Creating Sarama cluster admin")
@@ -134,16 +136,10 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
         return result, err
     }
 
-    metadata, err := ts.admin.DescribeTopics([]string{ts.canaryConfig.Topic})
+    topicMetadata, err := ts.describeCanaryTopic()
     if err != nil {
-        labels := prometheus.Labels{
-            "topic": ts.canaryConfig.Topic,
-        }
-        describeTopicError.With(labels).Inc()
-        glog.Errorf("Error retrieving metadata for topic %s: %v", ts.canaryConfig.Topic, err)
         return result, err
     }
-    topicMetadata := metadata[0]
 
     if errors.Is(topicMetadata.Err, sarama.ErrUnknownTopicOrPartition) {
 
@@ -161,6 +157,11 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
                 return result, err
             }
             glog.Infof("The canary topic %s was created", topicMetadata.Name)
+
+            topicMetadata, err = ts.describeCanaryTopic()
+            if err != nil {
+                return result, err
+            }
         } else {
Expected brokers %d, Actual brokers %d", topicMetadata.Name, ts.canaryConfig.ExpectedClusterSize, len(brokers)) @@ -189,7 +190,7 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) { if ts.isDynamicReassignmentEnabled() || (!ts.initialized && ts.canaryConfig.ExpectedClusterSize == len(brokers)) { glog.Infof("Going to reassign topic partitions if needed") - result.RefreshMetadata = len(brokers) != len(topicMetadata.Partitions) + result.RefreshProducerMetadata = len(brokers) != len(topicMetadata.Partitions) if result.Assignments, err = ts.alterTopicAssignments(len(topicMetadata.Partitions), brokers); err != nil { labels := prometheus.Labels{ "topic": topicMetadata.Name, @@ -212,10 +213,25 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) { return result, topicMetadata.Err } + result.Leaders = ts.currentLeaders(topicMetadata) ts.initialized = true return result, err } +func (ts *TopicService) describeCanaryTopic() (*sarama.TopicMetadata, error) { + metadata, err := ts.admin.DescribeTopics([]string{ts.canaryConfig.Topic}) + if err != nil { + labels := prometheus.Labels{ + "topic": ts.canaryConfig.Topic, + } + describeTopicError.With(labels).Inc() + glog.Errorf("Error retrieving metadata for topic %s: %v", ts.canaryConfig.Topic, err) + return nil, err + } + topicMetadata := metadata[0] + return topicMetadata, nil +} + // Close closes the underneath Sarama admin instance func (ts *TopicService) Close() { glog.Infof("Closing topic service") @@ -392,6 +408,14 @@ func (ts *TopicService) currentAssignments(topicMetadata *sarama.TopicMetadata) return assignments } +func (ts *TopicService) currentLeaders(topicMetadata *sarama.TopicMetadata) map[int32]int32 { + leaders := make(map[int32]int32, len(topicMetadata.Partitions)) + for _, p := range topicMetadata.Partitions { + leaders[p.ID] = p.Leader + } + return leaders +} + // Alter the replica assignment for the partitions // // After the request for the replica assignment, it run a loop for checking if the reassignment is still ongoing diff --git a/internal/workers/canary_manager.go b/internal/workers/canary_manager.go index 29155dc..4810580 100644 --- a/internal/workers/canary_manager.go +++ b/internal/workers/canary_manager.go @@ -7,6 +7,7 @@ package workers import ( + "reflect" "sync" "time" @@ -124,9 +125,14 @@ func (cm *CanaryManager) reconcile() { glog.Infof("Canary manager reconcile ...") if result, err := cm.topicService.Reconcile(); err == nil { - if result.RefreshMetadata { + if result.RefreshProducerMetadata { cm.producerService.Refresh() } + + leaders, err := cm.consumerService.Leaders() + if err != nil || !reflect.DeepEqual(result.Leaders, leaders) { + cm.consumerService.Refresh() + } // producer has to send to partitions assigned to brokers cm.producerService.Send(result.Assignments) }
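
The heart of this change is the leader comparison that CanaryManager.reconcile() now performs after every topic reconcile. A minimal, self-contained sketch of that decision follows; it is not part of the patch, and the leadersChanged helper plus the sample broker IDs are invented for illustration. The reflect.DeepEqual comparison, however, matches the one added above: any mismatch between the leaders reported by the topic service and those seen through the consumer's Sarama client (or an error while reading them) triggers consumerService.Refresh().

package main

import (
	"fmt"
	"reflect"
)

// leadersChanged reports whether the consumer's cached view of partition
// leaders differs from the view returned by the topic reconcile.
// (Illustrative helper only; the canary inlines this check in reconcile().)
func leadersChanged(reconciled, cached map[int32]int32) bool {
	return !reflect.DeepEqual(reconciled, cached)
}

func main() {
	// Partition -> leader broker ID as reported by the topic service.
	reconciled := map[int32]int32{0: 0, 1: 1, 2: 2}

	// The same mapping as seen through the consumer's (possibly stale)
	// metadata; here partition 1 has moved from broker 1 to broker 2.
	cached := map[int32]int32{0: 0, 1: 2, 2: 2}

	if leadersChanged(reconciled, cached) {
		// This is the point where the canary calls ConsumerService.Refresh(),
		// so the next fetch goes to the new leader rather than a follower.
		fmt.Println("partition leadership changed: refreshing consumer metadata")
	}
}

Refreshing on any mismatch, rather than diffing partition by partition, keeps the check simple; a spurious refresh only costs an extra metadata round trip.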