diff --git a/build/deploy-redhat/Dockerfile.in b/build/deploy-redhat/Dockerfile.in
index a87142a11f6f..edbb19888dfe 100644
--- a/build/deploy-redhat/Dockerfile.in
+++ b/build/deploy-redhat/Dockerfile.in
@@ -1,6 +1,6 @@
FROM @repository@:@tag@
-RUN microdnf install yum && \
+RUN microdnf install -y yum && \
yum -v -y update --all && \
microdnf clean all && \
rm -rf /var/cache/yum
diff --git a/build/release/bincheck/bincheck b/build/release/bincheck/bincheck
index 9478e1a995d7..ee47b5b2fdee 100755
--- a/build/release/bincheck/bincheck
+++ b/build/release/bincheck/bincheck
@@ -57,7 +57,7 @@ EOF
diff -u expected actual
# Verify libgeos functionality on all platforms except MacOS ARM64 and Windows
-if [[ $(uname -om) == "Darwin arm64" ]]; then
+if [[ $(uname -om) == "Darwin arm64" || $(uname -o) == "Msys" ]]; then
echo "Skipping libgeos tests"
else
echo "Testing libgeos functionality"
diff --git a/build/release/bincheck/download_binary.sh b/build/release/bincheck/download_binary.sh
index 4a8449a71a9b..0a7d142480cb 100755
--- a/build/release/bincheck/download_binary.sh
+++ b/build/release/bincheck/download_binary.sh
@@ -17,8 +17,6 @@ download_and_extract() {
else
curl -sSfL "${binary_url}" > cockroach.zip
7z e -omnt cockroach.zip
- mkdir -p mnt/lib
- mv mnt/*.dll mnt/lib/
fi
echo "Downloaded ${binary_url}"
diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index e986a54fd8c8..5ad567ffe388 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -762,6 +762,7 @@
APPLICATION | changefeed.forwarded_resolved_messages | Resolved timestamps forwarded from the change aggregator to the change frontier | Messages | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | changefeed.frontier_updates | Number of change frontier updates across all feeds | Updates | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | changefeed.internal_retry_message_count | Number of messages for which an attempt to retry them within an aggregator node was made | Messages | GAUGE | COUNT | AVG | NONE |
+APPLICATION | changefeed.kafka_throttling_hist_nanos | Time spent in throttling due to exceeding kafka quota | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.lagging_ranges | The number of ranges considered to be lagging behind | Ranges | GAUGE | COUNT | AVG | NONE |
APPLICATION | changefeed.max_behind_nanos | (Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present | Nanoseconds | GAUGE | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.message_size_hist | Message size histogram | Bytes | HISTOGRAM | BYTES | AVG | NONE |
@@ -924,6 +925,7 @@
APPLICATION | distsender.rpc.transferlease.sent | Number of TransferLease requests processed.
This counts the requests in batches handed to DistSender, not the RPCs sent to individual Ranges as a result. | RPCs | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | distsender.rpc.truncatelog.sent | Number of TruncateLog requests processed.
This counts the requests in batches handed to DistSender, not the RPCs sent to individual Ranges as a result. | RPCs | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | distsender.rpc.writebatch.sent | Number of WriteBatch requests processed.
This counts the requests in batches handed to DistSender, not the RPCs sent to individual Ranges as a result. | RPCs | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
+APPLICATION | distsender.slow.replicarpcs | Number of slow replica-bound RPCs.
Note that this is not a good signal for KV health. The remote side of the RPCs tracked here may experience contention, so an end user can easily cause values for this metric to be emitted by leaving a transaction open for a long time and contending with it using a second transaction. | Requests | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | jobs.adopt_iterations | number of job-adopt iterations performed by the registry | iterations | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | jobs.auto_config_env_runner.currently_idle | Number of auto_config_env_runner jobs currently considered Idle and can be freely shut down | jobs | GAUGE | COUNT | AVG | NONE |
APPLICATION | jobs.auto_config_env_runner.currently_paused | Number of auto_config_env_runner jobs currently considered Paused | jobs | GAUGE | COUNT | AVG | NONE |
@@ -1244,7 +1246,7 @@
APPLICATION | physical_replication.resolved_events_ingested | Resolved events ingested by all replication jobs | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | physical_replication.running | Number of currently running replication streams | Replication Streams | GAUGE | COUNT | AVG | NONE |
APPLICATION | physical_replication.sst_bytes | SST bytes (compressed) sent to KV by all replication jobs | Bytes | COUNTER | BYTES | AVG | NON_NEGATIVE_DERIVATIVE |
-APPLICATION | requests.slow.distsender | Number of replica-bound RPCs currently stuck or retrying for a long time.
Note that this is not a good signal for KV health. The remote side of the RPCs tracked here may experience contention, so an end user can easily cause values for this metric to be emitted by leaving a transaction open for a long time and contending with it using a second transaction. | Requests | GAUGE | COUNT | AVG | NONE |
+APPLICATION | requests.slow.distsender | Number of range-bound RPCs currently stuck or retrying for a long time.
Note that this is not a good signal for KV health. The remote side of the RPCs tracked here may experience contention, so an end user can easily cause values for this metric to be emitted by leaving a transaction open for a long time and contending with it using a second transaction. | Requests | GAUGE | COUNT | AVG | NONE |
APPLICATION | round-trip-latency | Distribution of round-trip latencies with other nodes.
This only reflects successful heartbeats and measures gRPC overhead as well as possible head-of-line blocking. Elevated values in this metric may hint at network issues and/or saturation, but they are no proof of them. CPU overload can similarly elevate this metric. The operator should look towards OS-level metrics such as packet loss, retransmits, etc, to conclusively diagnose network issues. Heartbeats are not very frequent (~seconds), so they may not capture rare or short-lived degradations.
| Round-trip time | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | rpc.connection.avg_round_trip_latency | Sum of exponentially weighted moving average of round-trip latencies, as measured through a gRPC RPC.
Dividing this Gauge by rpc.connection.healthy gives an approximation of average latency, but the top-level round-trip-latency histogram is more useful. Instead, users should consult the label families of this metric if they are available (which requires prometheus and the cluster setting 'server.child_metrics.enabled'); these provide per-peer moving averages.
This metric does not track failed connection. A failed connection's contribution is reset to zero.
| Latency | GAUGE | NANOSECONDS | AVG | NONE |
APPLICATION | rpc.connection.failures | Counter of failed connections.
This includes both the event in which a healthy connection terminates as well as unsuccessful reconnection attempts.
Connections that are terminated as part of local node shutdown are excluded. Decommissioned peers are excluded.
| Connections | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index bed3829a41cb..1001137b55a2 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -163,6 +163,7 @@ go_library(
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
+ "@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 74aa8eca110b..10d941642bd0 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -19,13 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/rcrowley/go-metrics"
)
const (
@@ -35,6 +38,7 @@ const (
changefeedIOQueueMaxLatency = 5 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
+ kafkaThrottlingTimeMaxValue = 5 * time.Minute
)
// max length for the scope name.
@@ -76,6 +80,7 @@ type AggMetrics struct {
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
+ KafkaThrottlingNanos *aggmetric.AggHistogram
// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
@@ -106,6 +111,7 @@ type metricsRecorder interface {
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
+ getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
}
var _ metricsRecorder = (*sliMetrics)(nil)
@@ -145,6 +151,7 @@ type sliMetrics struct {
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
+ KafkaThrottlingNanos *aggmetric.Histogram
mu struct {
syncutil.Mutex
@@ -325,6 +332,80 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}
+type kafkaHistogramAdapter struct {
+ settings *cluster.Settings
+ wrapped *aggmetric.Histogram
+}
+
+var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)
+
+func (k *kafkaHistogramAdapter) Update(valueInMs int64) {
+ if k != nil {
+ // valueInMs is passed in from sarama with a unit of milliseconds. To
+ // convert this value to nanoseconds, valueInMs * 10^6 is recorded here.
+ k.wrapped.RecordValue(valueInMs * 1000000)
+ }
+}
+
+func (k *kafkaHistogramAdapter) Clear() {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
+}
+
+func (k *kafkaHistogramAdapter) Count() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Count on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Max() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Max on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Mean() (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Mean on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Min() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Min on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentile on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentiles on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sample on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Snapshot on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) StdDev() (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to StdDev on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Sum() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Variance() (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Variance on kafkaHistogramAdapter")
+ return
+}
+
type parallelIOMetricsRecorder interface {
recordPendingQueuePush(numKeys int64)
recordPendingQueuePop(numKeys int64, latency time.Duration)
@@ -380,6 +461,16 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
}
}
+func (m *sliMetrics) getKafkaThrottlingMetrics(settings *cluster.Settings) metrics.Histogram {
+ if m == nil {
+ return (*kafkaHistogramAdapter)(nil)
+ }
+ return &kafkaHistogramAdapter{
+ settings: settings,
+ wrapped: m.KafkaThrottlingNanos,
+ }
+}
+
func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
@@ -470,6 +561,12 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric
return w.inner.newParallelIOMetricsRecorder()
}
+func (w *wrappingCostController) getKafkaThrottlingMetrics(
+ settings *cluster.Settings,
+) metrics.Histogram {
+ return w.inner.getKafkaThrottlingMetrics(settings)
+}
+
var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
@@ -721,6 +818,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
+ metaChangefeedKafkaThrottlingNanos := metric.Metadata{
+ Name: "changefeed.kafka_throttling_hist_nanos",
+ Help: "Time spent in throttling due to exceeding kafka quota",
+ Measurement: "Nanoseconds",
+ Unit: metric.Unit_NANOSECONDS,
+ }
functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
@@ -813,6 +916,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
+ KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
+ Metadata: metaChangefeedKafkaThrottlingNanos,
+ Duration: histogramWindow,
+ MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(),
+ SigFigs: 2,
+ BucketConfig: metric.BatchProcessLatencyBuckets,
+ }),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -878,6 +988,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
+ KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go
index 699fa7c73867..ce1eadb14a55 100644
--- a/pkg/ccl/changefeedccl/sink_kafka.go
+++ b/pkg/ccl/changefeedccl/sink_kafka.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
+ "github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
@@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
}
func buildKafkaConfig(
- ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig,
+ ctx context.Context,
+ u sinkURL,
+ jsonStr changefeedbase.SinkSpecificJSONConfig,
+ kafkaThrottlingMetrics metrics.Histogram,
) (*sarama.Config, error) {
dialConfig, err := buildDialConfig(u)
if err != nil {
@@ -1090,6 +1094,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
+ config.MetricRegistry = newMetricsRegistryInterceptor(kafkaThrottlingMetrics)
if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
@@ -1176,7 +1181,8 @@ func makeKafkaSink(
return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
}
- config, err := buildKafkaConfig(ctx, u, jsonStr)
+ m := mb(requiresResourceAccounting)
+ config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings))
if err != nil {
return nil, err
}
@@ -1195,7 +1201,7 @@ func makeKafkaSink(
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
- metrics: mb(requiresResourceAccounting),
+ metrics: m,
topics: topics,
disableInternalRetry: !internalRetryEnabled,
}
@@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}
+
+type metricsRegistryInterceptor struct {
+ metrics.Registry
+ kafkaThrottlingNanos metrics.Histogram
+}
+
+var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)
+
+func newMetricsRegistryInterceptor(kafkaMetrics metrics.Histogram) *metricsRegistryInterceptor {
+ return &metricsRegistryInterceptor{
+ Registry: metrics.NewRegistry(),
+ kafkaThrottlingNanos: kafkaMetrics,
+ }
+}
+
+func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
+ const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
+ if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
+ return mri.kafkaThrottlingNanos
+ } else {
+ return mri.Registry.GetOrRegister(name, i)
+ }
+}
diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go
index 8ca6badcbad5..b4fd9c566f7e 100644
--- a/pkg/ccl/changefeedccl/telemetry.go
+++ b/pkg/ccl/changefeedccl/telemetry.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/rcrowley/go-metrics"
)
type sinkTelemetryData struct {
@@ -216,6 +217,12 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr
return r.inner.newParallelIOMetricsRecorder()
}
+func (r *telemetryMetricsRecorder) getKafkaThrottlingMetrics(
+ settings *cluster.Settings,
+) metrics.Histogram {
+ return r.inner.getKafkaThrottlingMetrics(settings)
+}
+
// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
index 885d6b886797..33319d015a83 100644
--- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
+++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go
@@ -940,7 +940,7 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) {
defer log.Scope(t).Close(t)
defer utilccl.TestingEnableEnterprise()()
- skip.UnderStressRace(t, "times out")
+ skip.UnderRace(t, "times out")
skip.UnderDeadlock(t)
for _, testCase := range []struct {
diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go
index f7c69622950e..726949fa5d99 100644
--- a/pkg/jobs/registry_external_test.go
+++ b/pkg/jobs/registry_external_test.go
@@ -797,7 +797,7 @@ func TestWaitWithRetryableError(t *testing.T) {
registry.WaitForJobs(
ctx, []jobspb.JobID{id},
))
- if !skip.Stress() {
+ if !skip.Duress() {
require.Equalf(t, int64(targetNumberOfRetries), numberOfTimesDetected.Load(), "jobs query did not retry")
} else {
// For stress be lenient since we are relying on timing for leasing
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index 3473fbdd2fea..3c804b3bf9f5 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -34,6 +34,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util"
+ "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb"
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
@@ -159,7 +160,18 @@ var (
}
metaDistSenderSlowRPCs = metric.Metadata{
Name: "requests.slow.distsender",
- Help: `Number of replica-bound RPCs currently stuck or retrying for a long time.
+ Help: `Number of range-bound RPCs currently stuck or retrying for a long time.
+
+Note that this is not a good signal for KV health. The remote side of the
+RPCs tracked here may experience contention, so an end user can easily
+cause values for this metric to be emitted by leaving a transaction open
+for a long time and contending with it using a second transaction.`,
+ Measurement: "Requests",
+ Unit: metric.Unit_COUNT,
+ }
+ metaDistSenderSlowReplicaRPCs = metric.Metadata{
+ Name: "distsender.slow.replicarpcs",
+ Help: `Number of slow replica-bound RPCs.
Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
@@ -310,6 +322,7 @@ type DistSenderMetrics struct {
InLeaseTransferBackoffs *metric.Counter
RangeLookups *metric.Counter
SlowRPCs *metric.Gauge
+ SlowReplicaRPCs *metric.Counter
MethodCounts [kvpb.NumMethods]*metric.Counter
ErrCounts [kvpb.NumErrors]*metric.Counter
DistSenderRangeFeedMetrics
@@ -342,6 +355,7 @@ func makeDistSenderMetrics() DistSenderMetrics {
InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount),
RangeLookups: metric.NewCounter(metaDistSenderRangeLookups),
SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs),
+ SlowReplicaRPCs: metric.NewCounter(metaDistSenderSlowReplicaRPCs),
DistSenderRangeFeedMetrics: makeDistSenderRangeFeedMetrics(),
}
for i := range m.MethodCounts {
@@ -1814,6 +1828,22 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at
s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts)
}
+func slowReplicaRPCWarningStr(
+ s *redact.StringBuilder,
+ ba *kvpb.BatchRequest,
+ dur time.Duration,
+ attempts int64,
+ err error,
+ br *kvpb.BatchResponse,
+) {
+ resp := interface{}(err)
+ if resp == nil {
+ resp = br
+ }
+ s.Printf("have been waiting %.2fs (%d attempts) for RPC %s to replica %s; resp: %s",
+ dur.Seconds(), attempts, ba, ba.Replica, resp)
+}
+
// sendPartialBatch sends the supplied batch to the range specified by the
// routing token.
//
@@ -1907,8 +1937,7 @@ func (ds *DistSender) sendPartialBatch(
prevTok = routingTok
reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit)
- const slowDistSenderThreshold = time.Minute
- if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() {
+ if dur := timeutil.Since(tBegin); dur > slowDistSenderRangeThreshold && !tBegin.IsZero() {
{
var s redact.StringBuilder
slowRangeRPCWarningStr(&s, ba, dur, attempts, routingTok.Desc(), err, reply)
@@ -2189,6 +2218,15 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error {
return newSendError(errors.Wrap(lastAttemptErr, "sending to all replicas failed; last error"))
}
+// slowDistSenderRangeThreshold is a latency threshold for logging slow
+// requests to a range, potentially involving RPCs to multiple replicas
+// of the range.
+const slowDistSenderRangeThreshold = time.Minute
+
+// slowDistSenderReplicaThreshold is a latency threshold for logging a slow RPC
+// to a single replica.
+const slowDistSenderReplicaThreshold = 10 * time.Second
+
// defaultSendClosedTimestampPolicy is used when the closed timestamp policy
// is not known by the range cache. This choice prevents sending batch requests
// to only voters when a perfectly good non-voter may exist in the local
@@ -2325,7 +2363,8 @@ func (ds *DistSender) sendToReplicas(
// per-replica state and may succeed on other replicas.
var ambiguousError error
var br *kvpb.BatchResponse
- for first := true; ; first = false {
+ attempts := int64(0)
+ for first := true; ; first, attempts = false, attempts+1 {
if !first {
ds.metrics.NextReplicaErrCount.Inc(1)
}
@@ -2411,7 +2450,20 @@ func (ds *DistSender) sendToReplicas(
comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID)
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size()))
+ tBegin := timeutil.Now() // for slow log message
br, err = transport.SendNext(ctx, ba)
+ if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold {
+ var s redact.StringBuilder
+ slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br)
+ if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri {
+ // Note that these RPC may or may not have succeeded. Errors are counted separately below.
+ ds.metrics.SlowReplicaRPCs.Inc(1)
+ log.Warningf(ctx, "slow replica RPC: %v", &s)
+ } else {
+ log.Eventf(ctx, "slow replica RPC: %v", &s)
+ }
+ }
+
ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size()))
ds.maybeIncrementErrCounters(br, err)
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
index 68b5fa67f476..0a61afbee02a 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -4546,6 +4546,11 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
get.KeyLockingStrength = lock.Shared
get.KeyLockingDurability = lock.Unreplicated
ba.Add(get)
+ ba.Replica = roachpb.ReplicaDescriptor{
+ ReplicaID: 1,
+ NodeID: 2,
+ StoreID: 3,
+ }
br := &kvpb.BatchResponse{}
br.Error = kvpb.NewError(errors.New("boom"))
desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")}
@@ -4557,7 +4562,6 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
act := s.RedactableString()
require.EqualValues(t, exp, act)
}
-
{
exp := `slow RPC finished after 8.16s (120 attempts)`
var s redact.StringBuilder
@@ -4565,6 +4569,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) {
act := s.RedactableString()
require.EqualValues(t, exp, act)
}
+ {
+ exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` +
+ ` replica (n2,s3):1; resp: ‹(err: boom)›`
+ var s redact.StringBuilder
+ slowReplicaRPCWarningStr(&s, ba, dur, attempts, nil /* err */, br)
+ act := s.RedactableString()
+ require.EqualValues(t, exp, act)
+ }
}
// TestErrorIndexOnRangeSplit verifies that in case divideAndSendBatchToRanges
diff --git a/pkg/server/application_api/sql_stats_test.go b/pkg/server/application_api/sql_stats_test.go
index 83c53e6e5bf2..18294b71dc64 100644
--- a/pkg/server/application_api/sql_stats_test.go
+++ b/pkg/server/application_api/sql_stats_test.go
@@ -47,9 +47,9 @@ import (
"github.com/stretchr/testify/require"
)
-// additionalTimeoutUnderStress is the additional timeout to use for the http
+// additionalTimeoutUnderDuress is the additional timeout to use for the http
// client if under stress.
-const additionalTimeoutUnderStress = 30 * time.Second
+const additionalTimeoutUnderDuress = 30 * time.Second
func TestStatusAPICombinedTransactions(t *testing.T) {
defer leaktest.AfterTest(t)()
@@ -57,8 +57,8 @@ func TestStatusAPICombinedTransactions(t *testing.T) {
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
- if skip.Stress() {
- additionalTimeout = additionalTimeoutUnderStress
+ if skip.Duress() {
+ additionalTimeout = additionalTimeoutUnderDuress
}
var params base.TestServerArgs
@@ -393,8 +393,8 @@ func TestStatusAPIStatements(t *testing.T) {
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
- if skip.Stress() {
- additionalTimeout = additionalTimeoutUnderStress
+ if skip.Duress() {
+ additionalTimeout = additionalTimeoutUnderDuress
}
// Aug 30 2021 19:50:00 GMT+0000
@@ -520,8 +520,8 @@ func TestStatusAPICombinedStatementsTotalLatency(t *testing.T) {
skip.UnderRace(t, "test is too slow to run under race")
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
- if skip.Stress() {
- additionalTimeout = additionalTimeoutUnderStress
+ if skip.Duress() {
+ additionalTimeout = additionalTimeoutUnderDuress
}
sqlStatsKnobs := sqlstats.CreateTestingKnobs()
@@ -676,8 +676,8 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) {
// Increase the timeout for the http client if under stress.
additionalTimeout := 0 * time.Second
- if skip.Stress() {
- additionalTimeout = additionalTimeoutUnderStress
+ if skip.Duress() {
+ additionalTimeout = additionalTimeoutUnderDuress
}
skip.UnderStressRace(t, "test is too slow to run under stressrace")
diff --git a/pkg/server/application_api/stmtdiag_test.go b/pkg/server/application_api/stmtdiag_test.go
index 283fc19b985c..1e31dcd3791c 100644
--- a/pkg/server/application_api/stmtdiag_test.go
+++ b/pkg/server/application_api/stmtdiag_test.go
@@ -126,7 +126,7 @@ func TestCreateStatementDiagnosticsReportWithViewActivityOptions(t *testing.T) {
require.Contains(t, err.Error(), "requesting statement bundle requires VIEWACTIVITY or ADMIN role option")
// Grant VIEWACTIVITY and all test should work.
- db.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITY", apiconstants.TestingUserNameNoAdmin().Normalized()))
+ db.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", apiconstants.TestingUserNameNoAdmin().Normalized()))
req := &serverpb.CreateStatementDiagnosticsReportRequest{
StatementFingerprint: "INSERT INTO test VALUES (_)",
}
@@ -159,7 +159,7 @@ func TestCreateStatementDiagnosticsReportWithViewActivityOptions(t *testing.T) {
`, [][]string{{"1"}})
// Grant VIEWACTIVITYREDACTED and all test should get permission errors.
- db.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITYREDACTED", apiconstants.TestingUserNameNoAdmin().Normalized()))
+ db.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITYREDACTED TO %s", apiconstants.TestingUserNameNoAdmin().Normalized()))
if err := srvtestutils.PostStatusJSONProtoWithAdminOption(ts, "stmtdiagreports", req, &resp, false); err != nil {
if !testutils.IsError(err, "status: 403") {
diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go
index d2d2edc99b11..3e7acf154d3d 100644
--- a/pkg/sql/crdb_internal_test.go
+++ b/pkg/sql/crdb_internal_test.go
@@ -1004,22 +1004,35 @@ func causeContention(
insertValue)
require.NoError(t, errTxn)
wgTxnStarted.Done()
+ // Wait for the update to show up in cluster_queries.
+ testutils.SucceedsSoon(t, func() error {
+ row := tx.QueryRowContext(
+ ctx, "SELECT EXISTS (SELECT * FROM crdb_internal.cluster_queries WHERE query LIKE '%/* shuba */')",
+ )
+ var seen bool
+ if err := row.Scan(&seen); err != nil {
+ return err
+ }
+ if !seen {
+ return errors.Errorf("did not see update statement")
+ }
+ return nil
+ })
_, errTxn = tx.ExecContext(ctx, "select pg_sleep(.5);")
require.NoError(t, errTxn)
errTxn = tx.Commit()
require.NoError(t, errTxn)
}()
- start := timeutil.Now()
-
// Need to wait for the txn to start to ensure lock contention.
wgTxnStarted.Wait()
- // This will be blocked until the updateRowWithDelay finishes.
+ // This will be blocked until the insert txn finishes.
+ start := timeutil.Now()
_, errUpdate := conn.ExecContext(
- ctx, fmt.Sprintf("UPDATE %s SET s = $1 where id = 'test';", table), updateValue)
+ ctx, fmt.Sprintf("UPDATE %s SET s = $1 where id = 'test' /* shuba */;", table), updateValue)
require.NoError(t, errUpdate)
end := timeutil.Now()
- require.GreaterOrEqual(t, end.Sub(start), 499*time.Millisecond)
+ require.GreaterOrEqual(t, end.Sub(start), 500*time.Millisecond)
wgTxnDone.Wait()
}
diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel
index f5fa1854d4fd..cc35f133e76f 100644
--- a/pkg/sql/sem/builtins/BUILD.bazel
+++ b/pkg/sql/sem/builtins/BUILD.bazel
@@ -74,7 +74,6 @@ go_library(
"//pkg/sql/pgwire/pgnotice",
"//pkg/sql/privilege",
"//pkg/sql/protoreflect",
- "//pkg/sql/roleoption",
"//pkg/sql/rowenc",
"//pkg/sql/rowenc/keyside",
"//pkg/sql/rowenc/valueside",
diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go
index c644080c790c..fbf55ae57f1c 100644
--- a/pkg/sql/sem/builtins/builtins.go
+++ b/pkg/sql/sem/builtins/builtins.go
@@ -58,7 +58,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice"
"github.com/cockroachdb/cockroach/pkg/sql/privilege"
"github.com/cockroachdb/cockroach/pkg/sql/protoreflect"
- "github.com/cockroachdb/cockroach/pkg/sql/roleoption"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/asof"
@@ -11521,8 +11520,8 @@ true, then any plan other then the specified gist will be used`
Types: typs,
ReturnType: tree.FixedReturnType(types.Bool),
Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) {
- hasViewActivity, err := evalCtx.SessionAccessor.HasRoleOption(
- ctx, roleoption.VIEWACTIVITY)
+ hasViewActivity, err := evalCtx.SessionAccessor.HasGlobalPrivilegeOrRoleOption(
+ ctx, privilege.VIEWACTIVITY)
if err != nil {
return nil, err
}
@@ -11537,8 +11536,8 @@ true, then any plan other then the specified gist will be used`
return nil, err
}
- hasViewActivityRedacted, err := evalCtx.SessionAccessor.HasRoleOption(
- ctx, roleoption.VIEWACTIVITYREDACTED)
+ hasViewActivityRedacted, err := evalCtx.SessionAccessor.HasGlobalPrivilegeOrRoleOption(
+ ctx, privilege.VIEWACTIVITYREDACTED)
if err != nil {
return nil, err
}