Skip to content

Commit

Permalink
move writer retries so async messages are retried (#382)
Browse files Browse the repository at this point in the history
  • Loading branch information
abuchanan-nr authored Feb 22, 2020
1 parent 55e867e commit bf3be08
Showing 1 changed file with 48 additions and 63 deletions.
111 changes: 48 additions & 63 deletions writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,83 +308,50 @@ func (w *Writer) WriteMessages(ctx context.Context, msgs ...Message) error {
res = make(chan error, len(msgs))
}
t0 := time.Now()
defer w.stats.writeTime.observeDuration(time.Since(t0))

for attempt := 0; attempt < w.config.MaxAttempts; attempt++ {
w.mutex.RLock()
w.mutex.RLock()
closed := w.closed
w.mutex.RUnlock()

if w.closed {
w.mutex.RUnlock()
return io.ErrClosedPipe
}
if closed {
return io.ErrClosedPipe
}

for i, msg := range msgs {
if int(msg.size()) > w.config.BatchBytes {
err := MessageTooLargeError{
Message: msg,
Remaining: msgs[i+1:],
}
w.mutex.RUnlock()
return err
}
select {
case w.msgs <- writerMessage{
msg: msg,
res: res,
}:
case <-ctx.Done():
w.mutex.RUnlock()
return ctx.Err()
for i, msg := range msgs {

if int(msg.size()) > w.config.BatchBytes {
err := MessageTooLargeError{
Message: msg,
Remaining: msgs[i+1:],
}
return err
}

w.mutex.RUnlock()

if w.config.Async {
break
}
wm := writerMessage{msg: msg, res: res}

var retry []Message

for i := 0; i != len(msgs); i++ {
select {
case e := <-res:
if e != nil {
if we, ok := e.(*writerError); ok {
w.stats.retries.observe(1)
retry, err = append(retry, we.msg), we.err
} else {
err = e
}
}
case <-ctx.Done():
return ctx.Err()
}
select {
case w.msgs <- wm:
case <-ctx.Done():
return ctx.Err()
}
}

if msgs = retry; len(msgs) == 0 {
break
}
if w.config.Async {
return nil
}

timer := time.NewTimer(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
for i := 0; i != len(msgs); i++ {
select {
case <-timer.C:
// Only clear the error (so we retry the loop) if we have more retries, otherwise
// we risk silencing the error.
if attempt < w.config.MaxAttempts-1 {
err = nil
case e := <-res:
if e != nil {
err = e
}
case <-ctx.Done():
err = ctx.Err()
case <-w.done:
err = io.ErrClosedPipe
}
timer.Stop()

if err != nil {
break
return ctx.Err()
}
}
w.stats.writeTime.observeDuration(time.Since(t0))

return err
}

Expand Down Expand Up @@ -571,6 +538,7 @@ type writer struct {
codec CompressionCodec
logger Logger
errorLogger Logger
maxAttempts int
}

func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
Expand All @@ -590,6 +558,7 @@ func newWriter(partition int, config WriterConfig, stats *writerStats) *writer {
codec: config.CompressionCodec,
logger: config.Logger,
errorLogger: config.ErrorLogger,
maxAttempts: config.MaxAttempts,
}
w.join.Add(1)
go w.run()
Expand Down Expand Up @@ -701,13 +670,15 @@ func (w *writer) run() {
if len(batch) == 0 {
continue
}

var err error
if conn, err = w.write(conn, batch, resch); err != nil {
if conn, err = w.writeWithRetries(conn, batch, resch); err != nil {
if conn != nil {
conn.Close()
conn = nil
}
}

idleConnDeadline = time.Now().Add(w.idleConnTimeout)
for i := range batch {
batch[i] = Message{}
Expand Down Expand Up @@ -737,6 +708,20 @@ func (w *writer) dial() (conn *Conn, err error) {
return
}

func (w *writer) writeWithRetries(conn *Conn, batch []Message, resch [](chan<- error)) (*Conn, error) {
var err error

for attempt := 0; attempt < w.maxAttempts; attempt++ {
conn, err = w.write(conn, batch, resch)
if err == nil {
break
}
w.stats.retries.observe(1)
time.Sleep(backoff(attempt+1, 100*time.Millisecond, 1*time.Second))
}
return conn, err
}

func (w *writer) write(conn *Conn, batch []Message, resch [](chan<- error)) (ret *Conn, err error) {
w.stats.writes.observe(1)
if conn == nil {
Expand Down

0 comments on commit bf3be08

Please sign in to comment.