Skip to content

Commit

Permalink
Add requests-in-flight metric
Browse files Browse the repository at this point in the history
- add requests-in-flight and requests-in-flight-for-broker-<brokerID>
  metrics to the broker
- move some b.updateOutgoingCommunicationMetrics(...) just after
  b.write(...) for consistency
- add -max-open-requests flag to kafka-producer-performance
- update kafka-producer-performance to monitor total requests in flight
- update unit tests
- document new metrics

This is a squash of the official PR based on more recent Sarama master:
IBM#1539
  • Loading branch information
slaunay committed Dec 16, 2019
1 parent a0317c1 commit 1181889
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 9 deletions.
55 changes: 48 additions & 7 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,15 @@ type Broker struct {
outgoingByteRate metrics.Meter
responseRate metrics.Meter
responseSize metrics.Histogram
requestsInFlight metrics.Counter
brokerIncomingByteRate metrics.Meter
brokerRequestRate metrics.Meter
brokerRequestSize metrics.Histogram
brokerRequestLatency metrics.Histogram
brokerOutgoingByteRate metrics.Meter
brokerResponseRate metrics.Meter
brokerResponseSize metrics.Histogram
brokerRequestsInFlight metrics.Counter

kerberosAuthenticator GSSAPIKerberosAuth
}
Expand Down Expand Up @@ -197,6 +199,7 @@ func (b *Broker) Open(conf *Config) error {
b.outgoingByteRate = metrics.GetOrRegisterMeter("outgoing-byte-rate", conf.MetricRegistry)
b.responseRate = metrics.GetOrRegisterMeter("response-rate", conf.MetricRegistry)
b.responseSize = getOrRegisterHistogram("response-size", conf.MetricRegistry)
b.requestsInFlight = metrics.GetOrRegisterCounter("requests-in-flight", conf.MetricRegistry)
// Do not gather metrics for seeded broker (only used during bootstrap) because they share
// the same id (-1) and are already exposed through the global metrics above
if b.id >= 0 {
Expand Down Expand Up @@ -760,16 +763,19 @@ func (b *Broker) sendWithPromise(rb protocolBody, promise *responsePromise) erro
}

requestTime := time.Now()
// Will be decremented in responseReceiver (except error or request with NoResponse)
b.addRequestInFlightMetrics(1)
bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes) //TODO: should it be after error check
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
b.addRequestInFlightMetrics(-1)
return err
}
b.correlationID++

if promise == nil {
// Record request latency without the response
b.updateRequestLatencyMetrics(time.Since(requestTime))
b.updateRequestLatencyAndInFlightMetrics(time.Since(requestTime))
return nil
}

