Merge pull request #2288 from Shopify/dnwe/kafka

chore: add and test against kafka 3.2.0

dnwe authored Jul 22, 2022
2 parents e1fbb94 + d854c83 commit 739ace7

Showing 7 changed files with 127 additions and 44 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/fvt.yml
@@ -13,7 +13,7 @@ jobs:
       fail-fast: false
       matrix:
         go-version: [1.18.x]
-        kafka-version: [2.8.1, 3.0.1, 3.1.0]
+        kafka-version: [2.8.1, 3.0.1, 3.1.0, 3.2.0]
     env:
       DEBUG: true
       GOFLAGS: -trimpath
1 change: 1 addition & 0 deletions Dockerfile.kafka
@@ -20,6 +20,7 @@ ARG KAFKA_MIRROR="https://s3-us-west-2.amazonaws.com/kafka-packages"
 RUN mkdir -p "/opt/kafka-2.8.1" && chmod a+rw /opt/kafka-2.8.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-2.8.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-2.8.1"
 RUN mkdir -p "/opt/kafka-3.0.1" && chmod a+rw /opt/kafka-3.0.1 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.0.1.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.0.1"
 RUN mkdir -p "/opt/kafka-3.1.0" && chmod a+rw /opt/kafka-3.1.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.1.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.1.0"
+RUN mkdir -p "/opt/kafka-3.2.0" && chmod a+rw /opt/kafka-3.2.0 && curl -s "$KAFKA_MIRROR/kafka_2.12-3.2.0.tgz" | tar xz --strip-components=1 -C "/opt/kafka-3.2.0"
 
 COPY entrypoint.sh /
 
2 changes: 1 addition & 1 deletion Makefile
@@ -33,7 +33,7 @@ test: $(GOBIN)/tparse
 	|| NO_COLOR=1 $(GOBIN)/tparse -format markdown -file output.json -all >"$${GITHUB_STEP_SUMMARY:-/dev/null}"
 .PHONY: test_functional
 test_functional: $(GOBIN)/tparse
-	$(GOTEST) -timeout 12m -tags=functional -json ./... \
+	$(GOTEST) -timeout 15m -tags=functional -json ./... \
 		| tee output.json | $(GOBIN)/tparse -follow -all
 	[ -z "$${GITHUB_STEP_SUMMARY:-}" ] \
 		|| NO_COLOR=1 $(GOBIN)/tparse -format markdown -file output.json -all >"$${GITHUB_STEP_SUMMARY:-/dev/null}"
140 changes: 99 additions & 41 deletions functional_consumer_test.go
@@ -16,6 +16,9 @@ import (
"testing"
"time"

"golang.org/x/sync/errgroup"

"github.com/rcrowley/go-metrics"
"github.com/stretchr/testify/require"
)

@@ -83,6 +86,11 @@ func TestConsumerHighWaterMarkOffset(t *testing.T) {
 // from this test case. It has a similar version matrix test case below that
 // only checks versions from v0.10.0.0 until KAFKA_VERSION.
 func TestVersionMatrix(t *testing.T) {
+	metrics.UseNilMetrics = true // disable Sarama's go-metrics library
+	t.Cleanup(func() {
+		metrics.UseNilMetrics = false
+	})
+
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
@@ -99,6 +107,10 @@ func TestVersionMatrix(t *testing.T) {
 // Support for LZ4 codec was introduced in v0.10.0.0 so a version matrix to
 // test LZ4 should start with v0.10.0.0.
 func TestVersionMatrixLZ4(t *testing.T) {
+	metrics.UseNilMetrics = true // disable Sarama's go-metrics library
+	t.Cleanup(func() {
+		metrics.UseNilMetrics = false
+	})
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
@@ -115,6 +127,10 @@ func TestVersionMatrixLZ4(t *testing.T) {

 // Support for zstd codec was introduced in v2.1.0.0
 func TestVersionMatrixZstd(t *testing.T) {
+	metrics.UseNilMetrics = true // disable Sarama's go-metrics library
+	t.Cleanup(func() {
+		metrics.UseNilMetrics = false
+	})
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
@@ -129,6 +145,10 @@ func TestVersionMatrixZstd(t *testing.T) {
 }
 
 func TestVersionMatrixIdempotent(t *testing.T) {
+	metrics.UseNilMetrics = true // disable Sarama's go-metrics library
+	t.Cleanup(func() {
+		metrics.UseNilMetrics = false
+	})
 	setupFunctionalTest(t)
 	defer teardownFunctionalTest(t)
 
@@ -424,8 +444,8 @@ func versionRange(lower KafkaVersion) []KafkaVersion {
 		upper = MaxVersion
 	}
 
-	versions := make([]KafkaVersion, 0, len(SupportedVersions))
-	for _, v := range SupportedVersions {
+	versions := make([]KafkaVersion, 0, len(fvtRangeVersions))
+	for _, v := range fvtRangeVersions {
 		if !v.IsAtLeast(lower) {
 			continue
 		}
@@ -439,33 +459,44 @@ func versionRange(lower KafkaVersion) []KafkaVersion {

 func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []CompressionCodec, flush int, countPerVerCodec int, idempotent bool) []*ProducerMessage {
 	var (
-		wg                 sync.WaitGroup
 		producers          []SyncProducer
 		producedMessagesMu sync.Mutex
 		producedMessages   []*ProducerMessage
 	)
+	g := errgroup.Group{}
 	for _, prodVer := range clientVersions {
 		for _, codec := range codecs {
-			t.Run("producer-"+prodVer.String()+"-"+codec.String(), func(t *testing.T) {
-				t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
-				prodCfg := NewTestConfig()
-				prodCfg.Version = prodVer
-				prodCfg.Producer.Return.Successes = true
-				prodCfg.Producer.Return.Errors = true
-				prodCfg.Producer.Flush.MaxMessages = flush
-				prodCfg.Producer.Compression = codec
-				prodCfg.Producer.Idempotent = idempotent
-				if idempotent {
-					prodCfg.Producer.RequiredAcks = WaitForAll
-					prodCfg.Net.MaxOpenRequests = 1
-				}
+			prodCfg := NewTestConfig()
+			prodCfg.ClientID = t.Name() + "-Producer-" + prodVer.String()
+			if idempotent {
+				prodCfg.ClientID += "-idempotent"
+			}
+			if codec > 0 {
+				prodCfg.ClientID += "-" + codec.String()
+			}
+			prodCfg.Metadata.Full = false
+			prodCfg.Version = prodVer
+			prodCfg.Producer.Return.Successes = true
+			prodCfg.Producer.Return.Errors = true
+			prodCfg.Producer.Flush.MaxMessages = flush
+			prodCfg.Producer.Compression = codec
+			prodCfg.Producer.Idempotent = idempotent
+			if idempotent {
+				prodCfg.Producer.RequiredAcks = WaitForAll
+				prodCfg.Net.MaxOpenRequests = 1
+			}
 
-				p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
-				if err != nil {
-					t.Errorf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
-					return
-				}
-				producers = append(producers, p)
+			p, err := NewSyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, prodCfg)
+			if err != nil {
+				t.Fatalf("Failed to create producer: version=%s, compression=%s, err=%v", prodVer, codec, err)
+			}
+			producers = append(producers, p)
+
+			prodVer := prodVer
+			codec := codec
+			g.Go(func() error {
+				t.Logf("*** Producing with client version %s codec %s\n", prodVer, codec)
+				var wg sync.WaitGroup
 				for i := 0; i < countPerVerCodec; i++ {
 					msg := &ProducerMessage{
 						Topic: "test.1",
@@ -483,10 +514,14 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 						producedMessagesMu.Unlock()
 					}()
 				}
+				wg.Wait()
+				return nil
 			})
 		}
 	}
-	wg.Wait()
+	if err := g.Wait(); err != nil {
+		t.Fatal(err)
+	}
 
 	for _, p := range producers {
 		safeClose(t, p)
@@ -496,6 +531,7 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 	sort.Slice(producedMessages, func(i, j int) bool {
 		return producedMessages[i].Offset < producedMessages[j].Offset
 	})
+	require.NotEmpty(t, producedMessages, "should have produced >0 messages")
 	t.Logf("*** Total produced %d, firstOffset=%d, lastOffset=%d\n",
 		len(producedMessages), producedMessages[0].Offset, producedMessages[len(producedMessages)-1].Offset)
 	return producedMessages
@@ -504,26 +540,33 @@ func produceMsgs(t *testing.T, clientVersions []KafkaVersion, codecs []Compressi
 func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages []*ProducerMessage) {
 	// Consume all produced messages with all client versions supported by the
 	// cluster.
+	g := errgroup.Group{}
 	for _, consVer := range clientVersions {
-		t.Run("consumer-"+consVer.String(), func(t *testing.T) {
-			t.Logf("*** Consuming with client version %s\n", consVer)
-			// Create a partition consumer that should start from the first produced
-			// message.
-			consCfg := NewTestConfig()
-			consCfg.Version = consVer
-			c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
-			if err != nil {
-				t.Fatal(err)
-			}
-			defer safeClose(t, c)
-			pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
-			if err != nil {
-				t.Fatal(err)
-			}
-			defer safeClose(t, pc)
+		// Create a partition consumer that should start from the first produced
+		// message.
+		consCfg := NewTestConfig()
+		consCfg.ClientID = t.Name() + "-Consumer-" + consVer.String()
+		consCfg.Consumer.MaxProcessingTime = time.Second
+		consCfg.Metadata.Full = false
+		consCfg.Version = consVer
+		c, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, consCfg)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer safeClose(t, c)
+		pc, err := c.ConsumePartition("test.1", 0, producedMessages[0].Offset)
+		if err != nil {
+			t.Fatal(err)
+		}
+		defer safeClose(t, pc)
 
+		var wg sync.WaitGroup
+		wg.Add(1)
+		consVer := consVer
+		g.Go(func() error {
 			// Consume as many messages as there have been produced and make sure that
 			// order is preserved.
+			t.Logf("*** Consuming with client version %s\n", consVer)
 			for i, prodMsg := range producedMessages {
 				select {
 				case consMsg := <-pc.Messages():
@@ -535,10 +578,25 @@ func consumeMsgs(t *testing.T, clientVersions []KafkaVersion, producedMessages [
t.Fatalf("Consumed unexpected msg: version=%s, index=%d, want=%s, got=%s",
consVer, i, prodMsg2Str(prodMsg), consMsg2Str(consMsg))
}
case <-time.After(3 * time.Second):
t.Fatalf("Timeout waiting for: index=%d, offset=%d, msg=%s", i, prodMsg.Offset, prodMsg.Value)
if i == 0 {
t.Logf("Consumed first msg: version=%s, index=%d, got=%s",
consVer, i, consMsg2Str(consMsg))
wg.Done()
}
if i%1000 == 0 {
t.Logf("Consumed messages: version=%s, index=%d, got=%s",
consVer, i, consMsg2Str(consMsg))
}
case <-time.After(15 * time.Second):
t.Fatalf("Timeout %s waiting for: index=%d, offset=%d, msg=%s",
consCfg.ClientID, i, prodMsg.Offset, prodMsg.Value)
}
}
return nil
})
wg.Wait() // wait for first message to be consumed before starting next consumer
}
if err := g.Wait(); err != nil {
t.Fatal(err)
}
}
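The most substantial change in functional_consumer_test.go above is swapping the `t.Run`/`sync.WaitGroup` fan-out for `golang.org/x/sync/errgroup`, so each producer and consumer goroutine can hand an error back to a single `g.Wait()` call. A minimal, self-contained sketch of that pattern in isolation (the `produce` worker and the version list are illustrative stand-ins, not Sarama API):

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// produce is a hypothetical stand-in for one producer goroutine's work.
func produce(version string) error {
	fmt.Println("producing with client version", version)
	return nil // a real worker would return any connect/send error here
}

func main() {
	g := errgroup.Group{} // the zero value is ready to use
	for _, v := range []string{"2.8.1", "3.0.1", "3.1.0", "3.2.0"} {
		v := v // capture the loop variable, as the diff does with prodVer/codec
		g.Go(func() error {
			return produce(v)
		})
	}
	// Wait blocks until every goroutine has returned and yields the first
	// non-nil error, which the test then surfaces via t.Fatal.
	if err := g.Wait(); err != nil {
		fmt.Println("produce failed:", err)
	}
}
```

The `v := v` copy mirrors the `prodVer := prodVer` and `codec := codec` lines in the diff; without it, under the Go 1.18 semantics this repository targets, every goroutine would observe the same final loop value.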
1 change: 1 addition & 0 deletions go.mod
@@ -21,6 +21,7 @@ require (
 	github.com/xdg-go/scram v1.1.1
 	golang.org/x/crypto v0.0.0-20220214200702-86341886e292 // indirect
 	golang.org/x/net v0.0.0-20220708220712-1185a9018129
+	golang.org/x/sync v0.0.0-20201207232520-09787c993a3a
 	gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
 )

1 change: 1 addition & 0 deletions go.sum
@@ -333,6 +333,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20201207232520-09787c993a3a h1:DcqTD9SDLc+1P/r1EmRBwnVsrOwW+kk2vWf9n+1sGhs=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
24 changes: 23 additions & 1 deletion utils.go
@@ -182,6 +182,7 @@ var (
 	V3_0_0_0 = newKafkaVersion(3, 0, 0, 0)
 	V3_0_1_0 = newKafkaVersion(3, 0, 1, 0)
 	V3_1_0_0 = newKafkaVersion(3, 1, 0, 0)
+	V3_2_0_0 = newKafkaVersion(3, 2, 0, 0)
 
 	SupportedVersions = []KafkaVersion{
 		V0_8_2_0,
@@ -227,10 +228,31 @@ var (
 		V3_0_0_0,
 		V3_0_1_0,
 		V3_1_0_0,
+		V3_2_0_0,
 	}
 	MinVersion = V0_8_2_0
-	MaxVersion = V3_1_0_0
+	MaxVersion = V3_2_0_0
 	DefaultVersion = V1_0_0_0
+
+	// reduced set of versions to matrix test
+	fvtRangeVersions = []KafkaVersion{
+		V0_8_2_2,
+		V0_10_2_2,
+		V1_0_2_0,
+		V1_1_1_0,
+		V2_0_1_0,
+		V2_1_1_0,
+		V2_2_2_0,
+		V2_3_1_0,
+		V2_4_1_0,
+		V2_5_1_0,
+		V2_6_2_0,
+		V2_7_1_0,
+		V2_8_1_0,
+		V3_0_1_0,
+		V3_1_0_0,
+		V3_2_0_0,
+	}
 )

// ParseKafkaVersion parses and returns kafka version or error from a string
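With `V3_2_0_0` defined, appended to `SupportedVersions`, and `MaxVersion` bumped, a client can pin the new protocol version in its configuration. A short sketch of the caller side (assuming the `github.com/Shopify/sarama` import path the project used at the time of this commit):

```go
package main

import (
	"fmt"
	"log"

	"github.com/Shopify/sarama"
)

func main() {
	// ParseKafkaVersion parses a version string like "3.2.0";
	// it returns an error for malformed input.
	version, err := sarama.ParseKafkaVersion("3.2.0")
	if err != nil {
		log.Fatalf("invalid Kafka version: %v", err)
	}

	cfg := sarama.NewConfig()
	cfg.Version = version // equivalent to cfg.Version = sarama.V3_2_0_0
	fmt.Println("client will assume the protocol of Kafka", cfg.Version)
}
```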
