Skip to content

Commit

Permalink
Fix bug #179
Browse files Browse the repository at this point in the history
Also reported at #199 (comment)

Go's closure semantics are really annoying - simply stop spawning goroutines
with the wrong arguments.

Add a test (heavily based on https://gist.github.com/ORBAT/d0adcd790dff34b37b04)
to ensure this behaviour doesn't regress.

Huge thanks to Tom Eklöf for getting me all the logs etc. needed to track this
down.
  • Loading branch information
eapache committed Nov 19, 2014
1 parent 1618fc4 commit 38d4c47
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 2 deletions.
42 changes: 42 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"net"
"os"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -72,6 +73,47 @@ func TestFuncProducingFlushing(t *testing.T) {
testProducingMessages(t, config)
}

func TestFuncMultiPartitionProduce(t *testing.T) {
checkKafkaAvailability(t)
client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client)

config := NewProducerConfig()
config.FlushFrequency = 50 * time.Millisecond
config.FlushMsgCount = 200
config.ChannelBufferSize = 20
config.AckSuccesses = true
producer, err := NewProducer(client, config)
if err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(TestBatchSize)

for i := 1; i <= TestBatchSize; i++ {

go func(i int, w *sync.WaitGroup) {
defer w.Done()
msg := &MessageToSend{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
producer.Input() <- msg
select {
case ret := <-producer.Errors():
t.Fatal(ret.Err)
case <-producer.Successes():
}
}(i, &wg)
}

wg.Wait()
if err := producer.Close(); err != nil {
t.Error(err)
}
}

func testProducingMessages(t *testing.T, config *ProducerConfig) {
checkKafkaAvailability(t)

Expand Down
7 changes: 5 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,8 @@ func (p *Producer) topicDispatcher() {
if handler == nil {
p.retries <- &MessageToSend{flags: ref}
newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
go withRecover(func() { p.partitionDispatcher(msg.Topic, newHandler) })
topic := msg.Topic // block local because go's closure semantics suck
go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
handler = newHandler
handlers[msg.Topic] = handler
}
Expand Down Expand Up @@ -303,7 +304,9 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
if handler == nil {
p.retries <- &MessageToSend{flags: ref}
newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
go withRecover(func() { p.leaderDispatcher(msg.Topic, msg.partition, newHandler) })
topic := msg.Topic // block local because go's closure semantics suck
partition := msg.partition // block local because go's closure semantics suck
go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
handler = newHandler
handlers[msg.partition] = handler
}
Expand Down

0 comments on commit 38d4c47

Please sign in to comment.