changefeedccl: add observability metrics into sarama code
This patch injects crdb cdc metrics into sarama code to provide more
observability into throttling behaviour from kafka.

Fixes: cockroachdb#117618
Release note: (to add later)

TODO(wenyihu6): discuss if this is a good approach + what other sarama metrics
are useful and should be added as well
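
For reviewers unfamiliar with the plumbing: sarama reports all of its client-side
metrics through a rcrowley/go-metrics Registry supplied via
sarama.Config.MetricRegistry, so handing it a registry whose GetOrRegister is
overridden lets us route individual sarama metrics into crdb histograms. A
minimal sketch of the idea (identifiers here are illustrative, not this patch's
names):

package kafkametrics

import (
	"strings"

	"github.com/IBM/sarama"
	metrics "github.com/rcrowley/go-metrics"
)

// interceptingRegistry embeds a real go-metrics registry and redirects any
// metric whose name carries the throttle prefix into our own histogram.
type interceptingRegistry struct {
	metrics.Registry
	throttle metrics.Histogram
}

func (r *interceptingRegistry) GetOrRegister(name string, i interface{}) interface{} {
	if strings.HasPrefix(name, "throttle-time-in-ms") {
		return r.throttle // sarama's Update() calls now land on our histogram
	}
	return r.Registry.GetOrRegister(name, i)
}

func newInterceptedConfig(sink metrics.Histogram) *sarama.Config {
	cfg := sarama.NewConfig()
	cfg.MetricRegistry = &interceptingRegistry{
		Registry: metrics.NewRegistry(),
		throttle: sink,
	}
	return cfg
}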
wenyihu6 committed Jan 24, 2024
1 parent e08886f commit b05a16e
Showing 7 changed files with 151 additions and 9 deletions.
8 changes: 3 additions & 5 deletions DEPS.bzl
@@ -4527,11 +4527,9 @@ def go_deps():
name = "com_github_ibm_sarama",
build_file_proto_mode = "disable_global",
importpath = "github.com/IBM/sarama",
sha256 = "8d4bc76db8c94cd58f8ccb962892bab41d373623296cf18e5df7117bf3ec0546",
strip_prefix = "github.com/IBM/[email protected]",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/IBM/sarama/com_github_ibm_sarama-v1.42.1.zip",
],
# TODO: mirror this repo (to fix, run `./dev generate bazel --mirror`)
sum = "h1:wugyWa15TDEHh2kvq2gAy1IHLjEjuYOYgXz/ruC/OSQ=",
version = "v1.42.1",
)
go_repository(
name = "com_github_icrowley_fake",
1 change: 0 additions & 1 deletion build/bazelutil/distdir_files.bzl
@@ -169,7 +169,6 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/GeertJohan/go.rice/com_github_geertjohan_go_rice-v1.0.0.zip": "2fc48b9422bf356c18ed3fe32ec52f6a8b87ac168f83d2eed249afaebcc3eeb8",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/GoogleCloudPlatform/cloudsql-proxy/com_github_googlecloudplatform_cloudsql_proxy-v0.0.0-20190129172621-c8b1d7a94ddf.zip": "d18ff41309efc943c71d5c8faa5b1dd792700a79fa4f61508c5e50f17fc9ca6f",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/HdrHistogram/hdrhistogram-go/com_github_hdrhistogram_hdrhistogram_go-v1.1.2.zip": "bbc1d64d3179248c78ffa3729ad2ab696ed1ff14874f37d8d4fc4a5a235fa77f",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/IBM/sarama/com_github_ibm_sarama-v1.42.1.zip": "8d4bc76db8c94cd58f8ccb962892bab41d373623296cf18e5df7117bf3ec0546",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/JohnCGriffin/overflow/com_github_johncgriffin_overflow-v0.0.0-20211019200055-46fa312c352c.zip": "8ad4da840214861386d243127290666cc54eb914d1f4a8856523481876af2a09",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/Joker/hpp/com_github_joker_hpp-v1.0.0.zip": "790dc3cfb8e51ff22f29d74b5b58782999e267e86290bc2b52485ccf9c8d2792",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/Joker/jade/com_github_joker_jade-v1.1.3.zip": "33ab19f851ef3c58983eeb66f608c01be312ebac0f2cea61df5218490d6b5043",
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -782,6 +782,7 @@
<tr><td>APPLICATION</td><td>changefeed.sink_batch_hist_nanos</td><td>Time spent batched in the sink buffer before being flushed and acknowledged</td><td>Changefeeds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.sink_io_inflight</td><td>The number of keys currently inflight as IO requests being sent to the sink</td><td>Messages</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.size_based_flushes</td><td>Total size based flushes across all feeds</td><td>Flushes</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>APPLICATION</td><td>changefeed.throttle_time_in_ms</td><td>Throttling time spent in ms due to kafka quota</td><td>Milliseconds</td><td>HISTOGRAM</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>clock-offset.meannanos</td><td>Mean clock offset with other nodes</td><td>Clock Offset</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>clock-offset.stddevnanos</td><td>Stddev clock offset with other nodes</td><td>Clock Offset</td><td>GAUGE</td><td>NANOSECONDS</td><td>AVG</td><td>NONE</td></tr>
<tr><td>APPLICATION</td><td>cloud.conns_opened</td><td>HTTP connections opened by cloud operations</td><td>Connections</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
1 change: 1 addition & 0 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -162,6 +162,7 @@ go_library(
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
109 changes: 109 additions & 0 deletions pkg/ccl/changefeedccl/metrics.go
@@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/metric/aggmetric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/rcrowley/go-metrics"
)

const (
@@ -47,6 +48,7 @@ const defaultSLIScope = "default"
// AggMetrics are aggregated metrics keeping track of aggregated changefeed performance
// indicators, combined with a limited number of per-changefeed indicators.
type AggMetrics struct {
ThrottlingTimeMs *aggmetric.AggHistogram
EmittedMessages *aggmetric.AggCounter
EmittedBatchSizes *aggmetric.AggHistogram
FilteredMessages *aggmetric.AggCounter
@@ -106,6 +108,7 @@ type metricsRecorder interface {
newParallelIOMetricsRecorder() parallelIOMetricsRecorder
recordSinkIOInflightChange(int64)
makeCloudstorageFileAllocCallback() func(delta int64)
newKafkaMetricsGetter() KafkaMetricsGetter
}

var _ metricsRecorder = (*sliMetrics)(nil)
@@ -116,6 +119,7 @@ func (a *AggMetrics) MetricStruct() {}

// sliMetrics holds all SLI related metrics aggregated into AggMetrics.
type sliMetrics struct {
ThrottlingTimeMs *aggmetric.Histogram
EmittedMessages *aggmetric.Counter
EmittedBatchSizes *aggmetric.Histogram
FilteredMessages *aggmetric.Counter
@@ -325,6 +329,81 @@ func (m *sliMetrics) recordSizeBasedFlush() {
m.SizeBasedFlushes.Inc(1)
}

type kafkaHistogramAdapter struct {
wrapped *aggmetric.Histogram
}

func (k *kafkaHistogramAdapter) Clear() {
panic("clear is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Count() int64 {
panic("count is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Max() int64 {
panic("max is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Mean() float64 {
panic("mean is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Min() int64 {
panic("min is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Percentile(float64) float64 {
panic("percentile is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Percentiles([]float64) []float64 {
panic("percentiles is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Sample() metrics.Sample {
panic("sample is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Snapshot() metrics.Histogram {
panic("snapshot is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) StdDev() float64 {
panic("stdDev is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Sum() int64 {
panic("sum is not expected to be called on kafkaHistogramAdapter")
}

func (k *kafkaHistogramAdapter) Variance() float64 {
panic("variance is not expected to be called on kafkaHistogramAdapter")
}

var _ metrics.Histogram = (*kafkaHistogramAdapter)(nil)

func (k *kafkaHistogramAdapter) Update(v int64) {
if k != nil {
k.wrapped.RecordValue(v)
}
}

type KafkaMetricsGetter interface {
GetThrottlingTimeInMs() *kafkaHistogramAdapter
}

type kafkaMetricsGetterImpl struct {
throttlingTimeInMs *kafkaHistogramAdapter
}

func (kg *kafkaMetricsGetterImpl) GetThrottlingTimeInMs() *kafkaHistogramAdapter {
if kg == nil {
return (*kafkaHistogramAdapter)(nil)
}
return kg.throttlingTimeInMs
}

type parallelIOMetricsRecorder interface {
recordPendingQueuePush(numKeys int64)
recordPendingQueuePop(numKeys int64, latency time.Duration)
@@ -380,6 +459,17 @@ func (m *sliMetrics) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
}
}

func (m *sliMetrics) newKafkaMetricsGetter() KafkaMetricsGetter {
if m == nil {
return (*kafkaMetricsGetterImpl)(nil)
}
return &kafkaMetricsGetterImpl{
throttlingTimeInMs: &kafkaHistogramAdapter{
wrapped: m.ThrottlingTimeMs,
},
}
}

func (m *sliMetrics) recordSinkIOInflightChange(delta int64) {
if m == nil {
return
@@ -470,6 +560,10 @@ func (w *wrappingCostController) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
return w.inner.newParallelIOMetricsRecorder()
}

func (w *wrappingCostController) newKafkaMetricsGetter() KafkaMetricsGetter {
return w.inner.newKafkaMetricsGetter()
}

var (
metaChangefeedForwardedResolvedMessages = metric.Metadata{
Name: "changefeed.forwarded_resolved_messages",
@@ -548,6 +642,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
Measurement: "Messages",
Unit: metric.Unit_COUNT,
}
metaThrottleTimeInMs := metric.Metadata{
// TODO(wenyihu): add ms to ns conversion
Name: "changefeed.throttle_time_in_ms",
Help: "Throttling tims spent in ms due to kafka quota",
Measurement: "Milliseconds",
Unit: metric.Unit_NANOSECONDS,
}
metaChangefeedEmittedBatchSizes := metric.Metadata{
Name: "changefeed.emitted_batch_sizes",
Help: "Size of batches emitted emitted by all feeds",
@@ -736,6 +837,13 @@ func newAggregateMetrics(histogramWindow time.Duration) *AggMetrics {
// retain significant figures of 2.
b := aggmetric.MakeBuilder("scope")
a := &AggMetrics{
ThrottlingTimeMs: b.Histogram(metric.HistogramOptions{
Metadata: metaThrottleTimeInMs,
Duration: histogramWindow,
MaxVal: 16e6, // TODO(wenyihu): check the options here
SigFigs: 1,
BucketConfig: metric.DataCount16MBuckets,
}),
ErrorRetries: b.Counter(metaChangefeedErrorRetries),
EmittedMessages: b.Counter(metaChangefeedEmittedMessages),
EmittedBatchSizes: b.Histogram(metric.HistogramOptions{
@@ -851,6 +959,7 @@ func (a *AggMetrics) getOrCreateScope(scope string) (*sliMetrics, error) {
}

sm := &sliMetrics{
ThrottlingTimeMs: a.ThrottlingTimeMs.AddChild(scope),
EmittedMessages: a.EmittedMessages.AddChild(scope),
EmittedBatchSizes: a.EmittedBatchSizes.AddChild(scope),
FilteredMessages: a.FilteredMessages.AddChild(scope),
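To connect the dots on the adapter above: kafkaHistogramAdapter satisfies
go-metrics' Histogram interface, but only the write path (Update) is
implemented; every read-side method deliberately panics because sarama is only
expected to write through it, while reads are served by the wrapped
aggmetric.Histogram. A hedged sketch of a test exercising that wiring (the
harness below, including the use of pkg/base and testify's require, is a guess
at reasonable setup code, not this patch's own test):

func TestKafkaThrottleAdapter(t *testing.T) {
	agg := newAggregateMetrics(base.DefaultHistogramWindowInterval())
	sli, err := agg.getOrCreateScope("default")
	require.NoError(t, err)
	h := sli.newKafkaMetricsGetter().GetThrottlingTimeInMs()
	h.Update(125) // what sarama does when a broker reports throttling
	// The observation is now on sli.ThrottlingTimeMs; read-side calls on
	// the adapter, e.g. h.Mean(), would panic by design.
}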
36 changes: 33 additions & 3 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -36,6 +36,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
"github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
@@ -1078,7 +1079,10 @@ func buildConfluentKafkaConfig(u sinkURL) (kafkaDialConfig, error) {
}

func buildKafkaConfig(
ctx context.Context, u sinkURL, jsonStr changefeedbase.SinkSpecificJSONConfig,
ctx context.Context,
u sinkURL,
jsonStr changefeedbase.SinkSpecificJSONConfig,
kafkaMetricsGetter KafkaMetricsGetter,
) (*sarama.Config, error) {
dialConfig, err := buildDialConfig(u)
if err != nil {
@@ -1090,6 +1094,7 @@ func buildKafkaConfig(
config.Producer.Partitioner = newChangefeedPartitioner
// Do not fetch metadata for all topics but just for the necessary ones.
config.Metadata.Full = false
config.MetricRegistry = newMetricsRegistryInterceptor(kafkaMetricsGetter)

if dialConfig.tlsEnabled {
config.Net.TLS.Enable = true
@@ -1176,7 +1181,8 @@ func makeKafkaSink(
return nil, errors.Errorf(`%s is not yet supported`, changefeedbase.SinkParamSchemaTopic)
}

config, err := buildKafkaConfig(ctx, u, jsonStr)
m := mb(requiresResourceAccounting)
config, err := buildKafkaConfig(ctx, u, jsonStr, m.newKafkaMetricsGetter())
if err != nil {
return nil, err
}
@@ -1195,7 +1201,7 @@
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
metrics: mb(requiresResourceAccounting),
metrics: m,
topics: topics,
disableInternalRetry: !internalRetryEnabled,
}
@@ -1234,3 +1240,27 @@ func (s *kafkaStats) String() string {
atomic.LoadInt64(&s.largestMessageSize),
)
}

type metricsRegistryInterceptor struct {
metrics.Registry
// TODO: do we want to implement the full histogram interface so that this
// gets invoked when Update is called?
throttleTimeMs metrics.Histogram
}

var _ metrics.Registry = (*metricsRegistryInterceptor)(nil)

func newMetricsRegistryInterceptor(kafkaMetrics KafkaMetricsGetter) *metricsRegistryInterceptor {
return &metricsRegistryInterceptor{
Registry: metrics.NewRegistry(),
throttleTimeMs: kafkaMetrics.GetThrottlingTimeInMs(),
}
}

func (mri *metricsRegistryInterceptor) GetOrRegister(name string, i interface{}) interface{} {
const throttleTimeMsMetricsPrefix = "throttle-time-in-ms"
if strings.HasPrefix(name, throttleTimeMsMetricsPrefix) {
return mri.throttleTimeMs
} else {
return mri.Registry.GetOrRegister(name, i)
}
}
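
At runtime the interceptor sees names following sarama's convention: the
throttle histogram is registered as "throttle-time-in-ms", with per-broker
variants suffixed "-for-broker-<id>", which is why the code matches on a
prefix rather than comparing exactly. An illustration of the dispatch (harness
hypothetical; go-metrics' GetOrRegister accepts either a ready value or a
factory for its second argument):

reg := newMetricsRegistryInterceptor(sli.newKafkaMetricsGetter())

// Intercepted: the global and the per-broker throttle histograms both feed
// the changefeed histogram via the adapter.
_ = reg.GetOrRegister("throttle-time-in-ms", metrics.NewHistogram(metrics.NewUniformSample(1024)))
_ = reg.GetOrRegister("throttle-time-in-ms-for-broker-1", metrics.NewHistogram(metrics.NewUniformSample(1024)))

// Not intercepted: falls through to the embedded default registry.
_ = reg.GetOrRegister("request-rate", metrics.NewMeter())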
4 changes: 4 additions & 0 deletions pkg/ccl/changefeedccl/telemetry.go
@@ -216,6 +216,10 @@ func (r *telemetryMetricsRecorder) newParallelIOMetricsRecorder() parallelIOMetricsRecorder {
return r.inner.newParallelIOMetricsRecorder()
}

func (r *telemetryMetricsRecorder) newKafkaMetricsGetter() KafkaMetricsGetter {
return r.inner.newKafkaMetricsGetter()
}

// continuousTelemetryInterval determines the interval at which each node emits telemetry events
// during the lifespan of each enterprise changefeed.
var continuousTelemetryInterval = settings.RegisterDurationSetting(