Expand Down Expand Up @@ -872,6 +878,9 @@ func (b *Broker) responseReceiver() {

err := b.conn.SetReadDeadline(time.Now().Add(b.conf.Net.ReadTimeout))
if err != nil {
// This was previously incremented in send() and
// we are not calling updateIncomingCommunicationMetrics()
b.addRequestInFlightMetrics(-1)
dead = err
response.handle(nil, err)
continue
Expand Down Expand Up @@ -953,9 +962,12 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
}

requestTime := time.Now()
// Will be decremented in updateIncomingCommunicationMetrics (except error)
b.addRequestInFlightMetrics(1)
bytes, err := b.conn.Write(buf)
b.updateOutgoingCommunicationMetrics(bytes)
if err != nil {
b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to send SASL handshake %s: %s\n", b.addr, err.Error())
return err
}
Expand All @@ -964,6 +976,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
header := make([]byte, 8) // response header
_, err = io.ReadFull(b.conn, header)
if err != nil {
b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake header : %s\n", err.Error())
return err
}
Expand All @@ -972,6 +985,7 @@ func (b *Broker) sendAndReceiveSASLHandshake(saslType SASLMechanism, version int
payload := make([]byte, length-4)
n, err := io.ReadFull(b.conn, payload)
if err != nil {
b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read SASL handshake payload : %s\n", err.Error())
return err
}
Expand Down Expand Up @@ -1051,9 +1065,12 @@ func (b *Broker) sendAndReceiveV0SASLPlainAuth() error {
}

requestTime := time.Now()
// Will be decremented in updateIncomingCommunicationMetrics (except error)
b.addRequestInFlightMetrics(1)
bytesWritten, err := b.conn.Write(authBytes)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
Expand All @@ -1078,11 +1095,13 @@ func (b *Broker) sendAndReceiveV1SASLPlainAuth() error {

requestTime := time.Now()

// Will be decremented in updateIncomingCommunicationMetrics (except error)
b.addRequestInFlightMetrics(1)
bytesWritten, err := b.sendSASLPlainAuthClientResponse(correlationID)

b.updateOutgoingCommunicationMetrics(bytesWritten)

if err != nil {
b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}
Expand Down Expand Up @@ -1137,14 +1156,17 @@ func (b *Broker) sendAndReceiveSASLOAuth(provider AccessTokenProvider) error {
func (b *Broker) sendClientMessage(message []byte) (bool, error) {

requestTime := time.Now()
// Will be decremented in updateIncomingCommunicationMetrics (except error)
b.addRequestInFlightMetrics(1)
correlationID := b.correlationID

bytesWritten, err := b.sendSASLOAuthBearerClientMessage(message, correlationID)
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
b.addRequestInFlightMetrics(-1)
return false, err
}

b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++

res := &SaslAuthenticateResponse{}
Expand Down Expand Up @@ -1180,17 +1202,21 @@ func (b *Broker) sendAndReceiveSASLSCRAMv1() error {

for !scramClient.Done() {
requestTime := time.Now()
// Will be decremented in updateIncomingCommunicationMetrics (except error)
b.addRequestInFlightMetrics(1)
correlationID := b.correlationID
bytesWritten, err := b.sendSaslAuthenticateRequest(correlationID, []byte(msg))
b.updateOutgoingCommunicationMetrics(bytesWritten)
if err != nil {
b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to write SASL auth header to broker %s: %s\n", b.addr, err.Error())
return err
}

b.updateOutgoingCommunicationMetrics(bytesWritten)
b.correlationID++
challenge, err := b.receiveSaslAuthenticateResponse(correlationID)
if err != nil {
b.addRequestInFlightMetrics(-1)
Logger.Printf("Failed to read response while authenticating with SASL to broker %s: %s\n", b.addr, err.Error())
return err
}
Expand Down Expand Up @@ -1361,7 +1387,7 @@ func (b *Broker) receiveSASLServerResponse(res *SaslAuthenticateResponse, correl
}

func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency time.Duration) {
b.updateRequestLatencyMetrics(requestLatency)
b.updateRequestLatencyAndInFlightMetrics(requestLatency)
b.responseRate.Mark(1)

if b.brokerResponseRate != nil {
Expand All @@ -1380,14 +1406,22 @@ func (b *Broker) updateIncomingCommunicationMetrics(bytes int, requestLatency ti
}
}

func (b *Broker) updateRequestLatencyMetrics(requestLatency time.Duration) {
func (b *Broker) updateRequestLatencyAndInFlightMetrics(requestLatency time.Duration) {
requestLatencyInMs := int64(requestLatency / time.Millisecond)
b.requestLatency.Update(requestLatencyInMs)

if b.brokerRequestLatency != nil {
b.brokerRequestLatency.Update(requestLatencyInMs)
}

b.addRequestInFlightMetrics(-1)
}

func (b *Broker) addRequestInFlightMetrics(i int64) {
b.requestsInFlight.Inc(i)
if b.brokerRequestsInFlight != nil {
b.brokerRequestsInFlight.Inc(i)
}
}

func (b *Broker) updateOutgoingCommunicationMetrics(bytes int) {
Expand Down Expand Up @@ -1417,6 +1451,7 @@ func (b *Broker) registerMetrics() {
b.brokerOutgoingByteRate = b.registerMeter("outgoing-byte-rate")
b.brokerResponseRate = b.registerMeter("response-rate")
b.brokerResponseSize = b.registerHistogram("response-size")
b.brokerRequestsInFlight = b.registerCounter("requests-in-flight")
}

func (b *Broker) unregisterMetrics() {
Expand All @@ -1436,3 +1471,9 @@ func (b *Broker) registerHistogram(name string) metrics.Histogram {
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return getOrRegisterHistogram(nameForBroker, b.conf.MetricRegistry)
}

func (b *Broker) registerCounter(name string) metrics.Counter {
nameForBroker := getMetricNameForBroker(name, b)
b.registeredMetrics = append(b.registeredMetrics, nameForBroker)
return metrics.GetOrRegisterCounter(nameForBroker, b.conf.MetricRegistry)
}
57 changes: 57 additions & 0 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ func TestSASLOAuthBearer(t *testing.T) {
broker.responseSize = metrics.NilHistogram{}
broker.responseRate = metrics.NilMeter{}
broker.requestLatency = metrics.NilHistogram{}
broker.requestsInFlight = metrics.NilCounter{}

conf := NewConfig()
conf.Net.SASL.Mechanism = SASLTypeOAuth
Expand Down Expand Up @@ -337,6 +338,7 @@ func TestSASLSCRAMSHAXXX(t *testing.T) {
broker.responseSize = metrics.NilHistogram{}
broker.responseRate = metrics.NilMeter{}
broker.requestLatency = metrics.NilHistogram{}
broker.requestsInFlight = metrics.NilCounter{}

mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).SetAuthBytes([]byte(test.scramChallengeResp))
mockSASLHandshakeResponse := NewMockSaslHandshakeResponse(t).SetEnabledMechanisms([]string{SASLTypeSCRAMSHA256, SASLTypeSCRAMSHA512})
Expand Down Expand Up @@ -450,6 +452,7 @@ func TestSASLPlainAuth(t *testing.T) {
broker.responseSize = metrics.NilHistogram{}
broker.responseRate = metrics.NilMeter{}
broker.requestLatency = metrics.NilHistogram{}
broker.requestsInFlight = metrics.NilCounter{}

conf := NewConfig()
conf.Net.SASL.Mechanism = SASLTypePlaintext
Expand Down Expand Up @@ -493,6 +496,56 @@ func TestSASLPlainAuth(t *testing.T) {
}
}

// TestSASLReadTimeout ensures that the broker connection won't block forever
// if the remote end never responds after the handshake
func TestSASLReadTimeout(t *testing.T) {
mockBroker := NewMockBroker(t, 0)
defer mockBroker.Close()

mockSASLAuthResponse := NewMockSaslAuthenticateResponse(t).
SetAuthBytes([]byte(`response_payload`))

mockBroker.SetHandlerByMap(map[string]MockResponse{
"SaslAuthenticateRequest": mockSASLAuthResponse,
})

broker := NewBroker(mockBroker.Addr())
{
broker.requestRate = metrics.NilMeter{}
broker.outgoingByteRate = metrics.NilMeter{}
broker.incomingByteRate = metrics.NilMeter{}
broker.requestSize = metrics.NilHistogram{}
broker.responseSize = metrics.NilHistogram{}
broker.responseRate = metrics.NilMeter{}
broker.requestLatency = metrics.NilHistogram{}
broker.requestsInFlight = metrics.NilCounter{}
}

conf := NewConfig()
{
conf.Net.ReadTimeout = time.Millisecond
conf.Net.SASL.Mechanism = SASLTypePlaintext
conf.Net.SASL.User = "token"
conf.Net.SASL.Password = "password"
conf.Net.SASL.Version = SASLHandshakeV1
}

broker.conf = conf
broker.conf.Version = V1_0_0_0
dialer := net.Dialer{}

conn, err := dialer.Dial("tcp", mockBroker.listener.Addr().String())
if err != nil {
t.Fatal(err)
}

broker.conn = conn
err = broker.authenticateViaSASL()
if err == nil {
t.Errorf("should never happen - expected read timeout")
}
}

func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {

testTable := []struct {
Expand Down Expand Up @@ -557,6 +610,7 @@ func TestGSSAPIKerberosAuth_Authorize(t *testing.T) {
broker.responseSize = metrics.NilHistogram{}
broker.responseRate = metrics.NilMeter{}
broker.requestLatency = metrics.NilHistogram{}
broker.requestsInFlight = metrics.NilCounter{}
conf := NewConfig()
conf.Net.SASL.Mechanism = SASLTypeGSSAPI
conf.Net.SASL.GSSAPI.ServiceName = "kafka"
Expand Down Expand Up @@ -924,6 +978,9 @@ func validateBrokerMetrics(t *testing.T, broker *Broker, mockBrokerMetrics broke
metricValidators.registerForAllBrokers(broker, countHistogramValidator("request-size", 1))
metricValidators.registerForAllBrokers(broker, minMaxHistogramValidator("request-size", mockBrokerBytesRead, mockBrokerBytesRead))

// Check that there is no more requests in flight
metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))

// Run the validators
metricValidators.run(t, broker.conf.MetricRegistry)
}
3 changes: 3 additions & 0 deletions functional_producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,9 @@ func validateMetrics(t *testing.T, client Client) {
metricValidators.registerForBroker(broker, minValHistogramValidator("response-size", 1))
}

// There should be no requests in flight anymore
metricValidators.registerForAllBrokers(broker, counterValidator("requests-in-flight", 0))

// Run the validators
metricValidators.run(t, client.Config().MetricRegistry)
}
Expand Down
16 changes: 16 additions & 0 deletions metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,19 @@ func maxValHistogramValidator(name string, maxMax int) *metricValidator {
}
})
}

func counterValidator(name string, expectedCount int) *metricValidator {
return &metricValidator{
name: name,
validator: func(t *testing.T, metric interface{}) {
if counter, ok := metric.(metrics.Counter); !ok {
t.Errorf("Expected counter metric for '%s', got %T", name, metric)
} else {
count := counter.Count()
if count != int64(expectedCount) {
t.Errorf("Expected counter metric '%s' count = %d, got %d", name, expectedCount, count)
}
}
},
}
}
4 changes: 4 additions & 0 deletions sarama.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ Broker related metrics:
| response-rate-for-broker-<broker-id> | meter | Responses/second received from a given broker |
| response-size | histogram | Distribution of the response size in bytes for all brokers |
| response-size-for-broker-<broker-id> | histogram | Distribution of the response size in bytes for a given broker |
| requests-in-flight | counter | The current number of in-flight requests awaiting a response |
| | | for all brokers |
| requests-in-flight-for-broker-<broker-id> | counter | The current number of in-flight requests awaiting a response |
| | | for a given broker |
+----------------------------------------------+------------+---------------------------------------------------------------+
Note that we do not gather specific metrics for seed brokers but they are part of the "all brokers" metrics.
Expand Down
14 changes: 12 additions & 2 deletions tools/kafka-producer-performance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ var (
0,
"The maximum number of messages to send per second (0 for no limit).",
)
maxOpenRequests = flag.Int(
"max-open-requests",
5,
"The maximum number of unacknowledged requests the client will send on a single connection before blocking (default: 5).",
)
maxMessageBytes = flag.Int(
"max-message-bytes",
1000000,
Expand Down Expand Up @@ -248,6 +253,7 @@ func main() {

config := sarama.NewConfig()

config.Net.MaxOpenRequests = *maxOpenRequests
config.Producer.MaxMessageBytes = *maxMessageBytes
config.Producer.RequiredAcks = sarama.RequiredAcks(*requiredAcks)
config.Producer.Timeout = *timeout
Expand Down Expand Up @@ -430,17 +436,20 @@ func printMetrics(w io.Writer, r metrics.Registry) {
recordSendRateMetric := r.Get("record-send-rate")
requestLatencyMetric := r.Get("request-latency-in-ms")
outgoingByteRateMetric := r.Get("outgoing-byte-rate")
requestsInFlightMetric := r.Get("requests-in-flight")

if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil {
if recordSendRateMetric == nil || requestLatencyMetric == nil || outgoingByteRateMetric == nil ||
requestsInFlightMetric == nil {
return
}
recordSendRate := recordSendRateMetric.(metrics.Meter).Snapshot()
requestLatency := requestLatencyMetric.(metrics.Histogram).Snapshot()
requestLatencyPercentiles := requestLatency.Percentiles([]float64{0.5, 0.75, 0.95, 0.99, 0.999})
outgoingByteRate := outgoingByteRateMetric.(metrics.Meter).Snapshot()
requestsInFlight := requestsInFlightMetric.(metrics.Counter).Count()
fmt.Fprintf(w, "%d records sent, %.1f records/sec (%.2f MiB/sec ingress, %.2f MiB/sec egress), "+
"%.1f ms avg latency, %.1f ms stddev, %.1f ms 50th, %.1f ms 75th, "+
"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th\n",
"%.1f ms 95th, %.1f ms 99th, %.1f ms 99.9th, %d total req. in flight\n",
recordSendRate.Count(),
recordSendRate.RateMean(),
recordSendRate.RateMean()*float64(*messageSize)/1024/1024,
Expand All @@ -452,6 +461,7 @@ func printMetrics(w io.Writer, r metrics.Registry) {
requestLatencyPercentiles[2],
requestLatencyPercentiles[3],
requestLatencyPercentiles[4],
requestsInFlight,
)
}

Expand Down

0 comments on commit 1181889

Please sign in to comment.