changefeedccl: handle Kafka quota messages #104446

Open · wants to merge 1 commit into base: master
Changes from all commits
10 changes: 10 additions & 0 deletions DEPS.bzl
@@ -4425,6 +4425,16 @@ def go_deps():
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/hetznercloud/hcloud-go/com_github_hetznercloud_hcloud_go-v1.32.0.zip",
],
)
go_repository(
name = "com_github_honoredb_sarama",
build_file_proto_mode = "disable_global",
importpath = "github.com/HonoreDB/sarama",
sha256 = "a542c811c9a8a554b6b329e5e4ba75025a1a524cf625f386d272d75564922f8c",
strip_prefix = "github.com/HonoreDB/[email protected]",
urls = [
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/HonoreDB/sarama/com_github_honoredb_sarama-v0.0.0-20230606201949-ac24661e0077.zip",
],
)
go_repository(
name = "com_github_howeyc_gopass",
build_file_proto_mode = "disable_global",
1 change: 1 addition & 0 deletions build/bazelutil/distdir_files.bzl
@@ -163,6 +163,7 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/GeertJohan/go.rice/com_github_geertjohan_go_rice-v1.0.0.zip": "2fc48b9422bf356c18ed3fe32ec52f6a8b87ac168f83d2eed249afaebcc3eeb8",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/GoogleCloudPlatform/cloudsql-proxy/com_github_googlecloudplatform_cloudsql_proxy-v0.0.0-20190129172621-c8b1d7a94ddf.zip": "d18ff41309efc943c71d5c8faa5b1dd792700a79fa4f61508c5e50f17fc9ca6f",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/HdrHistogram/hdrhistogram-go/com_github_hdrhistogram_hdrhistogram_go-v1.1.2.zip": "bbc1d64d3179248c78ffa3729ad2ab696ed1ff14874f37d8d4fc4a5a235fa77f",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/HonoreDB/sarama/com_github_honoredb_sarama-v0.0.0-20230606201949-ac24661e0077.zip": "a542c811c9a8a554b6b329e5e4ba75025a1a524cf625f386d272d75564922f8c",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/JohnCGriffin/overflow/com_github_johncgriffin_overflow-v0.0.0-20211019200055-46fa312c352c.zip": "8ad4da840214861386d243127290666cc54eb914d1f4a8856523481876af2a09",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/Joker/hpp/com_github_joker_hpp-v1.0.0.zip": "790dc3cfb8e51ff22f29d74b5b58782999e267e86290bc2b52485ccf9c8d2792",
"https://storage.googleapis.com/cockroach-godeps/gomod/github.com/Julusian/godocdown/com_github_julusian_godocdown-v0.0.0-20170816220326-6d19f8ff2df8.zip": "1bd26f1d29b20d40b3eb0a5678691a2e6e153c473efe079b8b1bbd97a7cc1f57",
1 change: 1 addition & 0 deletions go.mod
@@ -248,6 +248,7 @@ require (
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
github.com/HonoreDB/sarama v0.0.0-20230606201949-ac24661e0077 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/Masterminds/goutils v1.1.0 // indirect
github.com/Masterminds/semver v1.5.0 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -176,6 +176,8 @@ github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
github.com/GoogleCloudPlatform/cloudsql-proxy v0.0.0-20190129172621-c8b1d7a94ddf/go.mod h1:aJ4qN3TfrelA6NZ6AXsXRfmEVaYin3EDbSPJrKS8OXo=
github.com/HdrHistogram/hdrhistogram-go v1.1.0/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo=
github.com/HonoreDB/sarama v0.0.0-20230606201949-ac24661e0077 h1:H8IUAu2Y/CWomK6sAXgjR00oDXZxRhUmSAcpYkADIX0=
github.com/HonoreDB/sarama v0.0.0-20230606201949-ac24661e0077/go.mod h1:4WPhIdtb3uqlDcD3D8rMrPcS1s6bEHLJ3QfKjDZoOtw=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c h1:RGWPOewvKIROun94nF7v2cua9qP+thov/7M50KEoeSU=
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0CRv0ky0k6m906ixxpzmDRLvX58TFUKS2eePweuyxk=
github.com/Joker/hpp v1.0.0/go.mod h1:8x5n+M1Hp5hC0g8okX3sR3vFQwynaX/UgSOM9MeBKzY=
6 changes: 4 additions & 2 deletions pkg/ccl/changefeedccl/BUILD.bazel
@@ -157,10 +157,11 @@ go_library(
"@com_github_gogo_protobuf//jsonpb",
"@com_github_gogo_protobuf//types",
"@com_github_google_btree//:btree",
"@com_github_honoredb_sarama//:sarama",
"@com_github_klauspost_compress//zstd",
"@com_github_klauspost_pgzip//:pgzip",
"@com_github_linkedin_goavro_v2//:goavro",
"@com_github_shopify_sarama//:sarama",
"@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_xdg_go_scram//:scram",
"@com_google_cloud_go_pubsub//:pubsub",
"@com_google_cloud_go_pubsub//apiv1",
@@ -311,9 +312,10 @@
"@com_github_cockroachdb_errors//:errors",
"@com_github_dustin_go_humanize//:go-humanize",
"@com_github_gogo_protobuf//types",
"@com_github_honoredb_sarama//:sarama",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_lib_pq//:pq",
"@com_github_shopify_sarama//:sarama",
"@com_github_rcrowley_go_metrics//:go-metrics",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@com_google_cloud_go_pubsub//apiv1",
5 changes: 5 additions & 0 deletions pkg/ccl/changefeedccl/cdcutils/throttle.go
@@ -66,6 +66,11 @@ func (t *Throttler) AcquireFlushQuota(ctx context.Context) error {
return waitQuota(ctx, 1, t.flushLimiter, t.metrics.FlushPushbackNanos)
}

// UpdateBytePerSecondRate updates only the ByteRate of t.
func (t *Throttler) UpdateBytePerSecondRate(newRate float64) {
t.byteLimiter.UpdateLimit(quotapool.Limit(newRate), int64(newRate))
}

func (t *Throttler) updateConfig(config changefeedbase.SinkThrottleConfig) {
setLimits := func(rl *quotapool.RateLimiter, rate, burst float64) {
// set rateBudget to unlimited if rate is 0.
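As a usage sketch (not part of this diff), the new setter composes with the existing cdcutils API roughly as follows; exampleRetarget is hypothetical, and the rates and message size are illustrative only:

// Illustrative only: retarget a changefeed throttler's byte rate at runtime.
func exampleRetarget(ctx context.Context) error {
	m := cdcutils.MakeMetrics(time.Minute)
	limiter := cdcutils.NewThrottler(
		"kafka-producer-throttling",
		changefeedbase.SinkThrottleConfig{ByteRate: 2 << 20}, // start near 2 MiB/s
		&m,
	)
	// Block until the limiter grants quota for a 1 KiB message.
	if err := limiter.AcquireMessageQuota(ctx, 1024); err != nil {
		return err
	}
	// After a broker throttle signal, lower only the byte rate; the
	// message-per-second limit is left untouched.
	limiter.UpdateBytePerSecondRate(1 << 20)
	return nil
}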
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/changefeed_test.go
@@ -30,7 +30,7 @@ import (
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/HonoreDB/sarama"
"github.com/cockroachdb/cockroach-go/v2/crdb"
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdceval"
17 changes: 17 additions & 0 deletions pkg/ccl/changefeedccl/changefeedbase/settings.go
@@ -228,6 +228,23 @@ var BatchReductionRetryEnabled = settings.RegisterBoolSetting(
false,
).WithPublic()

// EnableBackpressureFromKafkaThrottle causes Kafka changefeeds to slow down and apply backpressure when a broker responds with a throttle time.
var EnableBackpressureFromKafkaThrottle = settings.RegisterBoolSetting(
settings.TenantWritable,
"changefeed.enable_backpressure_from_kafka_throttle",
"if true, kafka changefeeds will slow down and apply backpressure upon getting a Throttle response from a Kafka broker",
util.ConstantWithMetamorphicTestBool("changefeed.enable_backpressure_from_kafka_throttle", true),
).WithPublic()

// KafkaThrottlingSensitivity controls how aggressively Kafka changefeeds lower their effective throughput when repeated throttle responses show the current estimate is still too high.
var KafkaThrottlingSensitivity = settings.RegisterFloatSetting(
settings.TenantWritable,
"changefeed.kafka_throttling_sensitivity",
"controls how aggressively kafka changefeeds adjust their throughput in response to repeated throttle responses",
0.01,
settings.NonNegativeFloat,
).WithPublic()

// UseMuxRangeFeed enables the use of MuxRangeFeed RPC.
var UseMuxRangeFeed = settings.RegisterBoolSetting(
settings.TenantWritable,
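Taken together, the two settings drive the rate arithmetic added to sink_kafka.go below. A self-contained, illustrative sketch of that arithmetic (it mirrors, but is not copied from, the sink code):

package main

import "fmt"

// Illustrative sketch of the rate math these settings drive in
// sink_kafka.go below; numbers and structure are for exposition only.
func main() {
	const sensitivity = 0.01 // changefeed.kafka_throttling_sensitivity default

	recentBytesPerSecond := 4e6 // what sarama's outgoing-byte-rate meter reports
	throttleMs := int64(1000)   // pause requested by the broker

	// Spread the last second's bytes over one second plus the pause.
	targetByteRate := (recentBytesPerSecond / float64(throttleMs+1000)) * 1000
	fmt.Println(targetByteRate) // 2e6 bytes/sec

	// Throttled again while already honoring that target? Inflate the quota
	// charged per message rather than lowering the target further.
	fudge := 1.0
	fudge += sensitivity
	fmt.Println(int(1000 * fudge)) // a 1000-byte message is charged as 1010
}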
2 changes: 1 addition & 1 deletion pkg/ccl/changefeedccl/scram_client.go
@@ -12,7 +12,7 @@ import (
"crypto/sha256"
"crypto/sha512"

"github.com/Shopify/sarama"
"github.com/HonoreDB/sarama"
"github.com/xdg-go/scram"
)

184 changes: 173 additions & 11 deletions pkg/ccl/changefeedccl/sink_kafka.go
@@ -17,13 +17,15 @@ import (
"fmt"
"hash/fnv"
"math"
"math/rand"
"net/url"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/Shopify/sarama"
"github.com/HonoreDB/sarama"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdcutils"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase"
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
@@ -36,6 +38,7 @@
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/logtags"
saramaMetrics "github.com/rcrowley/go-metrics"
"golang.org/x/oauth2"
"golang.org/x/oauth2/clientcredentials"
)
@@ -86,6 +89,11 @@ func init() {
ctx = logtags.AddTag(ctx, "kafka-producer", nil)
sarama.Logger = &kafkaLogAdapter{ctx: ctx}

// go-metrics has a global singleton goroutine for sampling rates, which
// gets started lazily and can't be stopped. Trigger it here so it doesn't
// look like a leaked goroutine in tests.
saramaMetrics.NewMeter().Stop()

// Sarama should not be rejecting messages based on some arbitrary limits.
// This sink already manages its resource usage. Sarama should attempt to deliver
// messages, no matter their size. Of course, the downstream kafka may reject
@@ -119,10 +127,12 @@ type kafkaSink struct {

lastMetadataRefresh time.Time

stopWorkerCh chan struct{}
worker sync.WaitGroup
scratch bufalloc.ByteAllocator
metrics metricsRecorder
stopWorkerCh chan struct{}
worker sync.WaitGroup
scratch bufalloc.ByteAllocator
metrics metricsRecorder
isThrottlingBackpressureEnabled func() bool
throttlingAdjustmentSensitivity float64

knobs kafkaSinkKnobs

Expand All @@ -136,6 +146,17 @@ type kafkaSink struct {
flushCh chan struct{}
}

// Enforces throttling from the broker by passing along backpressure to the
// changefeed.
throttler struct {
syncutil.RWMutex
throttleUntil time.Time // emit no new messages until this time
sampler saramaMetrics.Meter // sarama's outgoing-byte-rate meter
rateLimiter *cdcutils.Throttler // created lazily on the first throttle response
previousByteRate float64 // last target byte rate applied to rateLimiter
fudgeMultiplier float64 // inflates charged message sizes after repeated throttles
}

disableInternalRetry bool
}

@@ -256,6 +277,8 @@ func (s *kafkaSink) Dial() error {
s.client = client
s.producer = producer

addOnThrottleCallback(s)

// Start the worker
s.stopWorkerCh = make(chan struct{})
s.worker.Add(1)
@@ -320,6 +343,7 @@ func (s *kafkaSink) Close() error {
// down or beginning to retry regardless
_ = s.producer.Close()
}

// s.client is only nil in tests.
if s.client != nil {
return s.client.Close()
@@ -458,6 +482,9 @@ func (s *kafkaSink) startInflightMessage(ctx context.Context) error {
}

func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
if err := s.maybeThrottle(ctx, msg); err != nil {
return err
}
if err := s.startInflightMessage(ctx); err != nil {
return err
}
@@ -471,6 +498,108 @@
return nil
}

func (s *kafkaSink) throttleForMs(t int64) {
s.throttler.Lock()

// All new EmitRow() calls will block for the next t milliseconds.
throttleUntil := time.Now().Add(time.Millisecond * time.Duration(t))
if throttleUntil.After(s.throttler.throttleUntil) {
s.throttler.throttleUntil = throttleUntil
if log.V(1) {
log.Infof(s.ctx, "No more Kafka messages until %s", throttleUntil.String())
}
}

// (re-)calculate the byte rate we should limit ourselves to in order
// to keep the changefeed as a whole within quota. We only know our
// own byte rate at the point where we were throttled, but if every
// processor uses that as its estimate for the overall byte rate,
// we'll average out to the correct rate in aggregate.
// Kafka throttling messages give the amount of time we need to send
// no messages in order to get back under its per-second quota,
// so in addition to adhering to that, we want our future byte rate to be
// equal to the bytes we sent over the past second divided by the time
// we should have spent sending them, one second plus the throttle time.
recentBytesPerSecond := s.throttler.sampler.Rate1()
// There's a potential race condition where we get the throttling message
// before we've recorded metrics. Make an effort to block until we see the
// relevant metrics show up.
ctxWithDeadline, cancel := context.WithDeadline(s.ctx, time.Now().Add(time.Minute))
for recentBytesPerSecond == 0 {
select {
case <-ctxWithDeadline.Done():
log.Warning(s.ctx, "Recieved throttling message from Kafka but could not determine recent bytes per second")
recentBytesPerSecond = math.Inf(1)
case <-time.After(10 * time.Millisecond):
// Poll rather than busy-wait while the metrics goroutine catches up.
recentBytesPerSecond = s.throttler.sampler.Rate1()
}
}
cancel()

targetByteRate := (recentBytesPerSecond / float64(t+1000)) * 1000

if s.throttler.rateLimiter == nil {
metrics := cdcutils.MakeMetrics(time.Minute)
s.throttler.rateLimiter = cdcutils.NewThrottler(
"kafka-producer-throttling",
changefeedbase.SinkThrottleConfig{ByteRate: targetByteRate},
&metrics,
)
} else {
if s.throttler.previousByteRate == 0 || s.throttler.previousByteRate > targetByteRate {
s.throttler.rateLimiter.UpdateBytePerSecondRate(targetByteRate)
s.throttler.previousByteRate = targetByteRate
if log.V(1) {
log.Infof(s.ctx, "This changefeed emitter is now targeting a byte rate of %f/sec", targetByteRate)
}
} else {
// According to our metrics, we honored the previous throttling request but still got
// throttled again. Increase the fudge factor as it's empirically too low.
if s.throttler.fudgeMultiplier == 0 {
s.throttler.fudgeMultiplier = 1
}
s.throttler.fudgeMultiplier += s.throttlingAdjustmentSensitivity
if log.V(1) {
log.Infof(s.ctx, "Throttled multiple times at byte rate %v, so increasing byte rate multiplier to %v",
s.throttler.previousByteRate, s.throttler.fudgeMultiplier)
}

}
}
s.throttler.Unlock()
}
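// A worked instance of the formula above, with illustrative numbers: if
// sampler.Rate1() reports 6e6 bytes/sec and the broker demands t = 500 ms of
// silence, the last second's bytes must be spread over 1.5 seconds, so
//
//	targetByteRate = (6e6 / float64(500+1000)) * 1000 = 4e6 bytes/sec.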

func (s *kafkaSink) maybeThrottle(ctx context.Context, msg *sarama.ProducerMessage) error {
if !s.isThrottlingBackpressureEnabled() {
return nil
}
s.throttler.RLock()
throttleFor := time.Until(s.throttler.throttleUntil)
s.throttler.RUnlock()
if throttleFor > 0 {
// Add up to 25% jitter so paused producers don't all resume at once.
throttleFor = time.Duration(float64(throttleFor) * (1 + rand.Float64()/4))
log.Warningf(ctx, "throttling due to broker response for %v", throttleFor)
select {
case <-ctx.Done():
case <-time.After(throttleFor):
}
}
s.throttler.RLock()
defer s.throttler.RUnlock()
if s.throttler.rateLimiter != nil {
sz := msg.Key.Length() + msg.Value.Length()
if s.throttler.fudgeMultiplier > 0 {
sz = int(float64(sz) * s.throttler.fudgeMultiplier)
}
if log.V(2) {
log.Infof(ctx, "Kafka throttler may need to wait for rateLimiter quota...")
defer log.Infof(ctx, "Kafka throttler done waiting for quota")
}
return s.throttler.rateLimiter.AcquireMessageQuota(ctx, sz)
}
return nil
}

// isInternallyRetryable returns true if the sink should attempt to re-emit the
// messages with a non-batching config first rather than surfacing the error to
// the overarching feed.
@@ -1032,6 +1161,7 @@ func buildKafkaConfig(
if err := saramaCfg.Apply(config); err != nil {
return nil, errors.Wrap(err, "failed to apply kafka client configuration")
}

return config, nil
}

@@ -1065,13 +1195,21 @@
internalRetryEnabled := settings != nil && changefeedbase.BatchReductionRetryEnabled.Get(&settings.SV)
throttlingSensitivity := 0.0
if settings != nil {
throttlingSensitivity = changefeedbase.KafkaThrottlingSensitivity.Get(&settings.SV)
}

sink := &kafkaSink{
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
metrics: mb(requiresResourceAccounting),
topics: topics,
disableInternalRetry: !internalRetryEnabled,
ctx: ctx,
kafkaCfg: config,
bootstrapAddrs: u.Host,
metrics: mb(requiresResourceAccounting),
topics: topics,
disableInternalRetry: !internalRetryEnabled,
isThrottlingBackpressureEnabled: func() bool {
return settings != nil && changefeedbase.EnableBackpressureFromKafkaThrottle.Get(&settings.SV)
},
throttlingAdjustmentSensitivity: throttlingSensitivity,
}

sampler, ok := config.MetricRegistry.GetOrRegister(string(saramaMetricOutgoingBytes), saramaMetrics.NewMeter).(saramaMetrics.Meter)
if !ok {
return nil, errors.AssertionFailedf("Expected outgoing-byte-rate to be a Meter but found %T", sampler)
}
sink.throttler.sampler = sampler

if unknownParams := u.remainingQueryParams(); len(unknownParams) > 0 {
return nil, errors.Errorf(
@@ -1081,6 +1219,30 @@
return sink, nil
}

type saramaMetricName string

const saramaMetricOutgoingBytes saramaMetricName = "outgoing-byte-rate"

var logThrottle = log.Every(time.Minute)

func addOnThrottleCallback(s *kafkaSink) {
s.producer.AddCallback(func(pr *sarama.ProduceResponse, err error) {
if err == nil && pr != nil && pr.ThrottleTime > 0 {
if s.isThrottlingBackpressureEnabled() {
if logThrottle.ShouldLog() {
log.Warningf(s.ctx, "Honoring broker request to pause for %d ms", pr.ThrottleTime.Milliseconds())
}
s.throttleForMs(pr.ThrottleTime.Milliseconds())
} else {
if logThrottle.ShouldLog() {
log.Warningf(s.ctx, "Ignoring broker request to pause for %d ms. To honor, enable %s",
pr.ThrottleTime.Milliseconds(), changefeedbase.EnableBackpressureFromKafkaThrottle.Key())
}
}
}
})
}

type kafkaStats struct {
outstandingBytes int64
outstandingMessages int64
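For reference, the sampler wired up in makeKafkaSink is a go-metrics Meter that sarama maintains under the "outgoing-byte-rate" name. A minimal sketch of that API, assuming rcrowley/go-metrics behaves as documented (exampleMeter is hypothetical):

func exampleMeter() float64 {
	reg := saramaMetrics.NewRegistry()
	m, ok := reg.GetOrRegister("outgoing-byte-rate", saramaMetrics.NewMeter).(saramaMetrics.Meter)
	if !ok {
		return 0
	}
	m.Mark(1 << 20)  // record 1 MiB written to the broker
	return m.Rate1() // exponentially weighted one-minute rate; bytes/sec here
}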