Skip to content

Commit

Permalink
changefeedccl: add observability metrics into sarama code
Browse files Browse the repository at this point in the history
Now that this patch has been merged, sarama now acknowledges and reacts to kafka
server's throttling messages by slowing down. To provide better observability
into sarama code, this patch adds a metrics registry interceptor and a new
metrics `changefeed.kafka_throttling_hist_nanos` which tracks time (in nanos)
spent in sarama's throttling when cockroachdb exceed the kafka quota.

Release note: changefeed.kafka_throttling_hist_nanos has now been added to
metrics to monitor sarama throttling behavior resulting from exceeding kafka
quota.

Fixes: cockroachdb#117618
  • Loading branch information
wenyihu6 committed Feb 2, 2024
1 parent f71b21d commit c662b6f
Show file tree
Hide file tree
Showing 5 changed files with 161 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@
<tr><td>APPLICATION</td><td>changefeed.forwarded_resolved_messages</td><td>Resolved timestamps forwarded from the change aggregator to the change frontier</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.frontier_updates</td><td>Number of change frontier updates across all feeds</td><td>Updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.internal_retry_message_count</td><td>Number of messages for which an attempt to retry them within an aggregator node was made</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.kafka_throttling_hist_nanos</td><td>Time spent in throttling due to exceeding kafka quota</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.lagging_ranges</td><td>The number of ranges considered to be lagging behind</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>(Deprecated in favor of checkpoint_progress) The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.message_size_hist</td><td>Message size histogram</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ go_library(
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
Expand Down
123 changes: 123 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

const (
Expand All @@ -35,6 +37,7 @@ const (
changefeedIOQueueMaxLatency = 5 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
kafkaThrottlingTimeMaxValue = 5 * time.Minute
)

// max length for the scope name.
Expand Down Expand Up @@ -76,6 +79,7 @@ type AggMetrics struct {
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
Expand Down Expand Up @@ -106,6 +110,7 @@ type metricsRecorder interface {
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
newKafkaMetricsGetter() KafkaMetricsGetter
}

var _ metricsRecorder = (*sliMetrics)(nil)
Expand Down Expand Up @@ -145,6 +150,7 @@ type sliMetrics struct {
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -325,6 +331,94 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}

type kafkaHistogramAdapter struct {
wrapped *aggmetric.Histogram
}

var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)

func (k *kafkaHistogramAdapter) Update(valueInMs int64) {
if k != nil {
// valueInMs is passed in from sarama with a unit of milliseconds. To
// convert this value to nanoseconds, valueInMs * 10^6 is recorded here.
k.wrapped.RecordValue(valueInMs * 1000000)
}
}

func (k *kafkaHistogramAdapter) Clear() {
log.Error(context.Background(), "clear is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Count() (_ int64) {
log.Error(context.Background(), "count is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Max() (_ int64) {
log.Error(context.Background(), "max is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Mean() (_ float64) {
log.Error(context.Background(), "mean is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Min() (_ int64) {
log.Error(context.Background(), "min is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) {
log.Error(context.Background(), "percentile is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) {
log.Error(context.Background(), "percentiles is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) {
log.Error(context.Background(), "sample is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) {
log.Error(context.Background(), "snapshot is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) StdDev() (_ float64) {
log.Error(context.Background(), "stdDev is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sum() (_ int64) {
log.Error(context.Background(), "sum is not expected to be called on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Variance() (_ float64) {
log.Error(context.Background(), "variance is not expected to be called on kafkaHistogramAdapter")
return
}

type KafkaMetricsGetter interface {
GetKafkaThrottlingNanos() *kafkaHistogramAdapter
}

type kafkaMetricsGetterImpl struct {
kafkaThrottlingNanos *kafkaHistogramAdapter
}

func (kg *kafkaMetricsGetterImpl) GetKafkaThrottlingNanos() *kafkaHistogramAdapter {
if kg == nil {
return (*kafkaHistogramAdapter)(nil)
}
return kg.kafkaThrottlingNanos
}

type parallelIOMetricsRecorder interface {
recordPendingQueuePush(numKeys int64)
recordPendingQueuePop(numKeys int64, latency time.Duration)
Expand Down Expand Up @@ -380,6 +474,17 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
}
}

func (m *sliMetrics) newKafkaMetricsGetter() KafkaMetricsGetter {
if m == nil {
return (*kafkaMetricsGetterImpl)(nil)
}
return &kafkaMetricsGetterImpl{
kafkaThrottlingNanos: &kafkaHistogramAdapter{
wrapped: m.KafkaThrottlingNanos,
},
}
}

func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
Expand Down Expand Up @@ -470,6 +575,10 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric
return w.inner.newParallelIOMetricsRecorder()
}

func (w *wrappingCostController) newKafkaMetricsGetter() KafkaMetricsGetter {
return w.inner.newKafkaMetricsGetter()
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
Expand Down Expand Up @@ -721,6 +830,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaChangefeedKafkaThrottlingNanos := metric.Metadata{
Name: "changefeed.kafka_throttling_hist_nanos",
Help: "Time spent in throttling due to exceeding kafka quota",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand Down Expand Up @@ -813,6 +928,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedKafkaThrottlingNanos,
Duration: histogramWindow,
MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(),
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -878,6 +1000,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
Expand Down
35 changes: 32 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
Expand Down Expand Up @@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
}

func buildKafkaConfig(
ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig,
ctx context.Context,
u sinkURL,
jsonStr changefeedbase.SinkSpecificJSONConfig,
kafkaMetricsGetter KafkaMetricsGetter,
) (*sarama.Config, error) {
dialConfig, err := buildDialConfig(u)
if err != nil {
Expand All @@ -1090,6 +1094,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
config.MetricRegistry = newMetricsRegistryInterceptor(kafkaMetricsGetter)

if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
Expand Down Expand Up @@ -1176,7 +1181,8 @@ func makeKafkaSink(
return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
}

config, err := buildKafkaConfig(ctx, u, jsonStr)
m := mb(requiresResourceAccounting)
config, err := buildKafkaConfig(ctx, u, jsonStr, m.newKafkaMetricsGetter())
if err != nil {
return nil, err
}
Expand All @@ -1195,7 +1201,7 @@ func makeKafkaSink(
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
metrics: mb(requiresResourceAccounting),
metrics: m,
topics: topics,
disableInternalRetry: !internalRetryEnabled,
}
Expand Down Expand Up @@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}

type metricsRegistryInterceptor struct {
metrics.Registry
kafkaThrottlingNanos metrics.Histogram
}

var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)

func newMetricsRegistryInterceptor(kafkaMetrics KafkaMetricsGetter) *metricsRegistryInterceptor {
return &metricsRegistryInterceptor{
Registry: metrics.NewRegistry(),
kafkaThrottlingNanos: kafkaMetrics.GetKafkaThrottlingNanos(),
}
}

func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
return mri.kafkaThrottlingNanos
} else {
return mri.Registry.GetOrRegister(name, i)
}
}
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,10 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr
return r.inner.newParallelIOMetricsRecorder()
}

func (r *telemetryMetricsRecorder) newKafkaMetricsGetter() KafkaMetricsGetter {
return r.inner.newKafkaMetricsGetter()
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
Expand Down

0 comments on commit c662b6f

Please sign in to comment.