Skip to content

Commit

Permalink
fix(test): use Sarama transactional producer
Browse files Browse the repository at this point in the history
One of the pieces of #1695 was to use a small jar for publishing
uncommited messages as part of the functional test. Replacing that with
the native Sarama transactional producer

Fixes #1733

Co-authored-by: KJTsanaktsidis <[email protected]>
  • Loading branch information
dnwe and KJTsanaktsidis committed Feb 8, 2022
1 parent a5fdd87 commit 1de9f7e
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 55 deletions.
121 changes: 120 additions & 1 deletion functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"math"
"os"
"sort"
"strconv"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -143,20 +144,138 @@ func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
defer teardownFunctionalTest(t)

config := NewTestConfig()
config.ClientID = t.Name()
config.Net.MaxOpenRequests = 1
config.Consumer.IsolationLevel = ReadCommitted
config.Producer.Idempotent = true
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = WaitForAll
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer client.Close()
controller, err := client.Controller()
if err != nil {
t.Fatal(err)
}
defer controller.Close()

transactionalID := strconv.FormatInt(time.Now().UnixNano()/(1<<22), 10)

var coordinator *Broker

// find the transaction coordinator
for {
coordRes, err := controller.FindCoordinator(&FindCoordinatorRequest{
Version: 2,
CoordinatorKey: transactionalID,
CoordinatorType: CoordinatorTransaction,
})
if err != nil {
t.Fatal(err)
}
if coordRes.Err != ErrNoError {
continue
}
if err := coordRes.Coordinator.Open(client.Config()); err != nil {
t.Fatal(err)
}
coordinator = coordRes.Coordinator
break
}

// produce some uncommitted messages to the topic
pidRes, err := coordinator.InitProducerID(&InitProducerIDRequest{
TransactionalID: &transactionalID,
TransactionTimeout: 10 * time.Second,
})
if err != nil {
t.Fatal(err)
}
_, _ = coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{
TransactionalID: transactionalID,
ProducerID: pidRes.ProducerID,
ProducerEpoch: pidRes.ProducerEpoch,
TopicPartitions: map[string][]int32{
uncommittedTopic: {0},
},
})
if err != nil {
t.Fatal(err)
}
ps := &produceSet{
msgs: make(map[string]map[int32]*partitionSet),
parent: &asyncProducer{
conf: config,
},
producerID: pidRes.ProducerID,
producerEpoch: pidRes.ProducerEpoch,
}
_ = ps.add(&ProducerMessage{
Topic: uncommittedTopic,
Partition: 0,
Value: StringEncoder("uncommitted message 1"),
})
_ = ps.add(&ProducerMessage{
Topic: uncommittedTopic,
Partition: 0,
Value: StringEncoder("uncommitted message 2"),
})
produceReq := ps.buildRequest()
produceReq.TransactionalID = &transactionalID
if resp, err := coordinator.Produce(produceReq); err != nil {
t.Fatal(err)
} else {
b := resp.GetBlock(uncommittedTopic, 0)
if b != nil {
t.Logf("uncommitted message 1 to %s-%d at offset %d", uncommittedTopic, 0, b.Offset)
t.Logf("uncommitted message 2 to %s-%d at offset %d", uncommittedTopic, 0, b.Offset+1)
}
}

// now produce some committed messages to the topic
producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer producer.Close()

for i := 1; i <= 6; i++ {
producer.Input() <- &ProducerMessage{
Topic: uncommittedTopic,
Partition: 0,
Value: StringEncoder(fmt.Sprintf("Committed %v", i)),
}
msg := <-producer.Successes()
t.Logf("Committed %v to %s-%d at offset %d", i, msg.Topic, msg.Partition, msg.Offset)
}

// now abort the uncommitted transaction
if _, err := coordinator.EndTxn(&EndTxnRequest{
TransactionalID: transactionalID,
ProducerID: pidRes.ProducerID,
ProducerEpoch: pidRes.ProducerEpoch,
TransactionResult: false, // aborted
}); err != nil {
t.Fatal(err)
}

consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer consumer.Close()

pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
pc, err := consumer.ConsumePartition(uncommittedTopic, 0, OffsetOldest)
require.NoError(t, err)

