diff --git a/broker.go b/broker.go index f494fcb33..e995e8b9f 100644 --- a/broker.go +++ b/broker.go @@ -40,6 +40,7 @@ type Broker struct { outgoingByteRate metrics.Meter responseRate metrics.Meter responseSize metrics.Histogram + requestsInFlight metrics.Counter brokerIncomingByteRate metrics.Meter brokerRequestRate metrics.Meter brokerRequestSize metrics.Histogram @@ -47,6 +48,7 @@ type Broker struct { brokerOutgoingByteRate metrics.Meter brokerResponseRate metrics.Meter brokerResponseSize metrics.Histogram + brokerRequestsInFlight metrics.Counter kerberosAuthenticator GSSAPIKerberosAuth } @@ -197,6 +199,7 @@ func (b *Broker) Open(conf *Config) error { b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry) b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry) b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry) + b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry) // Do not gather metrics for seeded broker (only used during bootstrap) because they share // the same id (-1) and are already exposed through the global metrics above if b.id >= 0 { @@ -760,16 +763,19 @@ func (b *Broker) sendWithPromise(rb protocolBody, promise *responsePromise) erro } requestTime := time.Now() + // Will be decremented in responseReceiver (except error or request with NoResponse) + b.addRequestInFlightMetrics(1) bytes, err := b.conn.Write(buf) - b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check + b.updateOutgoingCommunicationMetrics(bytes) if err != nil { + b.addRequestInFlightMetrics(-1) return err } b.correlationID++ if promise == nil { // Record request latency without the response - b.updateRequestLatencyMetrics(time.Since(requestTime)) + b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime)) return nil } @@ -872,6 +878,9 @@ func (b *Broker) responseReceiver() { err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout)) if err != nil { + // This was previously incremented in send() and + // we are not calling updateIncomingCommunicationMetrics() + b.addRequestInFlightMetrics(-1) dead = err response.handle(nil, err) continue @@ -953,9 +962,12 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int } requestTime := time.Now() + // Will be decremented in updateIncomingCommunicationMetrics (except error) + b.addRequestInFlightMetrics(1) bytes, err := b.conn.Write(buf) b.updateOutgoingCommunicationMetrics(bytes) if err != nil { + b.addRequestInFlightMetrics(-1) Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error()) return err } @@ -964,6 +976,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int header := make([]byte, 8) // response header _, err = io.ReadFull(b.conn, header) if err != nil { + b.addRequestInFlightMetrics(-1) Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error()) return err } @@ -972,6 +985,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int payload := make([]byte, length-4) n, err := io.ReadFull(b.conn, payload) if err != nil { + b.addRequestInFlightMetrics(-1) Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error()) return err } @@ -1051,9 +1065,12 @@ func (b *Broker) sendAndReceiveV0SASLPlainAuth() error { } requestTime := time.Now() + // Will be decremented in updateIncomingCommunicationMetrics (except error) + b.addRequestInFlightMetrics(1) bytesWritten, err := b.conn.Write(authBytes) b.updateOutgoingCommunicationMetrics(bytesWritten) if err != nil { + b.addRequestInFlightMetrics(-1) Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) return err } @@ -1078,11 +1095,13 @@ func (b *Broker) sendAndReceiveV1SASLPlainAuth() error { requestTime := time.Now() + // Will be decremented in updateIncomingCommunicationMetrics (except error) + b.addRequestInFlightMetrics(1) bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID) - b.updateOutgoingCommunicationMetrics(bytesWritten) if err != nil { + b.addRequestInFlightMetrics(-1) Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) return err } @@ -1137,14 +1156,17 @@ func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error { func (b *Broker) sendClientMessage(message []byte) (bool, error) { requestTime := time.Now() + // Will be decremented in updateIncomingCommunicationMetrics (except error) + b.addRequestInFlightMetrics(1) correlationID := b.correlationID bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID) + b.updateOutgoingCommunicationMetrics(bytesWritten) if err != nil { + b.addRequestInFlightMetrics(-1) return false, err } - b.updateOutgoingCommunicationMetrics(bytesWritten) b.correlationID++ res := &SaslAuthenticateResponse{} @@ -1180,17 +1202,21 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error { for !scramClient.Done() { requestTime := time.Now() + // Will be decremented in updateIncomingCommunicationMetrics (except error) + b.addRequestInFlightMetrics(1) correlationID := b.correlationID bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg)) + b.updateOutgoingCommunicationMetrics(bytesWritten) if err != nil { + b.addRequestInFlightMetrics(-1) Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error()) return err } - b.updateOutgoingCommunicationMetrics(bytesWritten) b.correlationID++ challenge, err := b.receiveSaslAuthenticateResponse(correlationID) if err != nil { + b.addRequestInFlightMetrics(-1) Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error()) return err } @@ -1361,7 +1387,7 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl } func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) { - b.updateRequestLatencyMetrics(requestLatency) + b.updateRequestLatencyAndInFlightMetrics(requestLatency) b.responseRate.Mark(1) if b.brokerResponseRate != nil { @@ -1380,7 +1406,7 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency ti } } -func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) { +func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) { requestLatencyInMs := int64(requestLatency / time.Millisecond) b.requestLatency.Update(requestLatencyInMs) @@ -1388,6 +1414,14 @@ func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) { b.brokerRequestLatency.Update(requestLatencyInMs) } + b.addRequestInFlightMetrics(-1) +} + +func (b *Broker) addRequestInFlightMetrics(i int64) { + b.requestsInFlight.Inc(i) + if b.brokerRequestsInFlight != nil { + b.brokerRequestsInFlight.Inc(i) + } } func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) { @@ -1417,6 +1451,7 @@ func (b *Broker) registerMetrics() { b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate") b.brokerResponseRate = b.registerMeter("response-rate") b.brokerResponseSize = b.registerHistogram("response-size") + b.brokerRequestsInFlight = b.registerCounter("requests-in-flight") } func (b *Broker) unregisterMetrics() { @@ -1436,3 +1471,9 @@ func (b *Broker) registerHistogram(name string) metrics.Histogram { b.registeredMetrics = append(b.registeredMetrics, nameForBroker) return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry) } + +func (b *Broker) registerCounter(name string) metrics.Counter { + nameForBroker := getMetricNameForBroker(name, b) + b.registeredMetrics = append(b.registeredMetrics, nameForBroker) + return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry) +} diff --git a/broker_test.go b/broker_test.go index f12c16901..251759553 100644 --- a/broker_test.go +++ b/broker_test.go @@ -218,6 +218,7 @@ func TestSASLOAuthBearer(t *testing.T) { broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} + broker.requestsInFlight = metrics.NilCounter{} conf := NewConfig() conf.Net.SASL.Mechanism = SASLTypeOAuth @@ -337,6 +338,7 @@ func TestSASLSCRAMSHAXXX(t *testing.T) { broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} + broker.requestsInFlight = metrics.NilCounter{} mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp)) mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512}) @@ -450,6 +452,7 @@ func TestSASLPlainAuth(t *testing.T) { broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} + broker.requestsInFlight = metrics.NilCounter{} conf := NewConfig() conf.Net.SASL.Mechanism = SASLTypePlaintext @@ -493,6 +496,56 @@ func TestSASLPlainAuth(t *testing.T) { } } +// TestSASLReadTimeout ensures that the broker connection won't block forever +// if the remote end never responds after the handshake +func TestSASLReadTimeout(t *testing.T) { + mockBroker := NewMockBroker(t, 0) + defer mockBroker.Close() + + mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t). + SetAuthBytes([]byte(`response_payload`)) + + mockBroker.SetHandlerByMap(map[string]MockResponse{ + "SaslAuthenticateRequest": mockSASLAuthResponse, + }) + + broker := NewBroker(mockBroker.Addr()) + { + broker.requestRate = metrics.NilMeter{} + broker.outgoingByteRate = metrics.NilMeter{} + broker.incomingByteRate = metrics.NilMeter{} + broker.requestSize = metrics.NilHistogram{} + broker.responseSize = metrics.NilHistogram{} + broker.responseRate = metrics.NilMeter{} + broker.requestLatency = metrics.NilHistogram{} + broker.requestsInFlight = metrics.NilCounter{} + } + + conf := NewConfig() + { + conf.Net.ReadTimeout = time.Millisecond + conf.Net.SASL.Mechanism = SASLTypePlaintext + conf.Net.SASL.User = "token" + conf.Net.SASL.Password = "password" + conf.Net.SASL.Version = SASLHandshakeV1 + } + + broker.conf = conf + broker.conf.Version = V1_0_0_0 + dialer := net.Dialer{} + + conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String()) + if err != nil { + t.Fatal(err) + } + + broker.conn = conn + err = broker.authenticateViaSASL() + if err == nil { + t.Errorf("should never happen - expected read timeout") + } +} + func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { testTable := []struct { @@ -557,6 +610,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) { broker.responseSize = metrics.NilHistogram{} broker.responseRate = metrics.NilMeter{} broker.requestLatency = metrics.NilHistogram{} + broker.requestsInFlight = metrics.NilCounter{} conf := NewConfig() conf.Net.SASL.Mechanism = SASLTypeGSSAPI conf.Net.SASL.GSSAPI.ServiceName = "kafka" @@ -924,6 +978,9 @@ func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics broke metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1)) metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead)) + // Check that there is no more requests in flight + metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0)) + // Run the validators metricValidators.run(t, broker.conf.MetricRegistry) } diff --git a/functional_producer_test.go b/functional_producer_test.go index 91bf3b5ee..6c118b8ee 100644 --- a/functional_producer_test.go +++ b/functional_producer_test.go @@ -264,6 +264,9 @@ func validateMetrics(t *testing.T, client Client) { metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1)) } + // There should be no requests in flight anymore + metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0)) + // Run the validators metricValidators.run(t, client.Config().MetricRegistry) } diff --git a/metrics_test.go b/metrics_test.go index 789c0ff33..7572f5b90 100644 --- a/metrics_test.go +++ b/metrics_test.go @@ -170,3 +170,19 @@ func maxValHistogramValidator(name string, maxMax int) *metricValidator { } }) } + +func counterValidator(name string, expectedCount int) *metricValidator { + return &metricValidator{ + name: name, + validator: func(t *testing.T, metric interface{}) { + if counter, ok := metric.(metrics.Counter); !ok { + t.Errorf("Expected counter metric for '%s', got %T", name, metric) + } else { + count := counter.Count() + if count != int64(expectedCount) { + t.Errorf("Expected counter metric '%s' count = %d, got %d", name, expectedCount, count) + } + } + }, + } +} diff --git a/sarama.go b/sarama.go index 1e0277aeb..48f362d28 100644 --- a/sarama.go +++ b/sarama.go @@ -39,6 +39,10 @@ Broker related metrics: | response-rate-for-broker- | meter | Responses/second received from a given broker | | response-size | histogram | Distribution of the response size in bytes for all brokers | | response-size-for-broker- | histogram | Distribution of the response size in bytes for a given broker | + | requests-in-flight | counter | The current number of in-flight requests awaiting a response | + | | | for all brokers | + | requests-in-flight-for-broker- | counter | The current number of in-flight requests awaiting a response | + | | | for a given broker | +----------------------------------------------+------------+---------------------------------------------------------------+ Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics. diff --git a/tools/kafka-producer-performance/main.go b/tools/kafka-producer-performance/main.go index 7bf1531c3..e1a8163b5 100644 --- a/tools/kafka-producer-performance/main.go +++ b/tools/kafka-producer-performance/main.go @@ -80,6 +80,11 @@ var ( 0, "The maximum number of messages to send per second (0 for no limit).", ) + maxOpenRequests = flag.Int( + "max-open-requests", + 5, + "The maximum number of unacknowledged requests the client will send on a single connection before blocking (default: 5).", + ) maxMessageBytes = flag.Int( "max-message-bytes", 1000000, @@ -248,6 +253,7 @@ func main() { config := sarama.NewConfig() + config.Net.MaxOpenRequests = *maxOpenRequests config.Producer.MaxMessageBytes = *maxMessageBytes config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks) config.Producer.Timeout = *timeout @@ -430,17 +436,20 @@ func printMetrics(w io.Writer, r metrics.Registry) { recordSendRateMetric := r.Get("record-send-rate") requestLatencyMetric := r.Get("request-latency-in-ms") outgoingByteRateMetric := r.Get("outgoing-byte-rate") + requestsInFlightMetric := r.Get("requests-in-flight") - if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil { + if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil || + requestsInFlightMetric == nil { return } recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot() requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot() requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999}) outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot() + requestsInFlight := requestsInFlightMetric.(metrics.Counter).Count() fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+ "%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+ - "%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n", + "%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th, %d total req. in flight\n", recordSendRate.Count(), recordSendRate.RateMean(), recordSendRate.RateMean()*float64(*messageSize)/1024/1024, @@ -452,6 +461,7 @@ func printMetrics(w io.Writer, r metrics.Registry) { requestLatencyPercentiles[2], requestLatencyPercentiles[3], requestLatencyPercentiles[4], + requestsInFlight, ) }