Skip to content

Commit

Permalink
Not omit error in logger
Browse files Browse the repository at this point in the history
Add explicit errors as arguments so anyone can handle them in the logger implementation.
  • Loading branch information
itechdima committed May 4, 2023
1 parent 2304d4a commit 914d863
Showing 1 changed file with 5 additions and 5 deletions.
10 changes: 5 additions & 5 deletions reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func (r *Reader) commitLoopInterval(ctx context.Context, gen *Generation) {

commit := func() {
if err := r.commitOffsetsWithRetry(gen, offsets, defaultCommitRetries); err != nil {
r.withErrorLogger(func(l Logger) { l.Printf(err.Error()) })
r.withErrorLogger(func(l Logger) { l.Printf("%v", err) })
} else {
offsets.reset()
}
Expand Down Expand Up @@ -311,7 +311,7 @@ func (r *Reader) run(cg *ConsumerGroup) {
}
r.stats.errors.observe(1)
r.withErrorLogger(func(l Logger) {
l.Printf(err.Error())
l.Printf("%v", err)
})
// Continue with next attempt...
}
Expand Down Expand Up @@ -1346,7 +1346,7 @@ func (r *reader) run(ctx context.Context, offset int64) {

case errors.Is(err, UnknownTopicOrPartition):
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, topic or parition not found on this broker, %v", r.partition, r.topic, toHumanOffset(offset), r.brokers)
log.Printf("failed to read from current broker %v for partition %d of %s at offset %d: %v", r.brokers, r.partition, r.topic, toHumanOffset(offset), err)
})

conn.Close()
Expand All @@ -1358,7 +1358,7 @@ func (r *reader) run(ctx context.Context, offset int64) {

case errors.Is(err, NotLeaderForPartition):
r.withErrorLogger(func(log Logger) {
log.Printf("failed to read from current broker for partition %d of %s at offset %d, not the leader", r.partition, r.topic, toHumanOffset(offset))
log.Printf("failed to read from current broker for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
})

conn.Close()
Expand All @@ -1372,7 +1372,7 @@ func (r *reader) run(ctx context.Context, offset int64) {
// Timeout on the kafka side, this can be safely retried.
errcount = 0
r.withLogger(func(log Logger) {
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d", r.partition, r.topic, toHumanOffset(offset))
log.Printf("no messages received from kafka within the allocated time for partition %d of %s at offset %d: %v", r.partition, r.topic, toHumanOffset(offset), err)
})
r.stats.timeouts.observe(1)
continue
Expand Down

0 comments on commit 914d863

Please sign in to comment.