msgChannel := pc.Messages()
for i := 1; i <= 6; i++ {
msg := <-msgChannel
t.Logf("Received %s from %s-%d at offset %d", msg.Value, msg.Topic, msg.Partition, msg.Offset)
require.Equal(t, fmt.Sprintf("Committed %v", i), string(msg.Value))
}
}
Expand Down
100 changes: 46 additions & 54 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,10 @@ package sarama
import (
"context"
"fmt"
"io"
"log"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
Expand All @@ -21,9 +18,7 @@ import (
toxiproxy "github.com/Shopify/toxiproxy/v2/client"
)

const (
uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
)
const uncommittedTopic = "uncommitted-topic-test-4"

var (
testTopicDetails = map[string]*TopicDetail{
Expand All @@ -39,7 +34,7 @@ var (
NumPartitions: 64,
ReplicationFactor: 3,
},
"uncommitted-topic-test-4": {
uncommittedTopic: {
NumPartitions: 1,
ReplicationFactor: 3,
},
Expand Down Expand Up @@ -277,7 +272,7 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
config := NewTestConfig()
config.Metadata.Retry.Max = 5
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "sarama-tests"
config.ClientID = "sarama-prepareTestTopics"
var err error
config.Version, err = ParseKafkaVersion(env.KafkaVersion)
if err != nil {
Expand Down Expand Up @@ -312,25 +307,29 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {

// wait for the topics to _actually_ be gone - the delete is not guaranteed to be processed
// synchronously
var topicsOk bool
for i := 0; i < 20 && !topicsOk; i++ {
time.Sleep(1 * time.Second)
md, err := controller.GetMetadata(&MetadataRequest{
Topics: testTopicNames,
})
if err != nil {
return fmt.Errorf("failed to get metadata for test topics: %w", err)
}
{
var topicsOk bool
for i := 0; i < 30 && !topicsOk; i++ {
time.Sleep(1 * time.Second)
md, err := controller.GetMetadata(&MetadataRequest{
Topics: testTopicNames,
})
if err != nil {
return fmt.Errorf("failed to get metadata for test topics: %w", err)
}

topicsOk = true
for _, topicsMd := range md.Topics {
if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
topicsOk = false
if len(md.Topics) == len(testTopicNames) {
topicsOk = true
for _, topicsMd := range md.Topics {
if !isTopicNotExistsErrorOrOk(topicsMd.Err) {
topicsOk = false
}
}
}
}
}
if !topicsOk {
return fmt.Errorf("timed out waiting for test topics to be gone")
if !topicsOk {
return fmt.Errorf("timed out waiting for test topics to be gone")
}
}

// now create the topics empty
Expand All @@ -347,40 +346,33 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
}
}

// This is kind of gross, but we don't actually have support for doing transactional publishing
// with sarama, so we need to use a java-based tool to publish uncommitted messages to
// the uncommitted-topic-test-4 topic
jarName := filepath.Base(uncomittedMsgJar)
if _, err := os.Stat(jarName); err != nil {
Logger.Printf("Downloading %s\n", uncomittedMsgJar)
req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
if err != nil {
return fmt.Errorf("failed creating requst for uncommitted msg jar: %w", err)
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
}
defer res.Body.Close()
jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644)
if err != nil {
return fmt.Errorf("failed opening the uncommitted msg jar: %w", err)
}
defer jarFile.Close()
// wait for the topics to _actually_ exist - the creates are not guaranteed to be processed
// synchronously
{
var topicsOk bool
for i := 0; i < 30 && !topicsOk; i++ {
time.Sleep(1 * time.Second)
md, err := controller.GetMetadata(&MetadataRequest{
Topics: testTopicNames,
})
if err != nil {
return fmt.Errorf("failed to get metadata for test topics: %w", err)
}

_, err = io.Copy(jarFile, res.Body)
if err != nil {
return fmt.Errorf("failed writing the uncommitted msg jar: %w", err)
if len(md.Topics) == len(testTopicNames) {
topicsOk = true
for _, topicsMd := range md.Topics {
if topicsMd.Err != ErrNoError {
topicsOk = false
}
}
}
}
if !topicsOk {
return fmt.Errorf("timed out waiting for test topics to be created")
}
}

c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
c.Stdout = os.Stdout
c.Stderr = os.Stderr
err = c.Run()
if err != nil {
return fmt.Errorf("failed running uncommitted msg jar: %w", err)
}
return nil
}

Expand Down

0 comments on commit 1de9f7e

Please sign in to comment.