Skip to content

Commit

Permalink
changefeedccl: add observability metrics into sarama code
Browse files Browse the repository at this point in the history
Now that this patch (cockroachdb#117544) has been merged, sarama now acknowledges and
reacts to kafka server's throttling messages by slowing down. To provide better
observability into sarama code, this patch adds a metrics registry interceptor
and a new metrics `changefeed.kafka_throttling_hist_nanos` which tracks time (in
nanos) spent in sarama's throttling when cockroachdb exceed the kafka quota.

Fixes: cockroachdb#117618

Release note: changefeed.kafka_throttling_hist_nanos has now been added to
metrics to monitor sarama throttling behavior resulting from exceeding kafka
quota.
  • Loading branch information
wenyihu6 committed Feb 5, 2024
1 parent 0778173 commit a5bcfca
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 3 deletions.
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,7 @@
<tr><td>APPLICATION</td><td>changefeed.forwarded_resolved_messages</td><td>Resolved timestamps forwarded from the change aggregator to the change frontier</td><td>Messages</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.frontier_updates</td><td>Number of change frontier updates across all feeds</td><td>Updates</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.internal_retry_message_count</td><td>Number of messages for which an attempt to retry them within an aggregator node was made</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.kafka_throttling_hist_nanos</td><td>Time spent in throttling due to exceeding kafka quota</td><td>Nanoseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.lagging_ranges</td><td>The number of ranges considered to be lagging behind</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.max_behind_nanos</td><td>(Deprecated in favor of checkpoint_progress) The most any changefeed&#39;s persisted checkpoint is behind the present</td><td>Nanoseconds</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.message_size_hist</td><td>Message size histogram</td><td>Bytes</td><td>HISTOGRAM</td><td>BYTES</td><td>AVG</td><td>NONE</td></tr>
Expand Down
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ go_library(
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
Expand Down
111 changes: 111 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log/logcrash"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

const (
Expand All @@ -35,6 +38,7 @@ const (
changefeedIOQueueMaxLatency = 5 * time.Minute
admitLatencyMaxValue = 1 * time.Minute
commitLatencyMaxValue = 10 * time.Minute
kafkaThrottlingTimeMaxValue = 5 * time.Minute
)

// max length for the scope name.
Expand Down Expand Up @@ -76,6 +80,7 @@ type AggMetrics struct {
CheckpointProgress *aggmetric.AggGauge
LaggingRanges *aggmetric.AggGauge
CloudstorageBufferedBytes *aggmetric.AggGauge
KafkaThrottlingNanos *aggmetric.AggHistogram

// There is always at least 1 sliMetrics created for defaultSLI scope.
mu struct {
Expand Down Expand Up @@ -106,6 +111,7 @@ type metricsRecorder interface {
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
getKafkaThrottlingMetrics(*cluster.Settings) metrics.Histogram
}

var _ metricsRecorder = (*sliMetrics)(nil)
Expand Down Expand Up @@ -145,6 +151,7 @@ type sliMetrics struct {
CheckpointProgress *aggmetric.Gauge
LaggingRanges *aggmetric.Gauge
CloudstorageBufferedBytes *aggmetric.Gauge
KafkaThrottlingNanos *aggmetric.Histogram

mu struct {
syncutil.Mutex
Expand Down Expand Up @@ -325,6 +332,80 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}

type kafkaHistogramAdapter struct {
settings *cluster.Settings
wrapped *aggmetric.Histogram
}

var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)

func (k *kafkaHistogramAdapter) Update(valueInMs int64) {
if k != nil {
// valueInMs is passed in from sarama with a unit of milliseconds. To
// convert this value to nanoseconds, valueInMs * 10^6 is recorded here.
k.wrapped.RecordValue(valueInMs * 1000000)
}
}

func (k *kafkaHistogramAdapter) Clear() {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Count() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Count on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Max() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Max on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Mean() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Mean on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Min() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Min on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentile(float64) (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentile on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Percentiles([]float64) (_ []float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Percentiles on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sample() (_ metrics.Sample) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sample on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Snapshot() (_ metrics.Histogram) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Snapshot on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) StdDev() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to StdDev on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Sum() (_ int64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Sum on kafkaHistogramAdapter")
return
}

func (k *kafkaHistogramAdapter) Variance() (_ float64) {
logcrash.ReportOrPanic(context.Background(), &k.settings.SV /*settings.Values*/, "unexpected call to Variance on kafkaHistogramAdapter")
return
}

type parallelIOMetricsRecorder interface {
recordPendingQueuePush(numKeys int64)
recordPendingQueuePop(numKeys int64, latency time.Duration)
Expand Down Expand Up @@ -380,6 +461,16 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
}
}

func (m *sliMetrics) getKafkaThrottlingMetrics(settings *cluster.Settings) metrics.Histogram {
if m == nil {
return (*kafkaHistogramAdapter)(nil)
}
return &kafkaHistogramAdapter{
settings: settings,
wrapped: m.KafkaThrottlingNanos,
}
}

func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
Expand Down Expand Up @@ -470,6 +561,12 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetric
return w.inner.newParallelIOMetricsRecorder()
}

func (w *wrappingCostController) getKafkaThrottlingMetrics(
settings *cluster.Settings,
) metrics.Histogram {
return w.inner.getKafkaThrottlingMetrics(settings)
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
Expand Down Expand Up @@ -721,6 +818,12 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Bytes",
Unit: metric.Unit_COUNT,
}
metaChangefeedKafkaThrottlingNanos := metric.Metadata{
Name: "changefeed.kafka_throttling_hist_nanos",
Help: "Time spent in throttling due to exceeding kafka quota",
Measurement: "Nanoseconds",
Unit: metric.Unit_NANOSECONDS,
}

functionalGaugeMinFn := func(childValues []int64) int64 {
var min int64
Expand Down Expand Up @@ -813,6 +916,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
CheckpointProgress: b.FunctionalGauge(metaCheckpointProgress, functionalGaugeMinFn),
LaggingRanges: b.Gauge(metaLaggingRangePercentage),
CloudstorageBufferedBytes: b.Gauge(metaCloudstorageBufferedBytes),
KafkaThrottlingNanos: b.Histogram(metric.HistogramOptions{
Metadata: metaChangefeedKafkaThrottlingNanos,
Duration: histogramWindow,
MaxVal: kafkaThrottlingTimeMaxValue.Nanoseconds(),
SigFigs: 2,
BucketConfig: metric.BatchProcessLatencyBuckets,
}),
}
a.mu.sliMetrics = make(map[string]*sliMetrics)
_, err := a.getOrCreateScope(defaultSLIScope)
Expand Down Expand Up @@ -878,6 +988,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
SchemaRegistrations: a.SchemaRegistrations.AddChild(scope),
LaggingRanges: a.LaggingRanges.AddChild(scope),
CloudstorageBufferedBytes: a.CloudstorageBufferedBytes.AddChild(scope),
KafkaThrottlingNanos: a.KafkaThrottlingNanos.AddChild(scope),
}
sm.mu.resolved = make(map[int64]hlc.Timestamp)
sm.mu.checkpoint = make(map[int64]hlc.Timestamp)
Expand Down
35 changes: 32 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
Expand Down Expand Up @@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
}

func buildKafkaConfig(
ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig,
ctx context.Context,
u sinkURL,
jsonStr changefeedbase.SinkSpecificJSONConfig,
kafkaThrottlingMetrics metrics.Histogram,
) (*sarama.Config, error) {
dialConfig, err := buildDialConfig(u)
if err != nil {
Expand All @@ -1090,6 +1094,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
config.MetricRegistry = newMetricsRegistryInterceptor(kafkaThrottlingMetrics)

if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
Expand Down Expand Up @@ -1176,7 +1181,8 @@ func makeKafkaSink(
return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
}

config, err := buildKafkaConfig(ctx, u, jsonStr)
m := mb(requiresResourceAccounting)
config, err := buildKafkaConfig(ctx, u, jsonStr, m.getKafkaThrottlingMetrics(settings))
if err != nil {
return nil, err
}
Expand All @@ -1195,7 +1201,7 @@ func makeKafkaSink(
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
metrics: mb(requiresResourceAccounting),
metrics: m,
topics: topics,
disableInternalRetry: !internalRetryEnabled,
}
Expand Down Expand Up @@ -1234,3 +1240,26 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}

type metricsRegistryInterceptor struct {
metrics.Registry
kafkaThrottlingNanos metrics.Histogram
}

var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)

func newMetricsRegistryInterceptor(kafkaMetrics metrics.Histogram) *metricsRegistryInterceptor {
return &metricsRegistryInterceptor{
Registry: metrics.NewRegistry(),
kafkaThrottlingNanos: kafkaMetrics,
}
}

func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
return mri.kafkaThrottlingNanos
} else {
return mri.Registry.GetOrRegister(name, i)
}
}
7 changes: 7 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/log/eventpb"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

type sinkTelemetryData struct {
Expand Down Expand Up @@ -216,6 +217,12 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetr
return r.inner.newParallelIOMetricsRecorder()
}

func (r *telemetryMetricsRecorder) getKafkaThrottlingMetrics(
settings *cluster.Settings,
) metrics.Histogram {
return r.inner.getKafkaThrottlingMetrics(settings)
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
Expand Down

0 comments on commit a5bcfca

Please sign in to comment.