From 005a989bf34b3a50566934571c6bd7427f8f69c9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Sat, 4 Dec 2021 15:21:54 +0100 Subject: [PATCH] Fix wrong offsets in mock Consumer --- mocks/consumer.go | 8 +++++- mocks/consumer_test.go | 65 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/mocks/consumer.go b/mocks/consumer.go index c10ae4765f..9f6f42503a 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -155,7 +155,13 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset } if c.partitionConsumers[topic][partition] == nil { + highWatermarkOffset := offset + if offset == sarama.OffsetOldest { + highWatermarkOffset = 0 + } + c.partitionConsumers[topic][partition] = &PartitionConsumer{ + highWaterMarkOffset: highWatermarkOffset, t: c.t, topic: topic, partition: partition, @@ -282,7 +288,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) *Partitio msg.Topic = pc.topic msg.Partition = pc.partition - msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) + msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1 pc.messages <- msg diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index 9367d6aa17..c46212fcf5 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -247,3 +247,68 @@ func TestConsumerUnexpectedTopicMetadata(t *testing.T) { t.Errorf("Expected an expectation failure to be set on the error reporter.") } } + +func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { + trm := newTestReporterMock() + consumer := NewConsumer(trm, NewTestConfig()) + pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.ExpectMessagesDrainedOnClose() + + pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) + if err != nil { + t.Error(err) + } + + message1 := <-pc.Messages() + if message1.Offset != 0 { + t.Errorf("Expected offset of first message in the partition to be 0, got %d", message1.Offset) + } + + message2 := <-pc.Messages() + if message2.Offset != 1 { + t.Errorf("Expected offset of second message in the partition to be 1, got %d", message2.Offset) + } + + if err := consumer.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected to not report any errors, found: %v", trm.errors) + } +} + +func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) { + startingOffset := int64(123) + trm := newTestReporterMock() + consumer := NewConsumer(trm, NewTestConfig()) + pcmock := consumer.ExpectConsumePartition("test", 0, startingOffset) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.ExpectMessagesDrainedOnClose() + + pc, err := consumer.ConsumePartition("test", 0, startingOffset) + if err != nil { + t.Error(err) + } + + message1 := <-pc.Messages() + if message1.Offset != startingOffset { + t.Errorf("Expected offset of first message to be %d, got %d", startingOffset, message1.Offset) + } + + message2 := <-pc.Messages() + if message2.Offset != startingOffset+1 { + t.Errorf("Expected offset of second message to be %d, got %d", startingOffset+1, message2.Offset) + } + + if err := consumer.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected to not report any errors, found: %v", trm.errors) + } +}