Skip to content

Commit

Permalink
Fix race condition by making testRetryWriter type thread safe
Browse files Browse the repository at this point in the history
  • Loading branch information
Evanjt1 committed Feb 11, 2020
1 parent 407ea25 commit 9e8274a
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"io"
"math"
"sync"
"testing"
"time"
)
Expand Down Expand Up @@ -664,38 +665,52 @@ func testWriterSmallBatchBytes(t *testing.T) {
}

type testRetryWriter struct {
errs int
ch chan writerMessage
join sync.WaitGroup
}

func (w *testRetryWriter) messages() chan<- writerMessage {
ch := make(chan writerMessage, 1)
return w.ch
}

go func() {
for {
msg := <-ch
if w.errs > 0 {
func (w *testRetryWriter) close() {
close(w.ch)
w.join.Wait()
}

func (w *testRetryWriter) run(errs int) {
w.join.Add(1)
defer w.join.Done()

var done bool
for !done {
msg, ok := <-w.ch
if !ok {
done = true
} else {
if errs > 0 {
msg.res <- writerResponse{
id: msg.id,
err: &WriterError{
Msg: msg.msg,
Err: errors.New("bad attempt"),
},
}
w.errs -= 1
errs -= 1
} else {
msg.res <- writerResponse{
id: msg.id,
err: nil,
}
}
}
}()

return ch
}
}

func (w *testRetryWriter) close() {

func newTestRetryWriter(_ int, _ WriterConfig, _ *writerStats) partitionWriter {
w := &testRetryWriter{ch: make(chan writerMessage, 1)}
go w.run(2)
return w
}

func testWriterRetries(t *testing.T) {
Expand Down Expand Up @@ -737,12 +752,10 @@ func testWriterRetries(t *testing.T) {

for i, tc := range tcs {
w := newTestWriter(WriterConfig{
Topic: topic,
MaxAttempts: 2,
Balancer: &RoundRobin{},
newPartitionWriter: func(_ int, _ WriterConfig, _ *writerStats) partitionWriter {
return &testRetryWriter{errs: 2}
},
Topic: topic,
MaxAttempts: 2,
Balancer: &RoundRobin{},
newPartitionWriter: newTestRetryWriter,
})

err := w.WriteMessages(context.Background(), tc.msgs()...)
Expand Down

0 comments on commit 9e8274a

Please sign in to comment.