diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 68a9c27d3e3b..5ad567ffe388 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -762,6 +762,7 @@
APPLICATION | changefeed.forwarded_resolved_messages | Resolved timestamps forwarded from the change aggregator to the change frontier | Messages | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | changefeed.frontier_updates | Number of change frontier updates across all feeds | Updates | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
APPLICATION | changefeed.internal_retry_message_count | Number of messages for which an attempt to retry them within an aggregator node was made | Messages | GAUGE | COUNT | AVG | NONE |
+APPLICATION | changefeed.kafka_throttling_hist_nanos | Time spent in throttling due to exceeding kafka quota | Nanoseconds | HISTOGRAM | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.lagging_ranges | The number of ranges considered to be lagging behind | Ranges | GAUGE | COUNT | AVG | NONE |
APPLICATION | changefeed.max_behind_nanos | (Deprecated in favor of checkpoint_progress) The most any changefeed's persisted checkpoint is behind the present | Nanoseconds | GAUGE | NANOSECONDS | AVG | NONE |
APPLICATION | changefeed.message_size_hist | Message size histogram | Bytes | HISTOGRAM | BYTES | AVG | NONE |
diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel
index 42cd38404dd0..c854a46a0943 100644
--- a/pkg/ccl/changefeedccl/BUILD.bazel
+++ b/pkg/ccl/changefeedccl/BUILD.bazel
@@ -163,6 +163,7 @@ go_library(
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_shopify_sarama//:sarama",
+ "@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
diff --git a/pkg/ccl/changefeedccl/metrics.go b/pkg/ccl/changefeedccl/metrics.go
index 74aa8eca110b..10d941642bd0 100644
--- a/pkg/ccl/changefeedccl/metrics.go
+++ b/pkg/ccl/changefeedccl/metrics.go
@@ -19,13 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant"
+ "github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
+ "github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/rcrowley/go-metrics"
)
const (
@@ -35,6 +38,7 @@ const (
changefeedIOQueueMaxLatency = 5 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
+ kafkaThrottlingTimeMaxValue = 5 * time.Minute
)
// max length for the scope name.
@@ -76,6 +80,7 @@ type AggMetrics struct {
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
+ KafkaThrottlingNanos *aggmetric.AggHistogram
// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
@@ -106,6 +111,7 @@ type metricsRecorder interface {
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
+ getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
}
var _ metricsRecorder = (*sliMetrics)(nil)
@@ -145,6 +151,7 @@ type sliMetrics struct {
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
+ KafkaThrottlingNanos *aggmetric.Histogram
mu struct {
syncutil.Mutex
@@ -325,6 +332,80 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}
+type kafkaHistogramAdapter struct {
+ settings *cluster.Settings
+ wrapped *aggmetric.Histogram
+}
+
+var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)
+
+func (k *kafkaHistogramAdapter) Update(valueInMs int64) {
+ if k != nil {
+ // valueInMs is passed in from sarama with a unit of milliseconds. To
+ // convert this value to nanoseconds, valueInMs * 10^6 is recorded here.
+ k.wrapped.RecordValue(valueInMs * 1000000)
+ }
+}
+
+func (k *kafkaHistogramAdapter) Clear() {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
+}
+
+func (k *kafkaHistogramAdapter) Count() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Count on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Max() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Max on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Mean() (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Mean on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Min() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Min on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentile on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentiles on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sample on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Snapshot on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) StdDev() (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to StdDev on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Sum() (_ int64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
+ return
+}
+
+func (k *kafkaHistogramAdapter) Variance() (_ float64) {
+ logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Variance on kafkaHistogramAdapter")
+ return
+}
+
type parallelIOMetricsRecorder interface {
recordPendingQueuePush(numKeys int64)
recordPendingQueuePop(numKeys int64, latency time.Duration)
@@ -380,6 +461,16 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
}
}
+func (m *sliMetrics) getKafkaThrottlingMetrics(settings *cluster.Settings) metrics.Histogram {
+ if m == nil {
+ return (*kafkaHistogramAdapter)(nil)
+ }
+ return &kafkaHistogramAdapter{
+ settings: settings,
+ wrapped: m.KafkaThrottlingNanos,
+ }
+}
+
func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
@@ -470,6 +561,12 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric
return w.inner.newParallelIOMetricsRecorder()
}
+func (w *wrappingCostController) getKafkaThrottlingMetrics(
+ settings *cluster.Settings,
+) metrics.Histogram {
+ return w.inner.getKafkaThrottlingMetrics(settings)
+}
+
var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
@@ -721,6 +818,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
+ metaChangefeedKafkaThrottlingNanos := metric.Metadata{
+ Name: "changefeed.kafka_throttling_hist_nanos",
+ Help: "Time spent in throttling due to exceeding kafka quota",
+ Measurement: "Nanoseconds",
+ Unit: metric.Unit_NANOSECONDS,
+ }
functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
@@ -813,6 +916,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
+ KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
+ Metadata: metaChangefeedKafkaThrottlingNanos,
+ Duration: histogramWindow,
+ MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(),
+ SigFigs: 2,
+ BucketConfig: metric.BatchProcessLatencyBuckets,
+ }),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
@@ -878,6 +988,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
+ KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
diff --git a/pkg/ccl/changefeedccl/sink_kafka.go b/pkg/ccl/changefeedccl/sink_kafka.go
index 74327b98ca96..5881454773e3 100644
--- a/pkg/ccl/changefeedccl/sink_kafka.go
+++ b/pkg/ccl/changefeedccl/sink_kafka.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
+ "github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
@@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
}
func buildKafkaConfig(
- ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig,
+ ctx context.Context,
+ u sinkURL,
+ jsonStr changefeedbase.SinkSpecificJSONConfig,
+ kafkaThrottlingMetrics metrics.Histogram,
) (*sarama.Config, error) {
dialConfig, err := buildDialConfig(u)
if err != nil {
@@ -1090,6 +1094,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
+ config.MetricRegistry = newMetricsRegistryInterceptor(kafkaThrottlingMetrics)
if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
@@ -1176,7 +1181,8 @@ func makeKafkaSink(
return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
}
- config, err := buildKafkaConfig(ctx, u, jsonStr)
+ m := mb(requiresResourceAccounting)
+ config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings))
if err != nil {
return nil, err
}
@@ -1195,7 +1201,7 @@ func makeKafkaSink(
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
- metrics: mb(requiresResourceAccounting),
+ metrics: m,
topics: topics,
disableInternalRetry: !internalRetryEnabled,
}
@@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}
+
+type metricsRegistryInterceptor struct {
+ metrics.Registry
+ kafkaThrottlingNanos metrics.Histogram
+}
+
+var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)
+
+func newMetricsRegistryInterceptor(kafkaMetrics metrics.Histogram) *metricsRegistryInterceptor {
+ return &metricsRegistryInterceptor{
+ Registry: metrics.NewRegistry(),
+ kafkaThrottlingNanos: kafkaMetrics,
+ }
+}
+
+func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
+ const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
+ if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
+ return mri.kafkaThrottlingNanos
+ } else {
+ return mri.Registry.GetOrRegister(name, i)
+ }
+}
diff --git a/pkg/ccl/changefeedccl/telemetry.go b/pkg/ccl/changefeedccl/telemetry.go
index 8ca6badcbad5..b4fd9c566f7e 100644
--- a/pkg/ccl/changefeedccl/telemetry.go
+++ b/pkg/ccl/changefeedccl/telemetry.go
@@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
+ "github.com/rcrowley/go-metrics"
)
type sinkTelemetryData struct {
@@ -216,6 +217,12 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr
return r.inner.newParallelIOMetricsRecorder()
}
+func (r *telemetryMetricsRecorder) getKafkaThrottlingMetrics(
+ settings *cluster.Settings,
+) metrics.Histogram {
+ return r.inner.getKafkaThrottlingMetrics(settings)
+}
+
// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(