diff --git a/internal/services/consumer.go b/internal/services/consumer.go
index 037cb8b..e068633 100644
--- a/internal/services/consumer.go
+++ b/internal/services/consumer.go
@@ -57,6 +57,12 @@ var (
 		Namespace: "strimzi_canary",
 		Help:      "The total number of consumers not joining the group within the timeout",
 	}, []string{"clientid"})
+
+	refreshConsumerMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
+		Name:      "consumer_refresh_metadata_error_total",
+		Namespace: "strimzi_canary",
+		Help:      "Total number of errors while refreshing consumer metadata",
+	}, []string{"clientid"})
 )
 
 // ConsumerService defines the service for consuming messages
@@ -230,3 +236,31 @@ func (cgh *consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSessio
 	}
 	return nil
 }
+
+// Refresh refreshes the metadata on the underlying Sarama client
+func (cs *ConsumerService) Refresh() {
+	glog.Infof("Consumer refreshing metadata")
+	if err := cs.client.RefreshMetadata(cs.canaryConfig.Topic); err != nil {
+		labels := prometheus.Labels{
+			"clientid": cs.canaryConfig.ClientID,
+		}
+		refreshConsumerMetadataError.With(labels).Inc()
+		glog.Errorf("Error refreshing metadata in consumer: %v", err)
+	}
+}
+
+func (cs *ConsumerService) Leaders() (map[int32]int32, error) {
+	partitions, err := cs.client.Partitions(cs.canaryConfig.Topic)
+	if err != nil {
+		return nil, err
+	}
+	leaders := make(map[int32]int32, len(partitions))
+	for _, p := range partitions {
+		leader, err := cs.client.Leader(cs.canaryConfig.Topic, p)
+		if err != nil {
+			return nil, err
+		}
+		leaders[p] = leader.ID()
+	}
+	return leaders, nil
+}
diff --git a/internal/services/producer.go b/internal/services/producer.go
index e34663d..f4dae0e 100644
--- a/internal/services/producer.go
+++ b/internal/services/producer.go
@@ -40,7 +40,7 @@ var (
 	// it's defined when the service is created because buckets are configurable
 	recordsProducedLatency *prometheus.HistogramVec
 
-	refreshMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
+	refreshProducerMetadataError = promauto.NewCounterVec(prometheus.CounterOpts{
 		Name:      "producer_refresh_metadata_error_total",
 		Namespace: "strimzi_canary",
 		Help:      "Total number of errors while refreshing producer metadata",
@@ -118,7 +118,7 @@ func (ps *ProducerService) Refresh() {
 		labels := prometheus.Labels{
 			"clientid": ps.canaryConfig.ClientID,
 		}
-		refreshMetadataError.With(labels).Inc()
+		refreshProducerMetadataError.With(labels).Inc()
 		glog.Errorf("Error refreshing metadata in producer: %v", err)
 	}
 }
diff --git a/internal/services/topic.go b/internal/services/topic.go
index 313e8cb..dddac5a 100644
--- a/internal/services/topic.go
+++ b/internal/services/topic.go
@@ -24,8 +24,10 @@ import (
 type TopicReconcileResult struct {
 	// new partitions assignments across brokers
 	Assignments map[int32][]int32
+	// partition to leader assignments
+	Leaders map[int32]int32
 	// if a refresh metadata is needed
-	RefreshMetadata bool
+	RefreshProducerMetadata bool
 }
 
 // TopicService defines the service for canary topic management
@@ -106,14 +108,14 @@ func (ts *TopicService) Reconcile() (TopicReconcileResult, error) {
 	if err != nil && util.IsDisconnection(err) {
 		// Kafka brokers close connection to the topic service admin client not able to recover
 		// Sarama issues: https://github.com/Shopify/sarama/issues/2042, https://github.com/Shopify/sarama/issues/1796
-		// Workaround closing the topic service with its admin client and the reopen on next reconcile
+		// Workaround: close the topic service with its admin client and then reopen it on the next reconcile
 		ts.Close()
 	}
 	return result, err
 }
 
 func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
-	result := TopicReconcileResult{nil, false}
+	result := TopicReconcileResult{}
 
 	if ts.admin == nil {
 		glog.Infof("Creating Sarama cluster admin")
@@ -134,16 +136,10 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
 		return result, err
 	}
 
-	metadata, err := ts.admin.DescribeTopics([]string{ts.canaryConfig.Topic})
+	topicMetadata, err := ts.describeCanaryTopic()
 	if err != nil {
-		labels := prometheus.Labels{
-			"topic": ts.canaryConfig.Topic,
-		}
-		describeTopicError.With(labels).Inc()
-		glog.Errorf("Error retrieving metadata for topic %s: %v", ts.canaryConfig.Topic, err)
 		return result, err
 	}
-	topicMetadata := metadata[0]
 
 	if errors.Is(topicMetadata.Err, sarama.ErrUnknownTopicOrPartition) {
 
@@ -161,6 +157,11 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
 				return result, err
 			}
 			glog.Infof("The canary topic %s was created", topicMetadata.Name)
+
+			topicMetadata, err = ts.describeCanaryTopic()
+			if err != nil {
+				return result, err
+			}
 		} else {
 			glog.Warningf("The canary topic %s wasn't created. Expected brokers %d, Actual brokers %d",
 				topicMetadata.Name, ts.canaryConfig.ExpectedClusterSize, len(brokers))
@@ -189,7 +190,7 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
 
 	if ts.isDynamicReassignmentEnabled() || (!ts.initialized && ts.canaryConfig.ExpectedClusterSize == len(brokers)) {
 		glog.Infof("Going to reassign topic partitions if needed")
-		result.RefreshMetadata = len(brokers) != len(topicMetadata.Partitions)
+		result.RefreshProducerMetadata = len(brokers) != len(topicMetadata.Partitions)
 		if result.Assignments, err = ts.alterTopicAssignments(len(topicMetadata.Partitions), brokers); err != nil {
 			labels := prometheus.Labels{
 				"topic": topicMetadata.Name,
@@ -212,10 +213,25 @@ func (ts *TopicService) reconcileTopic() (TopicReconcileResult, error) {
 		return result, topicMetadata.Err
 	}
 
+	result.Leaders = ts.currentLeaders(topicMetadata)
 	ts.initialized = true
 	return result, err
 }
 
+func (ts *TopicService) describeCanaryTopic() (*sarama.TopicMetadata, error) {
+	metadata, err := ts.admin.DescribeTopics([]string{ts.canaryConfig.Topic})
+	if err != nil {
+		labels := prometheus.Labels{
+			"topic": ts.canaryConfig.Topic,
+		}
+		describeTopicError.With(labels).Inc()
+		glog.Errorf("Error retrieving metadata for topic %s: %v", ts.canaryConfig.Topic, err)
+		return nil, err
+	}
+	topicMetadata := metadata[0]
+	return topicMetadata, nil
+}
+
 // Close closes the underneath Sarama admin instance
 func (ts *TopicService) Close() {
 	glog.Infof("Closing topic service")
@@ -392,6 +408,14 @@ func (ts *TopicService) currentAssignments(topicMetadata *sarama.TopicMetadata)
 	return assignments
 }
 
+func (ts *TopicService) currentLeaders(topicMetadata *sarama.TopicMetadata) map[int32]int32 {
+	leaders := make(map[int32]int32, len(topicMetadata.Partitions))
+	for _, p := range topicMetadata.Partitions {
+		leaders[p.ID] = p.Leader
+	}
+	return leaders
+}
+
 // Alter the replica assignment for the partitions
 //
 // After the request for the replica assignment, it run a loop for checking if the reassignment is still ongoing
diff --git a/internal/workers/canary_manager.go b/internal/workers/canary_manager.go
index 29155dc..4810580 100644
--- a/internal/workers/canary_manager.go
+++ b/internal/workers/canary_manager.go
@@ -7,6 +7,7 @@
 package workers
 
 import (
+	"reflect"
 	"sync"
 	"time"
 
@@ -124,9 +125,14 @@ func (cm *CanaryManager) reconcile() {
 	glog.Infof("Canary manager reconcile ...")
 
 	if result, err := cm.topicService.Reconcile(); err == nil {
-		if result.RefreshMetadata {
+		if result.RefreshProducerMetadata {
 			cm.producerService.Refresh()
 		}
+
+		leaders, err := cm.consumerService.Leaders()
+		if err != nil || !reflect.DeepEqual(result.Leaders, leaders) {
+			cm.consumerService.Refresh()
+		}
 		// producer has to send to partitions assigned to brokers
 		cm.producerService.Send(result.Assignments)
 	}
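
Editor's note, not part of the patch: a minimal, self-contained sketch of the leader-change check that the new reconcile() code performs with reflect.DeepEqual: if the consumer client's view of partition leaders differs from the map returned by the topic reconcile, the consumer metadata gets refreshed. The leader maps below are hypothetical values, not taken from the canary.

package main

import (
	"fmt"
	"reflect"
)

func main() {
	// partition -> leader broker ID as computed by the topic service reconcile
	topicLeaders := map[int32]int32{0: 0, 1: 1, 2: 2}
	// partition -> leader broker ID as currently cached by the consumer's client
	consumerLeaders := map[int32]int32{0: 0, 1: 2, 2: 2} // leadership of partition 1 moved

	if !reflect.DeepEqual(topicLeaders, consumerLeaders) {
		fmt.Println("leader views differ: refreshing consumer metadata")
	}
}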