diff --git a/consumer.go b/consumer.go index f68eb1d2b..1cb910deb 100644 --- a/consumer.go +++ b/consumer.go @@ -132,16 +132,17 @@ func (c *consumer) Partitions(topic string) ([]int32, error) { func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) { child := &partitionConsumer{ - consumer: c, - conf: c.conf, - topic: topic, - partition: partition, - messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), - errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), - feeder: make(chan *FetchResponse, 1), - trigger: make(chan none, 1), - dying: make(chan none), - fetchSize: c.conf.Consumer.Fetch.Default, + consumer: c, + conf: c.conf, + topic: topic, + partition: partition, + messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize), + errors: make(chan *ConsumerError, c.conf.ChannelBufferSize), + feeder: make(chan *FetchResponse, 1), + preferredReadReplica: invalidPreferredReplicaID, + trigger: make(chan none, 1), + dying: make(chan none), + fetchSize: c.conf.Consumer.Fetch.Default, } if err := child.chooseStartingOffset(offset); err != nil { @@ -605,7 +606,9 @@ func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*Consu consumerBatchSizeMetric.Update(int64(nRecs)) - child.preferredReadReplica = block.PreferredReadReplica + if block.PreferredReadReplica != invalidPreferredReplicaID { + child.preferredReadReplica = block.PreferredReadReplica + } if nRecs == 0 { partialTrailingMessage, err := block.isPartial() diff --git a/fetch_response.go b/fetch_response.go index 19040c827..1b02b0070 100644 --- a/fetch_response.go +++ b/fetch_response.go @@ -5,6 +5,8 @@ import ( "time" ) +const invalidPreferredReplicaID = -1 + type AbortedTransaction struct { ProducerID int64 FirstOffset int64