Skip to content

Commit

Permalink
kv: log slow requests on replica level in addition to range level
Browse files Browse the repository at this point in the history
Previously, slow requests were only logged at the range level, but
the logs did not indicate which replica is slow. Moreover, the
SlowRPC metric attempted to represent the number of requests
currently being retried, however it was done on the range level
and therefore missed a second level of replica-level retries being
done underneath.

This PR adds logging on the replica level, removes a confusing log
line, and changes the metric to count the number of slow requests
in a simpler manner.

Epic: https://cockroachlabs.atlassian.net/browse/CRDB-33510
Fixes: cockroachdb#114431
  • Loading branch information
shralex committed Jan 4, 2024
1 parent 53e4efd commit 66af98c
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 5 deletions.
53 changes: 50 additions & 3 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ package kvcoord
import (
"context"
"fmt"
"github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"runtime"
"runtime/pprof"
"strings"
Expand Down Expand Up @@ -161,6 +162,17 @@ var (
Name: "requests.slow.distsender",
Help: `Number of replica-bound RPCs currently stuck or retrying for a long time.
Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
cause values for this metric to be emitted by leaving a transaction open
for a long time and contending with it using a second transaction.`,
Measurement: "Requests",
Unit: metric.Unit_COUNT,
}
metaDistSenderSlowReplicaRPCs = metric.Metadata{
Name: "distsender.slow.replicarpcs",
Help: `Number of slow replica-bound RPCs.
Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
cause values for this metric to be emitted by leaving a transaction open
Expand Down Expand Up @@ -310,6 +322,7 @@ type DistSenderMetrics struct {
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
SlowReplicaRPCs *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
DistSenderRangeFeedMetrics
Expand Down Expand Up @@ -342,6 +355,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
SlowReplicaRPCs: metric.NewCounter(metaDistSenderSlowReplicaRPCs),
DistSenderRangeFeedMetrics: makeDistSenderRangeFeedMetrics(),
}
for i := range m.MethodCounts {
Expand Down Expand Up @@ -1814,6 +1828,22 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts)
}

func slowReplicaRPCWarningStr(
s *redact.StringBuilder,
ba *kvpb.BatchRequest,
dur time.Duration,
attempts int64,
err error,
br *kvpb.BatchResponse,
) {
resp := interface{}(err)
if resp == nil {
resp = br
}
s.Printf("have been waiting %.2fs (%d attempts) for RPC %s to replica %s; resp: %s",
dur.Seconds(), attempts, ba, ba.Replica, resp)
}

// sendPartialBatch sends the supplied batch to the range specified by the
// routing token.
//
Expand Down Expand Up @@ -1907,8 +1937,7 @@ func (ds *DistSender) sendPartialBatch(
prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)

const slowDistSenderThreshold = time.Minute
if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() {
if dur := timeutil.Since(tBegin); dur > slowDistSenderRangeThreshold && !tBegin.IsZero() {
{
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, routingTok.Desc(), err, reply)
Expand Down Expand Up @@ -2185,6 +2214,13 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
return newSendError(errors.Wrap(lastAttemptErr, "sending to all replicas failed; last error"))
}

// slowDistSenderRangeThreshold is a latency threshold for logging slow requests to a range,
// potentially involving RPCs to multiple replicas of the range.
const slowDistSenderRangeThreshold = time.Minute

// slowDistSenderReplicaThreshold is a latency threshold for logging a slow RPC to a single replica.
const slowDistSenderReplicaThreshold = 3 * time.Second

// defaultSendClosedTimestampPolicy is used when the closed timestamp policy
// is not known by the range cache. This choice prevents sending batch requests
// to only voters when a perfectly good non-voter may exist in the local
Expand Down Expand Up @@ -2318,7 +2354,8 @@ func (ds *DistSender) sendToReplicas(
// per-replica state and may succeed on other replicas.
var ambiguousError error
var br *kvpb.BatchResponse
for first := true; ; first = false {
attempts := int64(0)
for first := true; ; first, attempts = false, attempts+1 {
if !first {
ds.metrics.NextReplicaErrCount.Inc(1)
}
Expand Down Expand Up @@ -2404,7 +2441,17 @@ func (ds *DistSender) sendToReplicas(
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))

tBegin := timeutil.Now() // for slow log message
br, err = transport.SendNext(ctx, ba)
if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold && !tBegin.IsZero() {
var s redact.StringBuilder
// Note that these RPC may or may not have succeeded. Errors are counted separately below.
ds.metrics.SlowReplicaRPCs.Inc(1)
slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
log.Warningf(ctx, "slow replica RPC: %v", &s)
}
}
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)

Expand Down
10 changes: 8 additions & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4435,6 +4435,11 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
get.KeyLockingStrength = lock.Shared
get.KeyLockingDurability = lock.Unreplicated
ba.Add(get)
ba.Replica = roachpb.ReplicaDescriptor{
ReplicaID: 1,
NodeID: 2,
StoreID: 3,
}
br := &kvpb.BatchResponse{}
br.Error = kvpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
Expand All @@ -4448,9 +4453,10 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
}

{
exp := `slow RPC finished after 8.16s (120 attempts)`
exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` +
` replica (n2,s3):1; resp: ‹(err: boom)›`
var s redact.StringBuilder
slowRangeRPCReturnWarningStr(&s, dur, attempts)
slowReplicaRPCWarningStr(&s, ba, dur, attempts, nil /* err */, br)
act := s.RedactableString()
require.EqualValues(t, exp, act)
}
Expand Down

0 comments on commit 66af98c

Please sign in to comment.