From aca9acd3611cc24c8ed5d89e2989d1ab3dab7a1a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Sun, 5 Dec 2021 11:56:58 +0100 Subject: [PATCH 1/3] Add test for ConsumePartition with OffsetOldest Also refactor similiar tests and fix docs --- consumer_test.go | 87 ++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 74 insertions(+), 13 deletions(-) diff --git a/consumer_test.go b/consumer_test.go index 7f9ff2c2e..8d8c33b8c 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -20,18 +20,28 @@ func TestConsumerOffsetManual(t *testing.T) { // Given broker0 := NewMockBroker(t, 0) + manualOffset := int64(1234) + offsetNewest := int64(2345) + offsetNewestAfterFetchRequest := int64(3456) + mockFetchResponse := NewMockFetchResponse(t, 1) - for i := 0; i < 10; i++ { - mockFetchResponse.SetMessage("my_topic", 0, int64(i+1234), testMsg) + + // skipped because parseRecords(): offset < child.offset + mockFetchResponse.SetMessage("my_topic", 0, manualOffset-1, testMsg) + + for i := int64(0); i < 10; i++ { + mockFetchResponse.SetMessage("my_topic", 0, i+manualOffset, testMsg) } + mockFetchResponse.SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest) + broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). SetOffset("my_topic", 0, OffsetOldest, 0). - SetOffset("my_topic", 0, OffsetNewest, 2345), + SetOffset("my_topic", 0, OffsetNewest, offsetNewest), "FetchRequest": mockFetchResponse, }) @@ -41,44 +51,50 @@ func TestConsumerOffsetManual(t *testing.T) { t.Fatal(err) } - consumer, err := master.ConsumePartition("my_topic", 0, 1234) + consumer, err := master.ConsumePartition("my_topic", 0, manualOffset) if err != nil { t.Fatal(err) } - // Then: messages starting from offset 1234 are consumed. - for i := 0; i < 10; i++ { + // Then + for i := int64(0); i < 10; i++ { select { case message := <-consumer.Messages(): - assertMessageOffset(t, message, int64(i+1234)) + assertMessageOffset(t, message, i+manualOffset) case err := <-consumer.Errors(): t.Error(err) } } + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) + } + safeClose(t, consumer) safeClose(t, master) broker0.Close() } // If `OffsetNewest` is passed as the initial offset then the first consumed -// message is indeed corresponds to the offset that broker claims to be the +// message indeed corresponds to the offset that broker claims to be the // newest in its metadata response. func TestConsumerOffsetNewest(t *testing.T) { // Given + offsetNewest := int64(10) + offsetNewestAfterFetchRequest := int64(50) broker0 := NewMockBroker(t, 0) broker0.SetHandlerByMap(map[string]MockResponse{ "MetadataRequest": NewMockMetadataResponse(t). SetBroker(broker0.Addr(), broker0.BrokerID()). SetLeader("my_topic", 0, broker0.BrokerID()), "OffsetRequest": NewMockOffsetResponse(t). - SetOffset("my_topic", 0, OffsetNewest, 10). + SetOffset("my_topic", 0, OffsetNewest, offsetNewest). SetOffset("my_topic", 0, OffsetOldest, 7), "FetchRequest": NewMockFetchResponse(t, 1). - SetMessage("my_topic", 0, 9, testMsg). + SetMessage("my_topic", 0, 9, testMsg). // skipped because parseRecords(): offset < child.offset SetMessage("my_topic", 0, 10, testMsg). SetMessage("my_topic", 0, 11, testMsg). - SetHighWaterMark("my_topic", 0, 14), + SetHighWaterMark("my_topic", 0, offsetNewestAfterFetchRequest), }) master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) @@ -94,8 +110,53 @@ func TestConsumerOffsetNewest(t *testing.T) { // Then assertMessageOffset(t, <-consumer.Messages(), 10) - if hwmo := consumer.HighWaterMarkOffset(); hwmo != 14 { - t.Errorf("Expected high water mark offset 14, found %d", hwmo) + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) + } + + safeClose(t, consumer) + safeClose(t, master) + broker0.Close() +} + +// If `OffsetOldest` is passed as the initial offset then the first consumed +// message is indeed the first available in the partition. +func TestConsumerOffsetOldest(t *testing.T) { + // Given + offsetNewest := int64(10) + broker0 := NewMockBroker(t, 0) + broker0.SetHandlerByMap(map[string]MockResponse{ + "MetadataRequest": NewMockMetadataResponse(t). + SetBroker(broker0.Addr(), broker0.BrokerID()). + SetLeader("my_topic", 0, broker0.BrokerID()), + "OffsetRequest": NewMockOffsetResponse(t). + SetOffset("my_topic", 0, OffsetNewest, offsetNewest). + SetOffset("my_topic", 0, OffsetOldest, 7), + "FetchRequest": NewMockFetchResponse(t, 1). + // skipped because parseRecords(): offset < child.offset + SetMessage("my_topic", 0, 6, testMsg). + // these will get to the Messages() channel + SetMessage("my_topic", 0, 7, testMsg). + SetMessage("my_topic", 0, 8, testMsg). + SetMessage("my_topic", 0, 9, testMsg). + SetHighWaterMark("my_topic", 0, offsetNewest), + }) + + master, err := NewConsumer([]string{broker0.Addr()}, NewTestConfig()) + if err != nil { + t.Fatal(err) + } + + // When + consumer, err := master.ConsumePartition("my_topic", 0, OffsetOldest) + if err != nil { + t.Fatal(err) + } + + // Then + assertMessageOffset(t, <-consumer.Messages(), int64(7)) + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) } safeClose(t, consumer) From 1c4079e7e3fcd97a677a83c14d50ef9f80cc4292 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Sun, 5 Dec 2021 12:07:33 +0100 Subject: [PATCH 2/3] Add test for partitionConsumer HWMO before consumption This will fail now, because HWMO is not set correctly --- consumer_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/consumer_test.go b/consumer_test.go index 8d8c33b8c..0f752c4e0 100644 --- a/consumer_test.go +++ b/consumer_test.go @@ -57,6 +57,9 @@ func TestConsumerOffsetManual(t *testing.T) { } // Then + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) + } for i := int64(0); i < 10; i++ { select { case message := <-consumer.Messages(): @@ -109,6 +112,9 @@ func TestConsumerOffsetNewest(t *testing.T) { } // Then + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) + } assertMessageOffset(t, <-consumer.Messages(), 10) if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewestAfterFetchRequest { t.Errorf("Expected high water mark offset %d, found %d", offsetNewestAfterFetchRequest, hwmo) @@ -154,6 +160,9 @@ func TestConsumerOffsetOldest(t *testing.T) { } // Then + if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { + t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) + } assertMessageOffset(t, <-consumer.Messages(), int64(7)) if hwmo := consumer.HighWaterMarkOffset(); hwmo != offsetNewest { t.Errorf("Expected high water mark offset %d, found %d", offsetNewest, hwmo) From 179dc9d28899a06b7afab22781a3ed3d1a7c1a10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jakub=20Ch=C3=A1bek?= Date: Sun, 5 Dec 2021 12:14:17 +0100 Subject: [PATCH 3/3] Set HWMO during creation of partitionConsumer --- consumer.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/consumer.go b/consumer.go index 1cb910deb..87a135072 100644 --- a/consumer.go +++ b/consumer.go @@ -400,6 +400,9 @@ func (child *partitionConsumer) chooseStartingOffset(offset int64) error { if err != nil { return err } + + child.highWaterMarkOffset = newestOffset + oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest) if err != nil { return err