Skip to content

Commit

Permalink
fix(test): use Sarama transactional producer
Browse files Browse the repository at this point in the history
One of the pieces of #1695 was to use a small jar for publishing
uncommited messages as part of the functional test. Replacing that with
the native Sarama transactional producer

Fixes #1733

Co-authored-by: KJTsanaktsidis <[email protected]>
  • Loading branch information
dnwe and KJTsanaktsidis committed Sep 13, 2021
1 parent 9e06d75 commit e44f998
Show file tree
Hide file tree
Showing 2 changed files with 124 additions and 42 deletions.
123 changes: 122 additions & 1 deletion functional_consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,20 +138,141 @@ func TestVersionMatrixIdempotent(t *testing.T) {
}

func TestReadOnlyAndAllCommittedMessages(t *testing.T) {
prevLogger := Logger
defer func() { Logger = prevLogger }()
Logger = &testLogger{t}

checkKafkaVersion(t, "0.11.0")
setupFunctionalTest(t)
defer teardownFunctionalTest(t)

config := NewTestConfig()
config.ClientID = t.Name()
config.Net.MaxOpenRequests = 1
config.Consumer.IsolationLevel = ReadCommitted
config.Producer.Idempotent = true
config.Producer.Return.Successes = true
config.Producer.RequiredAcks = WaitForAll
config.Version = V0_11_0_0

client, err := NewClient(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer client.Close()
controller, err := client.Controller()
if err != nil {
t.Fatal(err)
}
defer controller.Close()

transactionalID := "transactional.id config"

var coordinator *Broker

// wait until transactional topic is available and then connect to the
// coordinator broker
for coordinator == nil {
if _, err := client.Leader("__transaction_state", 0); err != nil {
time.Sleep(2 * time.Second)
_ = client.RefreshMetadata("__transaction_state")
continue
}
coordRes, err := controller.FindCoordinator(&FindCoordinatorRequest{
Version: 2,
CoordinatorKey: transactionalID,
CoordinatorType: CoordinatorTransaction,
})
if err != nil {
t.Fatal(err)
}
if coordRes.Err != ErrNoError {
continue
}
if err := coordRes.Coordinator.Open(client.Config()); err != nil {
t.Fatal(err)
}
coordinator = coordRes.Coordinator
break
}

// produce some uncommitted messages to the topic
pidRes, err := coordinator.InitProducerID(&InitProducerIDRequest{
TransactionalID: &transactionalID,
TransactionTimeout: 10 * time.Second,
})
if err != nil {
t.Fatal(err)
}
_, _ = coordinator.AddPartitionsToTxn(&AddPartitionsToTxnRequest{
TransactionalID: transactionalID,
ProducerID: pidRes.ProducerID,
ProducerEpoch: pidRes.ProducerEpoch,
TopicPartitions: map[string][]int32{
uncommittedTopic: {0},
},
})
if err != nil {
t.Fatal(err)
}
ps := &produceSet{
msgs: make(map[string]map[int32]*partitionSet),
parent: &asyncProducer{
conf: config,
},
producerID: pidRes.ProducerID,
producerEpoch: pidRes.ProducerEpoch,
}
_ = ps.add(&ProducerMessage{
Topic: uncommittedTopic,
Partition: 0,
Value: StringEncoder("uncommitted message 1"),
})
_ = ps.add(&ProducerMessage{
Topic: uncommittedTopic,
Partition: 0,
Value: StringEncoder("uncommitted message 2"),
})
produceReq := ps.buildRequest()
produceReq.TransactionalID = &transactionalID
if _, err := coordinator.Produce(produceReq); err != nil {

This comment has been minimized.

Copy link
@td-middleware

td-middleware Nov 3, 2021

produce will fail, for this coordinator is not partition leader broker,

This comment has been minimized.

Copy link
@td-middleware

This comment has been minimized.

Copy link
@td-middleware
t.Fatal(err)
}

// now produce some committed messages to the topic
producer, err := NewAsyncProducer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer producer.Close()

for i := 1; i <= 6; i++ {
producer.Input() <- &ProducerMessage{
Topic: uncommittedTopic,
Partition: 0,
Value: StringEncoder(fmt.Sprintf("Committed %v", i)),
}
<-producer.Successes()
}

// now abort the uncommitted transaction
if _, err := coordinator.EndTxn(&EndTxnRequest{
TransactionalID: transactionalID,
ProducerID: pidRes.ProducerID,
ProducerEpoch: pidRes.ProducerEpoch,
TransactionResult: false, // aborted

This comment has been minimized.

Copy link
@td-middleware

td-middleware Nov 3, 2021

message not abort, consumer stall can consume

}); err != nil {
t.Fatal(err)
}


consumer, err := NewConsumer(FunctionalTestEnv.KafkaBrokerAddrs, config)
if err != nil {
t.Fatal(err)
}
defer consumer.Close()

pc, err := consumer.ConsumePartition("uncommitted-topic-test-4", 0, OffsetOldest)
pc, err := consumer.ConsumePartition(uncommittedTopic, 0, OffsetOldest)
require.NoError(t, err)

msgChannel := pc.Messages()
Expand Down
43 changes: 2 additions & 41 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,11 @@ package sarama
import (
"context"
"fmt"
"io"
"log"
"net"
"net/http"
"net/url"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
Expand All @@ -22,9 +19,7 @@ import (
toxiproxy "github.com/Shopify/toxiproxy/client"
)

const (
uncomittedMsgJar = "https://github.com/FrancoisPoinsot/simplest-uncommitted-msg/releases/download/0.1/simplest-uncommitted-msg-0.1-jar-with-dependencies.jar"
)
const uncommittedTopic = "uncommitted-topic-test-4"

var (
testTopicDetails = map[string]*TopicDetail{
Expand All @@ -40,7 +35,7 @@ var (
NumPartitions: 64,
ReplicationFactor: 3,
},
"uncommitted-topic-test-4": {
uncommittedTopic: {
NumPartitions: 1,
ReplicationFactor: 3,
},
Expand Down Expand Up @@ -341,40 +336,6 @@ func prepareTestTopics(ctx context.Context, env *testEnvironment) error {
}
}

// This is kind of gross, but we don't actually have support for doing transactional publishing
// with sarama, so we need to use a java-based tool to publish uncommitted messages to
// the uncommitted-topic-test-4 topic
jarName := filepath.Base(uncomittedMsgJar)
if _, err := os.Stat(jarName); err != nil {
Logger.Printf("Downloading %s\n", uncomittedMsgJar)
req, err := http.NewRequest("GET", uncomittedMsgJar, nil)
if err != nil {
return fmt.Errorf("failed creating requst for uncommitted msg jar: %w", err)
}
res, err := http.DefaultClient.Do(req)
if err != nil {
return fmt.Errorf("failed fetching the uncommitted msg jar: %w", err)
}
defer res.Body.Close()
jarFile, err := os.OpenFile(jarName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0o644)
if err != nil {
return fmt.Errorf("failed opening the uncommitted msg jar: %w", err)
}
defer jarFile.Close()

_, err = io.Copy(jarFile, res.Body)
if err != nil {
return fmt.Errorf("failed writing the uncommitted msg jar: %w", err)
}
}

c := exec.Command("java", "-jar", jarName, "-b", env.KafkaBrokerAddrs[0], "-c", "4")
c.Stdout = os.Stdout
c.Stderr = os.Stderr
err = c.Run()
if err != nil {
return fmt.Errorf("failed running uncommitted msg jar: %w", err)
}
return nil
}

Expand Down

0 comments on commit e44f998

Please sign in to comment.