Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add and test against kafka 3.2.0 #2288

Merged
merged 4 commits into from
Jul 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/fvt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ jobs:
fail-fast: false
matrix:
go-version: [1.18.x]
kafka-version: [2.8.1, 3.0.1, 3.1.0]
kafka-version: [2.8.1, 3.0.1, 3.1.0, 3.2.0]
env:
DEBUG: true
GOFLAGS: -trimpath
Expand Down
1 change: 1 addition & 0 deletions Dockerfile.kafka
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
RUN mkdir -p "/opt/kafka-2.8.1" && chmod a+rw /opt/kafka-2.8.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.8.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.1"
RUN mkdir -p "/opt/kafka-3.0.1" && chmod a+rw /opt/kafka-3.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.0.1"
RUN mkdir -p "/opt/kafka-3.1.0" && chmod a+rw /opt/kafka-3.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.1.0"
RUN mkdir -p "/opt/kafka-3.2.0" && chmod a+rw /opt/kafka-3.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.2.0"

COPY entrypoint.sh /

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ test: $(GOBIN)/tparse
|| NO_COLOR=1 $(GOBIN)/tparse -format markdown -file output.json -all >"$${GITHUB_STEP_SUMMARY:-/dev/null}"
.PHONY: test_functional
test_functional: $(GOBIN)/tparse
$(GOTEST) -timeout 12m -tags=functional -json ./... \
$(GOTEST) -timeout 15m -tags=functional -json ./... \
| tee output.json | $(GOBIN)/tparse -follow -all
[ -z "$${GITHUB_STEP_SUMMARY:-}" ] \
|| NO_COLOR=1 $(GOBIN)/tparse -format markdown -file output.json -all >"$${GITHUB_STEP_SUMMARY:-/dev/null}"
140 changes: 99 additions & 41 deletions functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ import (
"testing"
"time"

"golang.org/x/sync/errgroup"

"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -83,6 +86,11 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
// from this test case. It has a similar version matrix test case below that
// only checks versions from v0.10.0.0 until KAFKA_VERSION.
func TestVersionMatrix(t *testing.T) {
metrics.UseNilMetrics = true // disable Sarama's go-metrics library
t.Cleanup(func() {
metrics.UseNilMetrics = false
})

setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Expand All @@ -99,6 +107,10 @@ func TestVersionMatrix(t *testing.T) {
// Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to
// test LZ4 should start with v0.10.0.0.
func TestVersionMatrixLZ4(t *testing.T) {
metrics.UseNilMetrics = true // disable Sarama's go-metrics library
t.Cleanup(func() {
metrics.UseNilMetrics = false
})
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Expand All @@ -115,6 +127,10 @@ func TestVersionMatrixLZ4(t *testing.T) {

// Support for zstd codec was introduced in v2.1.0.0
func TestVersionMatrixZstd(t *testing.T) {
metrics.UseNilMetrics = true // disable Sarama's go-metrics library
t.Cleanup(func() {
metrics.UseNilMetrics = false
})
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Expand All @@ -129,6 +145,10 @@ func TestVersionMatrixZstd(t *testing.T) {
}

func TestVersionMatrixIdempotent(t *testing.T) {
metrics.UseNilMetrics = true // disable Sarama's go-metrics library
t.Cleanup(func() {
metrics.UseNilMetrics = false
})
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

Expand Down Expand Up @@ -424,8 +444,8 @@ func versionRange(lower KafkaVersion) []KafkaVersion {
upper = MaxVersion
}

versions := make([]KafkaVersion, 0, len(SupportedVersions))
for _, v := range SupportedVersions {
versions := make([]KafkaVersion, 0, len(fvtRangeVersions))
for _, v := range fvtRangeVersions {
if !v.IsAtLeast(lower) {
continue
}
Expand All @@ -439,33 +459,44 @@ func versionRange(lower KafkaVersion) []KafkaVersion {

func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
var (
wg sync.WaitGroup
producers []SyncProducer
producedMessagesMu sync.Mutex
producedMessages []*ProducerMessage
)
g := errgroup.Group{}
for _, prodVer := range clientVersions {
for _, codec := range codecs {
t.Run("producer-"+prodVer.String()+"-"+codec.String(), func(t *testing.T) {
t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
prodCfg := NewTestConfig()
prodCfg.Version = prodVer
prodCfg.Producer.Return.Successes = true
prodCfg.Producer.Return.Errors = true
prodCfg.Producer.Flush.MaxMessages = flush
prodCfg.Producer.Compression = codec
prodCfg.Producer.Idempotent = idempotent
if idempotent {
prodCfg.Producer.RequiredAcks = WaitForAll
prodCfg.Net.MaxOpenRequests = 1
}
prodCfg := NewTestConfig()
prodCfg.ClientID = t.Name() + "-Producer-" + prodVer.String()
if idempotent {
prodCfg.ClientID += "-idempotent"
}
if codec > 0 {
prodCfg.ClientID += "-" + codec.String()
}
prodCfg.Metadata.Full = false
prodCfg.Version = prodVer
prodCfg.Producer.Return.Successes = true
prodCfg.Producer.Return.Errors = true
prodCfg.Producer.Flush.MaxMessages = flush
prodCfg.Producer.Compression = codec
prodCfg.Producer.Idempotent = idempotent
if idempotent {
prodCfg.Producer.RequiredAcks = WaitForAll
prodCfg.Net.MaxOpenRequests = 1
}

p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
if err != nil {
t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
return
}
producers = append(producers, p)
p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
if err != nil {
t.Fatalf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
}
producers = append(producers, p)

prodVer := prodVer
codec := codec
g.Go(func() error {
t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
var wg sync.WaitGroup
for i := 0; i < countPerVerCodec; i++ {
msg := &ProducerMessage{
Topic: "test.1",
Expand All @@ -483,10 +514,14 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
producedMessagesMu.Unlock()
}()
}
wg.Wait()
return nil
})
}
}
wg.Wait()
if err := g.Wait(); err != nil {
t.Fatal(err)
}

for _, p := range producers {
safeClose(t, p)
Expand All @@ -496,6 +531,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
sort.Slice(producedMessages, func(i, j int) bool {
return producedMessages[i].Offset < producedMessages[j].Offset
})
require.NotEmpty(t, producedMessages, "should have produced >0 messages")
t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
return producedMessages
Expand All @@ -504,26 +540,33 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
// Consume all produced messages with all client versions supported by the
// cluster.
g := errgroup.Group{}
for _, consVer := range clientVersions {
t.Run("consumer-"+consVer.String(), func(t *testing.T) {
t.Logf("*** Consuming with client version %s\n", consVer)
// Create a partition consumer that should start from the first produced
// message.
consCfg := NewTestConfig()
consCfg.Version = consVer
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)
pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, pc)
// Create a partition consumer that should start from the first produced
// message.
consCfg := NewTestConfig()
consCfg.ClientID = t.Name() + "-Consumer-" + consVer.String()
consCfg.Consumer.MaxProcessingTime = time.Second
consCfg.Metadata.Full = false
consCfg.Version = consVer
c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, c)
pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, pc)

var wg sync.WaitGroup
wg.Add(1)
consVer := consVer
g.Go(func() error {
// Consume as many messages as there have been produced and make sure that
// order is preserved.
t.Logf("*** Consuming with client version %s\n", consVer)
for i, prodMsg := range producedMessages {
select {
case consMsg := <-pc.Messages():
Expand All @@ -535,10 +578,25 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [
t.Fatalf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
}
case <-time.After(3 * time.Second):
t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
if i == 0 {
t.Logf("Consumed first msg: version=%s, index=%d, got=%s",
consVer, i, consMsg2Str(consMsg))
wg.Done()
}
if i%1000 == 0 {
t.Logf("Consumed messages: version=%s, index=%d, got=%s",
consVer, i, consMsg2Str(consMsg))
}
case <-time.After(15 * time.Second):
t.Fatalf("Timeout %s waiting for: index=%d, offset=%d, msg=%s",
consCfg.ClientID, i, prodMsg.Offset, prodMsg.Value)
}
}
return nil
})
wg.Wait() // wait for first message to be consumed before starting next consumer
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ require (
github.com/xdg-go/scram v1.1.1
golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
golang.org/x/net v0.0.0-20220708220712-1185a9018129
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
)

Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
24 changes: 23 additions & 1 deletion utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ var (
V3_0_0_0 = newKafkaVersion(3, 0, 0, 0)
V3_0_1_0 = newKafkaVersion(3, 0, 1, 0)
V3_1_0_0 = newKafkaVersion(3, 1, 0, 0)
V3_2_0_0 = newKafkaVersion(3, 2, 0, 0)

SupportedVersions = []KafkaVersion{
V0_8_2_0,
Expand Down Expand Up @@ -227,10 +228,31 @@ var (
V3_0_0_0,
V3_0_1_0,
V3_1_0_0,
V3_2_0_0,
}
MinVersion = V0_8_2_0
MaxVersion = V3_1_0_0
MaxVersion = V3_2_0_0
DefaultVersion = V1_0_0_0

// reduced set of versions to matrix test
fvtRangeVersions = []KafkaVersion{
V0_8_2_2,
V0_10_2_2,
V1_0_2_0,
V1_1_1_0,
V2_0_1_0,
V2_1_1_0,
V2_2_2_0,
V2_3_1_0,
V2_4_1_0,
V2_5_1_0,
V2_6_2_0,
V2_7_1_0,
V2_8_1_0,
V3_0_1_0,
V3_1_0_0,
V3_2_0_0,
}
)

// ParseKafkaVersion parses and returns kafka version or error from a string
Expand Down