From 2ac7122e30a78f472715fd2610212fbec5284961 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Sat, 4 Dec 2021 15:21:54 +0100 Subject: [PATCH] Fix wrong offsets in mock Consumer --- mocks/consumer.go | 8 +++++- mocks/consumer_test.go | 65 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 72 insertions(+), 1 deletion(-) diff --git a/mocks/consumer.go b/mocks/consumer.go index baec88b519..3a7e13e65e 100644 --- a/mocks/consumer.go +++ b/mocks/consumer.go @@ -155,7 +155,13 @@ func (c *Consumer) ExpectConsumePartition(topic string, partition int32, offset } if c.partitionConsumers[topic][partition] == nil { + highWatermarkOffset := offset + if offset == sarama.OffsetOldest { + highWatermarkOffset = 0 + } + c.partitionConsumers[topic][partition] = &PartitionConsumer{ + highWaterMarkOffset: highWatermarkOffset, t: c.t, topic: topic, partition: partition, @@ -282,7 +288,7 @@ func (pc *PartitionConsumer) YieldMessage(msg *sarama.ConsumerMessage) { msg.Topic = pc.topic msg.Partition = pc.partition - msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) + msg.Offset = atomic.AddInt64(&pc.highWaterMarkOffset, 1) - 1 pc.messages <- msg } diff --git a/mocks/consumer_test.go b/mocks/consumer_test.go index e1202cc7aa..69044ffdbc 100644 --- a/mocks/consumer_test.go +++ b/mocks/consumer_test.go @@ -247,3 +247,68 @@ func TestConsumerUnexpectedTopicMetadata(t *testing.T) { t.Errorf("Expected an expectation failure to be set on the error reporter.") } } + +func TestConsumerOffsetsAreManagedCorrectlyWithOffsetOldest(t *testing.T) { + trm := newTestReporterMock() + consumer := NewConsumer(trm, NewTestConfig()) + pcmock := consumer.ExpectConsumePartition("test", 0, sarama.OffsetOldest) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.ExpectMessagesDrainedOnClose() + + pc, err := consumer.ConsumePartition("test", 0, sarama.OffsetOldest) + if err != nil { + t.Error(err) + } + + message1 := <-pc.Messages() + if message1.Offset != 0 { + t.Errorf("Expected offset of first message in the partition to be 0, got %d", message1.Offset) + } + + message2 := <-pc.Messages() + if message2.Offset != 1 { + t.Errorf("Expected offset of second message in the partition to be 1, got %d", message2.Offset) + } + + if err := consumer.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected to not report any errors, found: %v", trm.errors) + } +} + +func TestConsumerOffsetsAreManagedCorrectlyWithSpecifiedOffset(t *testing.T) { + startingOffset := int64(123) + trm := newTestReporterMock() + consumer := NewConsumer(trm, NewTestConfig()) + pcmock := consumer.ExpectConsumePartition("test", 0, startingOffset) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.YieldMessage(&sarama.ConsumerMessage{Value: []byte("hello")}) + pcmock.ExpectMessagesDrainedOnClose() + + pc, err := consumer.ConsumePartition("test", 0, startingOffset) + if err != nil { + t.Error(err) + } + + message1 := <-pc.Messages() + if message1.Offset != startingOffset { + t.Errorf("Expected offset of first message to be %d, got %d", startingOffset, message1.Offset) + } + + message2 := <-pc.Messages() + if message2.Offset != startingOffset+1 { + t.Errorf("Expected offset of second message to be %d, got %d", startingOffset+1, message2.Offset) + } + + if err := consumer.Close(); err != nil { + t.Error(err) + } + + if len(trm.errors) != 0 { + t.Errorf("Expected to not report any errors, found: %v", trm.errors) + } +}