diff --git a/broker.go b/broker.go index cb4e9cf8d..7b797d5a0 100644 --- a/broker.go +++ b/broker.go @@ -49,6 +49,7 @@ type Broker struct { brokerResponseRate metrics.Meter brokerResponseSize metrics.Histogram brokerRequestsInFlight metrics.Counter + brokerThrottleTime metrics.Histogram kerberosAuthenticator GSSAPIKerberosAuth } @@ -336,6 +337,15 @@ func (b *Broker) Produce(request *ProduceRequest) (*ProduceResponse, error) { } else { response = new(ProduceResponse) err = b.sendAndReceive(request, response) + if response.ThrottleTime != time.Duration(0) { + DebugLogger.Printf( + "producer/broker/%d ProduceResponse throttled %v\n", + b.ID(), response.ThrottleTime) + if b.brokerThrottleTime != nil { + throttleTimeInMs := int64(response.ThrottleTime / time.Millisecond) + b.brokerThrottleTime.Update(throttleTimeInMs) + } + } } if err != nil { @@ -1527,6 +1537,7 @@ func (b *Broker) registerMetrics() { b.brokerResponseRate = b.registerMeter("response-rate") b.brokerResponseSize = b.registerHistogram("response-size") b.brokerRequestsInFlight = b.registerCounter("requests-in-flight") + b.brokerThrottleTime = b.registerHistogram("throttle-time-in-ms") } func (b *Broker) unregisterMetrics() {