Skip to content

Commit

Permalink
lint
Browse files Browse the repository at this point in the history
  • Loading branch information
mdisibio committed Jan 3, 2025
1 parent 0a6eb3d commit b51df12
Showing 1 changed file with 12 additions and 12 deletions.
24 changes: 12 additions & 12 deletions modules/blockbuilder/blockbuilder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
k, address := testkafka.CreateCluster(t, 1, testTopic)

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit, func(r kmsg.Request) (kmsg.Response, error, bool) {
k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Inc()
return nil, nil, false
})
Expand Down Expand Up @@ -73,7 +73,7 @@ func TestBlockbuilder_lookbackOnNoCommit(t *testing.T) {
}, time.Minute, time.Second)

// Check committed offset
requireLastCommitEquals(t, ctx, client, testTopic, testConsumerGroup, testPartition, producedRecords[len(producedRecords)-1].Offset+1)
requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
}

// Starting with a pre-existing commit,
Expand All @@ -86,7 +86,7 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
k, address := testkafka.CreateCluster(t, 1, testTopic)

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit, func(r kmsg.Request) (kmsg.Response, error, bool) {
k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Inc()
return nil, nil, false
})
Expand Down Expand Up @@ -128,7 +128,7 @@ func TestBlockbuilder_startWithCommit(t *testing.T) {
}, time.Minute, time.Second)

// Check committed offset
requireLastCommitEquals(t, ctx, client, testTopic, testConsumerGroup, testPartition, producedRecords[len(producedRecords)-1].Offset+1)
requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
}

// In case a block flush initially fails, the system retries until it succeeds.
Expand All @@ -139,7 +139,7 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
k, address := testkafka.CreateCluster(t, 1, "test-topic")

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit, func(r kmsg.Request) (kmsg.Response, error, bool) {
k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Inc()
return nil, nil, false
})
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestBlockbuilder_flushingFails(t *testing.T) {
}, time.Minute, time.Second)

// Check committed offset
requireLastCommitEquals(t, ctx, client, testTopic, testConsumerGroup, testPartition, producedRecords[len(producedRecords)-1].Offset+1)
requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
}

// Receiving records with older timestamps the block-builder processes them in the current cycle,
Expand All @@ -185,7 +185,7 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
k, address := testkafka.CreateCluster(t, 1, "test-topic")

kafkaCommits := atomic.NewInt32(0)
k.ControlKey(kmsg.OffsetCommit, func(r kmsg.Request) (kmsg.Response, error, bool) {
k.ControlKey(kmsg.OffsetCommit, func(kmsg.Request) (kmsg.Response, error, bool) {
kafkaCommits.Inc()
return nil, nil, false
})
Expand Down Expand Up @@ -231,7 +231,7 @@ func TestBlockbuilder_receivesOldRecords(t *testing.T) {
}, time.Minute, time.Second)

// Check committed offset
requireLastCommitEquals(t, ctx, client, testTopic, testConsumerGroup, testPartition, producedRecords[len(producedRecords)-1].Offset+1)
requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
}

// FIXME - Test is unstable and will fail if records cross two consumption cycles,
Expand Down Expand Up @@ -299,7 +299,7 @@ func TestBlockbuilder_committingFails(t *testing.T) {
}, time.Minute, time.Second)

// Check committed offset
requireLastCommitEquals(t, ctx, client, testTopic, testConsumerGroup, testPartition, producedRecords[len(producedRecords)-1].Offset+1)
requireLastCommitEquals(t, ctx, client, producedRecords[len(producedRecords)-1].Offset+1)
}

func TestCycleEndAtStartup(t *testing.T) {
Expand Down Expand Up @@ -548,10 +548,10 @@ func generateTraceID(t *testing.T) []byte {
}

// nolint: revive
func requireLastCommitEquals(t testing.TB, ctx context.Context, client *kgo.Client, topic, consumerGroup string, partition int32, expectedOffset int64) {
offsets, err := kadm.NewClient(client).FetchOffsetsForTopics(ctx, consumerGroup, topic)
func requireLastCommitEquals(t testing.TB, ctx context.Context, client *kgo.Client, expectedOffset int64) {
offsets, err := kadm.NewClient(client).FetchOffsetsForTopics(ctx, testConsumerGroup, testTopic)
require.NoError(t, err)
offset, ok := offsets.Lookup(topic, partition)
offset, ok := offsets.Lookup(testTopic, testPartition)
require.True(t, ok)
require.Equal(t, expectedOffset, offset.At)
}

0 comments on commit b51df12

Please sign in to comment.