From 199a58678465d309a3b2be165b17f3baf164c0bd Mon Sep 17 00:00:00 2001 From: Michael Erickson Date: Fri, 26 Jan 2024 14:07:50 -0800 Subject: [PATCH 1/8] sql: fix flake in TestTxnContentionEventsTable In causeContention we deliberately hold a transaction open using pg_sleep to block an update statement. The timing we're trying to achieve is: 1. transaction insert 2. update starts and blocks 3. transaction held open using pg_sleep We were using a WaitGroup to order (2) after (1), but there was no synchronization to ensure (3) came after (2). This commit adds a retry loop that checks `crdb_internal.cluster_queries` to ensure (3) comes after (2). Fixes: #118236 Release note: None --- pkg/sql/crdb_internal_test.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index d2d2edc99b11..3e7acf154d3d 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -1004,22 +1004,35 @@ func causeContention( insertValue) require.NoError(t, errTxn) wgTxnStarted.Done() + // Wait for the update to show up in cluster_queries. + testutils.SucceedsSoon(t, func() error { + row := tx.QueryRowContext( + ctx, "SELECT EXISTS (SELECT * FROM crdb_internal.cluster_queries WHERE query LIKE '%/* shuba */')", + ) + var seen bool + if err := row.Scan(&seen); err != nil { + return err + } + if !seen { + return errors.Errorf("did not see update statement") + } + return nil + }) _, errTxn = tx.ExecContext(ctx, "select pg_sleep(.5);") require.NoError(t, errTxn) errTxn = tx.Commit() require.NoError(t, errTxn) }() - start := timeutil.Now() - // Need to wait for the txn to start to ensure lock contention. wgTxnStarted.Wait() - // This will be blocked until the updateRowWithDelay finishes. + // This will be blocked until the insert txn finishes. + start := timeutil.Now() _, errUpdate := conn.ExecContext( - ctx, fmt.Sprintf("UPDATE %s SET s = $1 where id = 'test';", table), updateValue) + ctx, fmt.Sprintf("UPDATE %s SET s = $1 where id = 'test' /* shuba */;", table), updateValue) require.NoError(t, errUpdate) end := timeutil.Now() - require.GreaterOrEqual(t, end.Sub(start), 499*time.Millisecond) + require.GreaterOrEqual(t, end.Sub(start), 500*time.Millisecond) wgTxnDone.Wait() } From 88080b18082dc66d9079636b3bddc723a3fd9ef9 Mon Sep 17 00:00:00 2001 From: Rail Aliiev Date: Mon, 5 Feb 2024 12:09:57 -0500 Subject: [PATCH 2/8] release: confirm yum install This adds `-y` flag to install `yum` without user prompt. Epic: none Release note: None --- build/deploy-redhat/Dockerfile.in | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build/deploy-redhat/Dockerfile.in b/build/deploy-redhat/Dockerfile.in index a87142a11f6f..edbb19888dfe 100644 --- a/build/deploy-redhat/Dockerfile.in +++ b/build/deploy-redhat/Dockerfile.in @@ -1,6 +1,6 @@ FROM @repository@:@tag@ -RUN microdnf install yum && \ +RUN microdnf install -y yum && \ yum -v -y update --all && \ microdnf clean all && \ rm -rf /var/cache/yum From a5bcfca8b06416cee4d5ba2171a5df32de3efb0b Mon Sep 17 00:00:00 2001 From: Wenyi Hu Date: Thu, 11 Jan 2024 14:59:49 -0500 Subject: [PATCH 3/8] changefeedccl: add observability metrics into sarama code Now that this patch (#117544) has been merged, sarama now acknowledges and reacts to kafka server's throttling messages by slowing down. To provide better observability into sarama code, this patch adds a metrics registry interceptor and a new metrics `changefeed.kafka_throttling_hist_nanos` which tracks time (in nanos) spent in sarama's throttling when cockroachdb exceed the kafka quota. Fixes: https://github.com/cockroachdb/cockroach/issues/117618 Release note: changefeed.kafka_throttling_hist_nanos has now been added to metrics to monitor sarama throttling behavior resulting from exceeding kafka quota. --- docs/generated/metrics/metrics.html | 1 + pkg/ccl/changefeedccl/BUILD.bazel | 1 + pkg/ccl/changefeedccl/metrics.go | 111 ++++++++++++++++++++++++++++ pkg/ccl/changefeedccl/sink_kafka.go | 35 ++++++++- pkg/ccl/changefeedccl/telemetry.go | 7 ++ 5 files changed, 152 insertions(+), 3 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index e986a54fd8c8..787b1c6f32c8 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -762,6 +762,7 @@ APPLICATIONchangefeed.forwarded_resolved_messagesResolved timestamps forwarded from the change aggregator to the change frontierMessagesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.frontier_updatesNumber of change frontier updates across all feedsUpdatesCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONchangefeed.internal_retry_message_countNumber of messages for which an attempt to retry them within an aggregator node was madeMessagesGAUGECOUNTAVGNONE +APPLICATIONchangefeed.kafka_throttling_hist_nanosTime spent in throttling due to exceeding kafka quotaNanosecondsHISTOGRAMNANOSECONDSAVGNONE APPLICATIONchangefeed.lagging_rangesThe number of ranges considered to be lagging behindRangesGAUGECOUNTAVGNONE APPLICATIONchangefeed.max_behind_nanos(Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the presentNanosecondsGAUGENANOSECONDSAVGNONE APPLICATIONchangefeed.message_size_histMessage size histogramBytesHISTOGRAMBYTESAVGNONE diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 2a2f73852a92..73afd7a04578 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -163,6 +163,7 @@ go_library( "@com_github_klauspost_compress//zstd", "@com_github_klauspost_pgzip//:pgzip", "@com_github_linkedin_goavro_v2//:goavro", + "@com_github_rcrowley_go_metrics//:go-metrics", "@com_github_xdg_go_scram//:scram", "@com_google_cloud_go_pubsub//:pubsub", "@com_google_cloud_go_pubsub//apiv1", diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go index 74aa8eca110b..10d941642bd0 100644 --- a/pkg/ccl/changefeedccl/metrics.go +++ b/pkg/ccl/changefeedccl/metrics.go @@ -19,13 +19,16 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed" "github.com/cockroachdb/cockroach/pkg/jobs" "github.com/cockroachdb/cockroach/pkg/multitenant" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/log/logcrash" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric" "github.com/cockroachdb/cockroach/pkg/util/syncutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/rcrowley/go-metrics" ) const ( @@ -35,6 +38,7 @@ const ( changefeedIOQueueMaxLatency = 5 * time.Minute admitLatencyMaxValue = 1 * time.Minute commitLatencyMaxValue = 10 * time.Minute + kafkaThrottlingTimeMaxValue = 5 * time.Minute ) // max length for the scope name. @@ -76,6 +80,7 @@ type AggMetrics struct { CheckpointProgress *aggmetric.AggGauge LaggingRanges *aggmetric.AggGauge CloudstorageBufferedBytes *aggmetric.AggGauge + KafkaThrottlingNanos *aggmetric.AggHistogram // There is always at least 1 sliMetrics created for defaultSLI scope. mu struct { @@ -106,6 +111,7 @@ type metricsRecorder interface { newParallelIOMetricsRecorder() parallelIOMetricsRecorder recordSinkIOInflightChange(int64) makeCloudstorageFileAllocCallback() func(delta int64) + getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram } var _ metricsRecorder = (*sliMetrics)(nil) @@ -145,6 +151,7 @@ type sliMetrics struct { CheckpointProgress *aggmetric.Gauge LaggingRanges *aggmetric.Gauge CloudstorageBufferedBytes *aggmetric.Gauge + KafkaThrottlingNanos *aggmetric.Histogram mu struct { syncutil.Mutex @@ -325,6 +332,80 @@ func (m *sliMetrics) recordSizeBasedFlush() { m.SizeBasedFlushes.Inc(1) } +type kafkaHistogramAdapter struct { + settings *cluster.Settings + wrapped *aggmetric.Histogram +} + +var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil) + +func (k *kafkaHistogramAdapter) Update(valueInMs int64) { + if k != nil { + // valueInMs is passed in from sarama with a unit of milliseconds. To + // convert this value to nanoseconds, valueInMs * 10^6 is recorded here. + k.wrapped.RecordValue(valueInMs * 1000000) + } +} + +func (k *kafkaHistogramAdapter) Clear() { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter") +} + +func (k *kafkaHistogramAdapter) Count() (_ int64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Count on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Max() (_ int64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Max on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Mean() (_ float64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Mean on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Min() (_ int64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Min on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentile on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentiles on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sample on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Snapshot on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) StdDev() (_ float64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to StdDev on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Sum() (_ int64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter") + return +} + +func (k *kafkaHistogramAdapter) Variance() (_ float64) { + logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Variance on kafkaHistogramAdapter") + return +} + type parallelIOMetricsRecorder interface { recordPendingQueuePush(numKeys int64) recordPendingQueuePop(numKeys int64, latency time.Duration) @@ -380,6 +461,16 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder { } } +func (m *sliMetrics) getKafkaThrottlingMetrics(settings *cluster.Settings) metrics.Histogram { + if m == nil { + return (*kafkaHistogramAdapter)(nil) + } + return &kafkaHistogramAdapter{ + settings: settings, + wrapped: m.KafkaThrottlingNanos, + } +} + func (m *sliMetrics) recordSinkIOInflightChange(delta int64) { if m == nil { return @@ -470,6 +561,12 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric return w.inner.newParallelIOMetricsRecorder() } +func (w *wrappingCostController) getKafkaThrottlingMetrics( + settings *cluster.Settings, +) metrics.Histogram { + return w.inner.getKafkaThrottlingMetrics(settings) +} + var ( metaChangefeedForwardedResolvedMessages = metric.Metadata{ Name: "changefeed.forwarded_resolved_messages", @@ -721,6 +818,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { Measurement: "Bytes", Unit: metric.Unit_COUNT, } + metaChangefeedKafkaThrottlingNanos := metric.Metadata{ + Name: "changefeed.kafka_throttling_hist_nanos", + Help: "Time spent in throttling due to exceeding kafka quota", + Measurement: "Nanoseconds", + Unit: metric.Unit_NANOSECONDS, + } functionalGaugeMinFn := func(childValues []int64) int64 { var min int64 @@ -813,6 +916,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics { CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn), LaggingRanges: b.Gauge(metaLaggingRangePercentage), CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes), + KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{ + Metadata: metaChangefeedKafkaThrottlingNanos, + Duration: histogramWindow, + MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(), + SigFigs: 2, + BucketConfig: metric.BatchProcessLatencyBuckets, + }), } a.mu.sliMetrics = make(map[string]*sliMetrics) _, err := a.getOrCreateScope(defaultSLIScope) @@ -878,6 +988,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) { SchemaRegistrations: a.SchemaRegistrations.AddChild(scope), LaggingRanges: a.LaggingRanges.AddChild(scope), CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope), + KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope), } sm.mu.resolved = make(map[int64]hlc.Timestamp) sm.mu.checkpoint = make(map[int64]hlc.Timestamp) diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go index 699fa7c73867..ce1eadb14a55 100644 --- a/pkg/ccl/changefeedccl/sink_kafka.go +++ b/pkg/ccl/changefeedccl/sink_kafka.go @@ -36,6 +36,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" "github.com/cockroachdb/logtags" + "github.com/rcrowley/go-metrics" "golang.org/x/oauth2" "golang.org/x/oauth2/clientcredentials" ) @@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) { } func buildKafkaConfig( - ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig, + ctx context.Context, + u sinkURL, + jsonStr changefeedbase.SinkSpecificJSONConfig, + kafkaThrottlingMetrics metrics.Histogram, ) (*sarama.Config, error) { dialConfig, err := buildDialConfig(u) if err != nil { @@ -1090,6 +1094,7 @@ func buildKafkaConfig( config.Producer.Partitioner = newChangefeedPartitioner // Do not fetch metadata for all topics but just for the necessary ones. config.Metadata.Full = false + config.MetricRegistry = newMetricsRegistryInterceptor(kafkaThrottlingMetrics) if dialConfig.tlsEnabled { config.Net.TLS.Enable = true @@ -1176,7 +1181,8 @@ func makeKafkaSink( return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic) } - config, err := buildKafkaConfig(ctx, u, jsonStr) + m := mb(requiresResourceAccounting) + config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings)) if err != nil { return nil, err } @@ -1195,7 +1201,7 @@ func makeKafkaSink( ctx: ctx, kafkaCfg: config, bootstrapAddrs: u.Host, - metrics: mb(requiresResourceAccounting), + metrics: m, topics: topics, disableInternalRetry: !internalRetryEnabled, } @@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string { atomic.LoadInt64(&s.largestMessageSize), ) } + +type metricsRegistryInterceptor struct { + metrics.Registry + kafkaThrottlingNanos metrics.Histogram +} + +var _ metrics.Registry = (*metricsRegistryInterceptor)(nil) + +func newMetricsRegistryInterceptor(kafkaMetrics metrics.Histogram) *metricsRegistryInterceptor { + return &metricsRegistryInterceptor{ + Registry: metrics.NewRegistry(), + kafkaThrottlingNanos: kafkaMetrics, + } +} + +func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} { + const throttleTimeMsMetricsPrefix = "throttle-time-in-ms" + if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) { + return mri.kafkaThrottlingNanos + } else { + return mri.Registry.GetOrRegister(name, i) + } +} diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go index 8ca6badcbad5..b4fd9c566f7e 100644 --- a/pkg/ccl/changefeedccl/telemetry.go +++ b/pkg/ccl/changefeedccl/telemetry.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/log/eventpb" "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/rcrowley/go-metrics" ) type sinkTelemetryData struct { @@ -216,6 +217,12 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr return r.inner.newParallelIOMetricsRecorder() } +func (r *telemetryMetricsRecorder) getKafkaThrottlingMetrics( + settings *cluster.Settings, +) metrics.Histogram { + return r.inner.getKafkaThrottlingMetrics(settings) +} + // continuousTelemetryInterval determines the interval at which each node emits telemetry events // during the lifespan of each enterprise changefeed. var continuousTelemetryInterval = settings.RegisterDurationSetting( From ce81ca1d71073bc0855fde593c4387bfaa3a3a2e Mon Sep 17 00:00:00 2001 From: Xin Hao Zhang Date: Mon, 5 Feb 2024 10:19:08 -0500 Subject: [PATCH 4/8] builtins: allow VIEWACTIVITY priv to use crdb_internal.request_statement_bundle Previously only those with the VIEWACTIVITY role could use the crdb_internal.request_statement_bundle builtin. We should allow the VIEWACTIVITY privilege as well since role options are now deprecated. This allow also allow stmt bundle requests to be made from db-console for users with this granted privilege. Epic: none Fixes: #118759 Release note (bug fix): Those with VIEWACTIVITY privilege can now request statement bundles using crdb_internal.requets_statement_bundle or via db-console's sql activity page. --- pkg/server/application_api/stmtdiag_test.go | 4 ++-- pkg/sql/sem/builtins/BUILD.bazel | 1 - pkg/sql/sem/builtins/builtins.go | 9 ++++----- 3 files changed, 6 insertions(+), 8 deletions(-) diff --git a/pkg/server/application_api/stmtdiag_test.go b/pkg/server/application_api/stmtdiag_test.go index 283fc19b985c..1e31dcd3791c 100644 --- a/pkg/server/application_api/stmtdiag_test.go +++ b/pkg/server/application_api/stmtdiag_test.go @@ -126,7 +126,7 @@ func TestCreateStatementDiagnosticsReportWithViewActivityOptions(t *testing.T) { require.Contains(t, err.Error(), "requesting statement bundle requires VIEWACTIVITY or ADMIN role option") // Grant VIEWACTIVITY and all test should work. - db.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITY", apiconstants.TestingUserNameNoAdmin().Normalized())) + db.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITY TO %s", apiconstants.TestingUserNameNoAdmin().Normalized())) req := &serverpb.CreateStatementDiagnosticsReportRequest{ StatementFingerprint: "INSERT INTO test VALUES (_)", } @@ -159,7 +159,7 @@ func TestCreateStatementDiagnosticsReportWithViewActivityOptions(t *testing.T) { `, [][]string{{"1"}}) // Grant VIEWACTIVITYREDACTED and all test should get permission errors. - db.Exec(t, fmt.Sprintf("ALTER USER %s VIEWACTIVITYREDACTED", apiconstants.TestingUserNameNoAdmin().Normalized())) + db.Exec(t, fmt.Sprintf("GRANT SYSTEM VIEWACTIVITYREDACTED TO %s", apiconstants.TestingUserNameNoAdmin().Normalized())) if err := srvtestutils.PostStatusJSONProtoWithAdminOption(ts, "stmtdiagreports", req, &resp, false); err != nil { if !testutils.IsError(err, "status: 403") { diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index f5fa1854d4fd..cc35f133e76f 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -74,7 +74,6 @@ go_library( "//pkg/sql/pgwire/pgnotice", "//pkg/sql/privilege", "//pkg/sql/protoreflect", - "//pkg/sql/roleoption", "//pkg/sql/rowenc", "//pkg/sql/rowenc/keyside", "//pkg/sql/rowenc/valueside", diff --git a/pkg/sql/sem/builtins/builtins.go b/pkg/sql/sem/builtins/builtins.go index c644080c790c..fbf55ae57f1c 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -58,7 +58,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgnotice" "github.com/cockroachdb/cockroach/pkg/sql/privilege" "github.com/cockroachdb/cockroach/pkg/sql/protoreflect" - "github.com/cockroachdb/cockroach/pkg/sql/roleoption" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/keyside" "github.com/cockroachdb/cockroach/pkg/sql/sem/asof" @@ -11521,8 +11520,8 @@ true, then any plan other then the specified gist will be used` Types: typs, ReturnType: tree.FixedReturnType(types.Bool), Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { - hasViewActivity, err := evalCtx.SessionAccessor.HasRoleOption( - ctx, roleoption.VIEWACTIVITY) + hasViewActivity, err := evalCtx.SessionAccessor.HasGlobalPrivilegeOrRoleOption( + ctx, privilege.VIEWACTIVITY) if err != nil { return nil, err } @@ -11537,8 +11536,8 @@ true, then any plan other then the specified gist will be used` return nil, err } - hasViewActivityRedacted, err := evalCtx.SessionAccessor.HasRoleOption( - ctx, roleoption.VIEWACTIVITYREDACTED) + hasViewActivityRedacted, err := evalCtx.SessionAccessor.HasGlobalPrivilegeOrRoleOption( + ctx, privilege.VIEWACTIVITYREDACTED) if err != nil { return nil, err } From b03471454b4bc2bf2ff8217126259c0a17f6b15c Mon Sep 17 00:00:00 2001 From: shralex Date: Wed, 27 Dec 2023 09:26:18 -0800 Subject: [PATCH 5/8] kv: log slow requests on replica level in addition to range level Previously, slow requests were only logged at the range level, but the logs did not indicate which replica is slow. Moreover, the SlowRPC metric attempted to represent the number of requests currently being retried, however it was done on the range level and therefore missed a second level of replica-level retries being done underneath. This PR adds logging on the replica level, removes a confusing log line, and changes the metric to count the number of slow requests in a simpler manner. Epic: https://cockroachlabs.atlassian.net/browse/CRDB-33510 Fixes: https://github.com/cockroachdb/cockroach/issues/114431 --- docs/generated/metrics/metrics.html | 3 +- pkg/kv/kvclient/kvcoord/dist_sender.go | 60 +++++++++++++++++++-- pkg/kv/kvclient/kvcoord/dist_sender_test.go | 14 ++++- 3 files changed, 71 insertions(+), 6 deletions(-) diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html index e986a54fd8c8..68a9c27d3e3b 100644 --- a/docs/generated/metrics/metrics.html +++ b/docs/generated/metrics/metrics.html @@ -924,6 +924,7 @@ APPLICATIONdistsender.rpc.transferlease.sentNumber of TransferLease requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONdistsender.rpc.truncatelog.sentNumber of TruncateLog requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONdistsender.rpc.writebatch.sentNumber of WriteBatch requests processed.

This counts the requests in batches handed to DistSender, not the RPCs
sent to individual Ranges as a result.RPCsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE +APPLICATIONdistsender.slow.replicarpcsNumber of slow replica-bound RPCs.

Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
cause values for this metric to be emitted by leaving a transaction open
for a long time and contending with it using a second transaction.RequestsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONjobs.adopt_iterationsnumber of job-adopt iterations performed by the registryiterationsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONjobs.auto_config_env_runner.currently_idleNumber of auto_config_env_runner jobs currently considered Idle and can be freely shut downjobsGAUGECOUNTAVGNONE APPLICATIONjobs.auto_config_env_runner.currently_pausedNumber of auto_config_env_runner jobs currently considered PausedjobsGAUGECOUNTAVGNONE @@ -1244,7 +1245,7 @@ APPLICATIONphysical_replication.resolved_events_ingestedResolved events ingested by all replication jobsEventsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE APPLICATIONphysical_replication.runningNumber of currently running replication streamsReplication StreamsGAUGECOUNTAVGNONE APPLICATIONphysical_replication.sst_bytesSST bytes (compressed) sent to KV by all replication jobsBytesCOUNTERBYTESAVGNON_NEGATIVE_DERIVATIVE -APPLICATIONrequests.slow.distsenderNumber of replica-bound RPCs currently stuck or retrying for a long time.

Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
cause values for this metric to be emitted by leaving a transaction open
for a long time and contending with it using a second transaction.RequestsGAUGECOUNTAVGNONE +APPLICATIONrequests.slow.distsenderNumber of range-bound RPCs currently stuck or retrying for a long time.

Note that this is not a good signal for KV health. The remote side of the
RPCs tracked here may experience contention, so an end user can easily
cause values for this metric to be emitted by leaving a transaction open
for a long time and contending with it using a second transaction.RequestsGAUGECOUNTAVGNONE APPLICATIONround-trip-latencyDistribution of round-trip latencies with other nodes.

This only reflects successful heartbeats and measures gRPC overhead as well as
possible head-of-line blocking. Elevated values in this metric may hint at
network issues and/or saturation, but they are no proof of them. CPU overload
can similarly elevate this metric. The operator should look towards OS-level
metrics such as packet loss, retransmits, etc, to conclusively diagnose network
issues. Heartbeats are not very frequent (~seconds), so they may not capture
rare or short-lived degradations.
Round-trip timeHISTOGRAMNANOSECONDSAVGNONE APPLICATIONrpc.connection.avg_round_trip_latencySum of exponentially weighted moving average of round-trip latencies, as measured through a gRPC RPC.

Dividing this Gauge by rpc.connection.healthy gives an approximation of average
latency, but the top-level round-trip-latency histogram is more useful. Instead,
users should consult the label families of this metric if they are available
(which requires prometheus and the cluster setting 'server.child_metrics.enabled');
these provide per-peer moving averages.

This metric does not track failed connection. A failed connection's contribution
is reset to zero.
LatencyGAUGENANOSECONDSAVGNONE APPLICATIONrpc.connection.failuresCounter of failed connections.

This includes both the event in which a healthy connection terminates as well as
unsuccessful reconnection attempts.

Connections that are terminated as part of local node shutdown are excluded.
Decommissioned peers are excluded.
ConnectionsCOUNTERCOUNTAVGNON_NEGATIVE_DERIVATIVE diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go index 3473fbdd2fea..3c804b3bf9f5 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender.go @@ -34,6 +34,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" "github.com/cockroachdb/cockroach/pkg/util/grpcutil" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -159,7 +160,18 @@ var ( } metaDistSenderSlowRPCs = metric.Metadata{ Name: "requests.slow.distsender", - Help: `Number of replica-bound RPCs currently stuck or retrying for a long time. + Help: `Number of range-bound RPCs currently stuck or retrying for a long time. + +Note that this is not a good signal for KV health. The remote side of the +RPCs tracked here may experience contention, so an end user can easily +cause values for this metric to be emitted by leaving a transaction open +for a long time and contending with it using a second transaction.`, + Measurement: "Requests", + Unit: metric.Unit_COUNT, + } + metaDistSenderSlowReplicaRPCs = metric.Metadata{ + Name: "distsender.slow.replicarpcs", + Help: `Number of slow replica-bound RPCs. Note that this is not a good signal for KV health. The remote side of the RPCs tracked here may experience contention, so an end user can easily @@ -310,6 +322,7 @@ type DistSenderMetrics struct { InLeaseTransferBackoffs *metric.Counter RangeLookups *metric.Counter SlowRPCs *metric.Gauge + SlowReplicaRPCs *metric.Counter MethodCounts [kvpb.NumMethods]*metric.Counter ErrCounts [kvpb.NumErrors]*metric.Counter DistSenderRangeFeedMetrics @@ -342,6 +355,7 @@ func makeDistSenderMetrics() DistSenderMetrics { InLeaseTransferBackoffs: metric.NewCounter(metaDistSenderInLeaseTransferBackoffsCount), RangeLookups: metric.NewCounter(metaDistSenderRangeLookups), SlowRPCs: metric.NewGauge(metaDistSenderSlowRPCs), + SlowReplicaRPCs: metric.NewCounter(metaDistSenderSlowReplicaRPCs), DistSenderRangeFeedMetrics: makeDistSenderRangeFeedMetrics(), } for i := range m.MethodCounts { @@ -1814,6 +1828,22 @@ func slowRangeRPCReturnWarningStr(s *redact.StringBuilder, dur time.Duration, at s.Printf("slow RPC finished after %.2fs (%d attempts)", dur.Seconds(), attempts) } +func slowReplicaRPCWarningStr( + s *redact.StringBuilder, + ba *kvpb.BatchRequest, + dur time.Duration, + attempts int64, + err error, + br *kvpb.BatchResponse, +) { + resp := interface{}(err) + if resp == nil { + resp = br + } + s.Printf("have been waiting %.2fs (%d attempts) for RPC %s to replica %s; resp: %s", + dur.Seconds(), attempts, ba, ba.Replica, resp) +} + // sendPartialBatch sends the supplied batch to the range specified by the // routing token. // @@ -1907,8 +1937,7 @@ func (ds *DistSender) sendPartialBatch( prevTok = routingTok reply, err = ds.sendToReplicas(ctx, ba, routingTok, withCommit) - const slowDistSenderThreshold = time.Minute - if dur := timeutil.Since(tBegin); dur > slowDistSenderThreshold && !tBegin.IsZero() { + if dur := timeutil.Since(tBegin); dur > slowDistSenderRangeThreshold && !tBegin.IsZero() { { var s redact.StringBuilder slowRangeRPCWarningStr(&s, ba, dur, attempts, routingTok.Desc(), err, reply) @@ -2189,6 +2218,15 @@ func noMoreReplicasErr(ambiguousErr, lastAttemptErr error) error { return newSendError(errors.Wrap(lastAttemptErr, "sending to all replicas failed; last error")) } +// slowDistSenderRangeThreshold is a latency threshold for logging slow +// requests to a range, potentially involving RPCs to multiple replicas +// of the range. +const slowDistSenderRangeThreshold = time.Minute + +// slowDistSenderReplicaThreshold is a latency threshold for logging a slow RPC +// to a single replica. +const slowDistSenderReplicaThreshold = 10 * time.Second + // defaultSendClosedTimestampPolicy is used when the closed timestamp policy // is not known by the range cache. This choice prevents sending batch requests // to only voters when a perfectly good non-voter may exist in the local @@ -2325,7 +2363,8 @@ func (ds *DistSender) sendToReplicas( // per-replica state and may succeed on other replicas. var ambiguousError error var br *kvpb.BatchResponse - for first := true; ; first = false { + attempts := int64(0) + for first := true; ; first, attempts = false, attempts+1 { if !first { ds.metrics.NextReplicaErrCount.Inc(1) } @@ -2411,7 +2450,20 @@ func (ds *DistSender) sendToReplicas( comparisonResult := ds.getLocalityComparison(ctx, ds.nodeIDGetter(), ba.Replica.NodeID) ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchRequest(comparisonResult, int64(ba.Size())) + tBegin := timeutil.Now() // for slow log message br, err = transport.SendNext(ctx, ba) + if dur := timeutil.Since(tBegin); dur > slowDistSenderReplicaThreshold { + var s redact.StringBuilder + slowReplicaRPCWarningStr(&s, ba, dur, attempts, err, br) + if admissionpb.WorkPriority(ba.AdmissionHeader.Priority) >= admissionpb.NormalPri { + // Note that these RPC may or may not have succeeded. Errors are counted separately below. + ds.metrics.SlowReplicaRPCs.Inc(1) + log.Warningf(ctx, "slow replica RPC: %v", &s) + } else { + log.Eventf(ctx, "slow replica RPC: %v", &s) + } + } + ds.metrics.updateCrossLocalityMetricsOnReplicaAddressedBatchResponse(comparisonResult, int64(br.Size())) ds.maybeIncrementErrCounters(br, err) diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_test.go index 68b5fa67f476..0a61afbee02a 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_test.go @@ -4546,6 +4546,11 @@ func TestDistSenderSlowLogMessage(t *testing.T) { get.KeyLockingStrength = lock.Shared get.KeyLockingDurability = lock.Unreplicated ba.Add(get) + ba.Replica = roachpb.ReplicaDescriptor{ + ReplicaID: 1, + NodeID: 2, + StoreID: 3, + } br := &kvpb.BatchResponse{} br.Error = kvpb.NewError(errors.New("boom")) desc := &roachpb.RangeDescriptor{RangeID: 9, StartKey: roachpb.RKey("x"), EndKey: roachpb.RKey("z")} @@ -4557,7 +4562,6 @@ func TestDistSenderSlowLogMessage(t *testing.T) { act := s.RedactableString() require.EqualValues(t, exp, act) } - { exp := `slow RPC finished after 8.16s (120 attempts)` var s redact.StringBuilder @@ -4565,6 +4569,14 @@ func TestDistSenderSlowLogMessage(t *testing.T) { act := s.RedactableString() require.EqualValues(t, exp, act) } + { + exp := `have been waiting 8.16s (120 attempts) for RPC Get(Shared,Unreplicated) [‹"a"›] to` + + ` replica (n2,s3):1; resp: ‹(err: boom)›` + var s redact.StringBuilder + slowReplicaRPCWarningStr(&s, ba, dur, attempts, nil /* err */, br) + act := s.RedactableString() + require.EqualValues(t, exp, act) + } } // TestErrorIndexOnRangeSplit verifies that in case divideAndSendBatchToRanges From 63cd42399c364fd0913d322f9d910e57bea14382 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Mon, 5 Feb 2024 17:26:10 -0600 Subject: [PATCH 6/8] kvfollowerreadsccl: skip test under `race` not `stressrace` Epic: CRDB-8308 Release note: None --- pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go index 885d6b886797..33319d015a83 100644 --- a/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go +++ b/pkg/ccl/kvccl/kvfollowerreadsccl/followerreads_test.go @@ -940,7 +940,7 @@ func TestSecondaryTenantFollowerReadsRouting(t *testing.T) { defer log.Scope(t).Close(t) defer utilccl.TestingEnableEnterprise()() - skip.UnderStressRace(t, "times out") + skip.UnderRace(t, "times out") skip.UnderDeadlock(t) for _, testCase := range []struct { From d6238445161d5d3f98df262d1130f868a411265a Mon Sep 17 00:00:00 2001 From: Rail Aliiev Date: Mon, 5 Feb 2024 18:42:19 -0500 Subject: [PATCH 7/8] bincheck: do not run geos tests on Windows In #106642 we stopped shipping libgeos on Windows, but didn't update the bincheck test to reflect the change. Epic: none Release note: None --- build/release/bincheck/bincheck | 2 +- build/release/bincheck/download_binary.sh | 2 -- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/build/release/bincheck/bincheck b/build/release/bincheck/bincheck index 9478e1a995d7..ee47b5b2fdee 100755 --- a/build/release/bincheck/bincheck +++ b/build/release/bincheck/bincheck @@ -57,7 +57,7 @@ EOF diff -u expected actual # Verify libgeos functionality on all platforms except MacOS ARM64 and Windows -if [[ $(uname -om) == "Darwin arm64" ]]; then +if [[ $(uname -om) == "Darwin arm64" || $(uname -o) == "Msys" ]]; then echo "Skipping libgeos tests" else echo "Testing libgeos functionality" diff --git a/build/release/bincheck/download_binary.sh b/build/release/bincheck/download_binary.sh index 3fba8260dd58..c0b08c44e921 100755 --- a/build/release/bincheck/download_binary.sh +++ b/build/release/bincheck/download_binary.sh @@ -17,8 +17,6 @@ download_and_extract() { else curl -sSfL "${binary_url}" > cockroach.zip 7z e -omnt cockroach.zip - mkdir -p mnt/lib - mv mnt/*.dll mnt/lib/ fi echo "Downloaded ${binary_url}" From c211e8f5df890372b2732b6f92f52d9dd5f85382 Mon Sep 17 00:00:00 2001 From: Ricky Stewart Date: Mon, 5 Feb 2024 15:16:59 -0600 Subject: [PATCH 8/8] jobs,application_api: replace calls to `skip.Stress` with `skip.Duress` `skip.Duress()` seems like it should have been used in this case as it gives more time under both `race` and `deadlock`. This will give these tests some extra time if they run in a heavyweight configuration but not "under stress". Epic: CRDB-8308 Release note: None --- pkg/jobs/registry_external_test.go | 2 +- pkg/server/application_api/sql_stats_test.go | 20 ++++++++++---------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/jobs/registry_external_test.go b/pkg/jobs/registry_external_test.go index f7c69622950e..726949fa5d99 100644 --- a/pkg/jobs/registry_external_test.go +++ b/pkg/jobs/registry_external_test.go @@ -797,7 +797,7 @@ func TestWaitWithRetryableError(t *testing.T) { registry.WaitForJobs( ctx, []jobspb.JobID{id}, )) - if !skip.Stress() { + if !skip.Duress() { require.Equalf(t, int64(targetNumberOfRetries), numberOfTimesDetected.Load(), "jobs query did not retry") } else { // For stress be lenient since we are relying on timing for leasing diff --git a/pkg/server/application_api/sql_stats_test.go b/pkg/server/application_api/sql_stats_test.go index 83c53e6e5bf2..18294b71dc64 100644 --- a/pkg/server/application_api/sql_stats_test.go +++ b/pkg/server/application_api/sql_stats_test.go @@ -47,9 +47,9 @@ import ( "github.com/stretchr/testify/require" ) -// additionalTimeoutUnderStress is the additional timeout to use for the http +// additionalTimeoutUnderDuress is the additional timeout to use for the http // client if under stress. -const additionalTimeoutUnderStress = 30 * time.Second +const additionalTimeoutUnderDuress = 30 * time.Second func TestStatusAPICombinedTransactions(t *testing.T) { defer leaktest.AfterTest(t)() @@ -57,8 +57,8 @@ func TestStatusAPICombinedTransactions(t *testing.T) { // Increase the timeout for the http client if under stress. additionalTimeout := 0 * time.Second - if skip.Stress() { - additionalTimeout = additionalTimeoutUnderStress + if skip.Duress() { + additionalTimeout = additionalTimeoutUnderDuress } var params base.TestServerArgs @@ -393,8 +393,8 @@ func TestStatusAPIStatements(t *testing.T) { // Increase the timeout for the http client if under stress. additionalTimeout := 0 * time.Second - if skip.Stress() { - additionalTimeout = additionalTimeoutUnderStress + if skip.Duress() { + additionalTimeout = additionalTimeoutUnderDuress } // Aug 30 2021 19:50:00 GMT+0000 @@ -520,8 +520,8 @@ func TestStatusAPICombinedStatementsTotalLatency(t *testing.T) { skip.UnderRace(t, "test is too slow to run under race") // Increase the timeout for the http client if under stress. additionalTimeout := 0 * time.Second - if skip.Stress() { - additionalTimeout = additionalTimeoutUnderStress + if skip.Duress() { + additionalTimeout = additionalTimeoutUnderDuress } sqlStatsKnobs := sqlstats.CreateTestingKnobs() @@ -676,8 +676,8 @@ func TestStatusAPICombinedStatementsWithFullScans(t *testing.T) { // Increase the timeout for the http client if under stress. additionalTimeout := 0 * time.Second - if skip.Stress() { - additionalTimeout = additionalTimeoutUnderStress + if skip.Duress() { + additionalTimeout = additionalTimeoutUnderDuress } skip.UnderStressRace(t, "test is too slow to run under stressrace")