diff --git a/reader.go b/reader.go index 13a336e51..1acb676e9 100644 --- a/reader.go +++ b/reader.go @@ -238,7 +238,7 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) { commit := func() { if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil { - r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) }) + r.withErrorLogger(func(l Logger) { l.Printf("%v", err) }) } else { offsets.reset() } @@ -311,7 +311,7 @@ func (r *Reader) run(cg *ConsumerGroup) { } r.stats.errors.observe(1) r.withErrorLogger(func(l Logger) { - l.Printf(err.Error()) + l.Printf("%v", err) }) // Continue with next attempt... } @@ -1346,7 +1346,7 @@ func (r *reader) run(ctx context.Context, offset int64) { case errors.Is(err, UnknownTopicOrPartition): r.withErrorLogger(func(log Logger) { - log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers) + log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err) }) conn.Close() @@ -1358,7 +1358,7 @@ func (r *reader) run(ctx context.Context, offset int64) { case errors.Is(err, NotLeaderForPartition): r.withErrorLogger(func(log Logger) { - log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset)) + log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err) }) conn.Close() @@ -1372,7 +1372,7 @@ func (r *reader) run(ctx context.Context, offset int64) { // Timeout on the kafka side, this can be safely retried. errcount = 0 r.withLogger(func(log Logger) { - log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset)) + log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err) }) r.stats.timeouts.observe(1) continue