prod: explicitly log every state change
I worked out the formal state machines of the relevant goroutines and stuck a
log message in at every state change. This should help with debugging #179.

The logs should still only trigger on broker rebalance or super-heavy traffic,
so normal operation should be quiet.
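
These messages go through sarama's package-level Logger, which discards output by default, so nothing is visible until a caller wires it up. A minimal sketch of turning it on (assuming the github.com/Shopify/sarama import path of this era; everything else is illustrative):

package main

import (
	"log"
	"os"

	"github.com/Shopify/sarama"
)

func main() {
	// sarama.Logger discards output by default; redirect it to stderr so the
	// state-change lines added by this commit become visible.
	sarama.Logger = log.New(os.Stderr, "[sarama] ", log.LstdFlags)

	// ... construct a Client and Producer as usual; during a broker rebalance
	// you should now see lines like:
	//   producer/leader state change to [retrying] on my-topic/0
}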
eapache committed Nov 15, 2014
1 parent 31d9f4a commit c115b65
Showing 1 changed file with 10 additions and 8 deletions.
producer.go
@@ -243,6 +243,7 @@ func (p *Producer) topicDispatcher() {
 		}
 
 		if msg.flags&shutdown != 0 {
+			Logger.Println("Producer shutting down.")
 			break
 		}
 
@@ -265,8 +266,6 @@ func (p *Producer) topicDispatcher() {
 		handler <- msg
 	}
 
-	Logger.Println("Producer shutting down.")
-
 	for _, handler := range handlers {
 		close(handler)
 	}
@@ -330,15 +329,15 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
 			if backlog == nil {
 				// on the very first retried message we send off a chaser so that we know when everything "in between" has made it
 				// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
+				Logger.Printf("producer/leader state change to [retrying] on %s/%d\n", topic, partition)
 				output <- &MessageToSend{Topic: topic, partition: partition, flags: chaser}
-				Logger.Println("Producer dispatching retried messages to new leader.")
 				backlog = make([]*MessageToSend, 0)
 				p.unrefBrokerWorker(leader)
 				output = nil
 			}
 		} else {
 			// retry *and* chaser flag set, flush the backlog and return to normal processing
-			Logger.Println("Producer finished dispatching retried messages, processing backlog.")
+			Logger.Printf("producer/leader state change to [flushing] on %s/%d\n", topic, partition)
 			if output == nil {
 				err := p.client.RefreshTopicMetadata(topic)
 				if err != nil {
@@ -360,7 +359,7 @@ func (p *Producer) leaderDispatcher(topic string, partition int32, input chan *MessageToSend) {
 			for _, msg := range backlog {
 				output <- msg
 			}
-			Logger.Println("Producer backlog processsed.")
+			Logger.Printf("producer/leader state change to [normal] on %s/%d\n", topic, partition)
 
 			backlog = nil
 			continue
@@ -417,7 +416,7 @@ func (p *Producer) messageAggregator(broker *Broker, input chan *MessageToSend) {
 
 		if (bytesAccumulated+msg.byteSize() >= forceFlushThreshold()) ||
 			(p.config.Compression != CompressionNone && bytesAccumulated+msg.byteSize() >= p.config.MaxMessageBytes) {
-			Logger.Println("Producer accumulated maximum request size, forcing blocking flush.")
+			Logger.Println("producer/aggregator hit maximum request size, forcing blocking flush")
 			flusher <- buffer
 			buffer = nil
 			doFlush = nil
@@ -467,6 +466,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 		if currentRetries[msg.Topic] != nil && currentRetries[msg.Topic][msg.partition] != nil {
 			if msg.flags&chaser == chaser {
 				// we can start processing this topic/partition again
+				Logger.Printf("producer/flusher state change to [normal] on %s/%d\n",
+					msg.Topic, msg.partition)
 				currentRetries[msg.Topic][msg.partition] = nil
 			}
 			p.retryMessages([]*MessageToSend{msg}, currentRetries[msg.Topic][msg.partition])
@@ -498,6 +499,7 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 			continue
 		default:
 			p.client.disconnectBroker(broker)
+			Logger.Println("producer/flusher state change to [closing] because", err)
 			closing = err
 			p.retryMessages(batch, err)
 			continue
@@ -532,6 +534,8 @@ func (p *Producer) flusher(broker *Broker, input chan []*MessageToSend) {
 					p.returnSuccesses(msgs)
 				}
 			case UnknownTopicOrPartition, NotLeaderForPartition, LeaderNotAvailable:
+				Logger.Printf("producer/flusher state change to [retrying] on %s/%d because %v\n",
+					topic, partition, block.Err)
 				if currentRetries[topic] == nil {
 					currentRetries[topic] = make(map[int32]error)
 				}
@@ -696,7 +700,6 @@ func (p *Producer) returnSuccesses(batch []*MessageToSend) {
 }
 
 func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
-	Logger.Println("Producer requeueing batch of", len(batch), "messages due to error:", err)
 	for _, msg := range batch {
 		if msg == nil {
 			continue
@@ -708,7 +711,6 @@ func (p *Producer) retryMessages(batch []*MessageToSend, err error) {
 			p.retries <- msg
 		}
 	}
-	Logger.Println("Messages requeued")
 }
 
 type brokerWorker struct {
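For anyone tracing #179 through these logs: the new lines name a small per-partition cycle in the leader dispatcher. A rough sketch of that cycle, with the state names taken from the diff and everything else (package, helper names, topic) purely illustrative, not library code:

package main

import "fmt"

// dispatcherState models the three states named by the new log lines.
type dispatcherState string

const (
	stateNormal   dispatcherState = "normal"   // messages flow straight through to the broker worker
	stateRetrying dispatcherState = "retrying" // a retried message arrived: send a chaser, buffer new messages in a backlog
	stateFlushing dispatcherState = "flushing" // the chaser came back: replay the backlog to the new leader
)

// logTransition mirrors the Logger.Printf calls the commit adds.
func logTransition(to dispatcherState, topic string, partition int32) {
	fmt.Printf("producer/leader state change to [%s] on %s/%d\n", to, topic, partition)
}

func main() {
	// The cycle a partition goes through during a leader change:
	logTransition(stateRetrying, "my-topic", 0) // first retried message seen
	logTransition(stateFlushing, "my-topic", 0) // chaser returned; flush the backlog
	logTransition(stateNormal, "my-topic", 0)   // backlog drained; back to normal
}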
