Skip to content

Commit

Permalink
Cherry pick fix for IBM#179
Browse files Browse the repository at this point in the history
  • Loading branch information
eapache authored and Tom Eklöf committed Nov 19, 2014
1 parent 5fed534 commit e63f0f2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 2 deletions.
41 changes: 41 additions & 0 deletions functional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,47 @@ func TestFuncProducingFlushing(t *testing.T) {
testProducingMessages(t, config)
}

func TestFuncMultiPartitionProduce(t *testing.T) {
checkKafkaAvailability(t)
client, err := NewClient("functional_test", []string{kafkaAddr}, nil)
if err != nil {
t.Fatal(err)
}
defer safeClose(t, client)

config := NewProducerConfig()
config.FlushFrequency = 50 * time.Millisecond
config.FlushMsgCount = 200
config.ChannelBufferSize = 20
config.AckSuccesses = true
producer, err := NewProducer(client, config)
if err != nil {
t.Fatal(err)
}

var wg sync.WaitGroup
wg.Add(TestBatchSize)

for i := 1; i <= TestBatchSize; i++ {

go func(i int, w *sync.WaitGroup) {
defer w.Done()
msg := &MessageToSend{Topic: "multi_partition", Key: nil, Value: StringEncoder(fmt.Sprintf("hur %d", i))}
producer.Input() <- msg
select {
case ret := <-producer.Errors():
t.Fatal(ret.Err)
case <-producer.Successes():
}
}(i, &wg)
}

wg.Wait()
if err := producer.Close(); err != nil {
t.Error(err)
}
}

func testProducingMessages(t *testing.T, config *ProducerConfig) {
checkKafkaAvailability(t)

Expand Down
7 changes: 5 additions & 2 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,8 @@ func (p *Producer) topicDispatcher() {
if handler == nil {
p.retries <- &MessageToSend{flags: ref}
newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
go withRecover(func() { p.partitionDispatcher(msg.Topic, newHandler) })
topic := msg.Topic // block local because go's closure semantics suck
go withRecover(func() { p.partitionDispatcher(topic, newHandler) })
handler = newHandler
handlers[msg.Topic] = handler
}
Expand Down Expand Up @@ -286,7 +287,9 @@ func (p *Producer) partitionDispatcher(topic string, input chan *MessageToSend)
if handler == nil {
p.retries <- &MessageToSend{flags: ref}
newHandler := make(chan *MessageToSend, p.config.ChannelBufferSize)
go withRecover(func() { p.leaderDispatcher(msg.Topic, msg.partition, newHandler) })
topic := msg.Topic // block local because go's closure semantics suck
partition := msg.partition // block local because go's closure semantics suck
go withRecover(func() { p.leaderDispatcher(topic, partition, newHandler) })
handler = newHandler
handlers[msg.partition] = handler
}
Expand Down

0 comments on commit e63f0f2

Please sign in to comment.