Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

changefeedccl: handle Kafka quota messages #104446

Open
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

HonoreDB
Copy link
Contributor

@HonoreDB HonoreDB commented Jun 6, 2023

Sarama notes, but does not honor, throttling requests from Kafka. The async producer doesn't give a simple way of letting them apply backpressure to changefeeds, so we end up buffering a bunch of messages until either we decide the sink has become unavailable or we release a burst of messages and promptly get throttled again.

This PR addresses that by injecting a hook into Sarama to add a cdc.Throttler to the Kafka sink. This
results in backpressure in EmitRow, as with other slow sink situations, which should lead to better behavior.

sarama.AsyncProducer makes it difficult to inspect the ProduceResponse.ThrottleTime, so this PR includes a temporary fork of Sarama to make it visible.

Addresses #92290

Release note (enterprise change): Changefeeds to a Kafka sink now honor the producer_byte_rate configuration if set in the Kafka cluster.

@HonoreDB HonoreDB requested review from a team as code owners June 6, 2023 20:59
@HonoreDB HonoreDB requested review from smg260, renatolabs and jayshrivastava and removed request for a team June 6, 2023 20:59
@cockroach-teamcity
Copy link
Member

This change is Reviewable

Copy link
Contributor

@miretskiy miretskiy left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, I like the direction of this PR; have many comments mostly due to many questions I have.

Reviewed 8 of 13 files at r1, all commit messages.
Reviewable status: :shipit: complete! 0 of 0 LGTMs obtained (waiting on @HonoreDB, @jayshrivastava, @renatolabs, and @smg260)


-- commits line 20 at r1:
do we want to have a setting to turn it off in case of something?


go.mod line 252 at r1 (raw file):

	github.com/Azure/go-autorest/tracing v0.6.0 // indirect
	github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
	github.com/HonoreDB/sarama v0.0.0-20230606201949-ac24661e0077 // indirect

I would prefer if you've made a fork to a new repository under cockroachdb; we do have
prior art for this (errors package, apd package, etc).

Your commits to that repository should also be reviewed (I did look at your changes and they are
very reasonable.)


pkg/ccl/changefeedccl/sink_kafka.go line 86 at r1 (raw file):

func init() {

nit: drop \n?


pkg/ccl/changefeedccl/sink_kafka.go line 96 at r1 (raw file):

	// gets started lazily and can't be stopped. Trigger it here so it doesn't
	// look like a leaked goroutine in tests.
	saramaMetrics.NewMeter().Stop()

we still want to muck w/ sarama metrics?


pkg/ccl/changefeedccl/sink_kafka.go line 484 at r1 (raw file):

func (s *kafkaSink) emitMessage(ctx context.Context, msg *sarama.ProducerMessage) error {
	if err := s.maybeThrottle(ctx, msg); err != nil {

do we want to add a fail safe cluster setting to disable throttling?


pkg/ccl/changefeedccl/sink_kafka.go line 521 at r1 (raw file):

	// so in addition to adhering to that, we want our future byte rate to be
	// equal to the bytes we sent over the past second divided by the time
	// we should have spent sending them, one second plus the throttle time.

Would be nice to elevate this comment to the top of the function;
Also, please talk about the fact that throttling is per topic (broker), but we have global throttling across topics (that's fine -- and I think the simplicity is justified; just want to make sure we document this choice)


pkg/ccl/changefeedccl/sink_kafka.go line 527 at r1 (raw file):

	// relevant metrics show up.
	ctxWithDeadline, cancel := context.WithDeadline(s.ctx, time.Now().Add(time.Minute))
	for recentBytesPerSecond == 0 {

took me a second to understand what's going on. Even though this is described as a race, the above will be a busy
loop (until we get non-zero metrics).

I think you should be a bit more explicit in this race description -- namely that the collection is based on 5 second timer.
(Or at least that's what it looks to me)

One question is: is it guaranteed that we'll ever get non-zero?

At any rate, I think a better way to wait for this is to use something like:

retry.Options{
   InitialBackoff: 1 * time.Second,
   MaxBackoff:    2 * time.Second,
   MaxRetries:     30,
}.Do(func(ctx context.Context) error {
   // Your select goes here.
})

pkg/ccl/changefeedccl/sink_kafka.go line 541 at r1 (raw file):

	if s.throttler.rateLimiter == nil {
		metrics := cdcutils.MakeMetrics(time.Minute)

this is not going to work as expected as those metrics would not be added to the metrics registry.
You can either use https://github.com/cockroachdb/cockroach/blob/83da76bb9d08beedcbdac1e1ee93a0f06a5505d4/pkg/ccl/changefeedccl/metrics.go#L691-L691 or add kafka specific metrics; but metrics must be created once and added to the metrics registry...


pkg/ccl/changefeedccl/sink_kafka.go line 543 at r1 (raw file):

		metrics := cdcutils.MakeMetrics(time.Minute)
		s.throttler.rateLimiter = cdcutils.NewThrottler(
			"kafka-broker-throttling",

technically, we're not throttling per broker, are we?


pkg/ccl/changefeedccl/sink_kafka.go line 551 at r1 (raw file):

			s.throttler.rateLimiter.UpdateBytePerSecondRate(targetByteRate)
			s.throttler.previousByteRate = targetByteRate
			//if log.V(1) {

uncomment?


pkg/ccl/changefeedccl/sink_kafka.go line 560 at r1 (raw file):

				s.throttler.fudgeMultiplier = 1
			}
			s.throttler.fudgeMultiplier += 0.01

I think these increases (sensitivity?) should be cluster setting ...


pkg/ccl/changefeedccl/sink_kafka.go line 568 at r1 (raw file):

		}
	}
	s.throttler.Unlock()

This depends on the answer on whether or not this method is called many times for each message once we reach some limit (see question below)...

You should consider having this callback be much simpler and just store the the throttle time in an atomic.Int64.
The emit row code path is where I think we should apply this throttling and execute the above logic to initialize/update
limiters. Doing so would likely result in removal of this lock; and I think having everything done in 1 code path is easier to read and follow.


pkg/ccl/changefeedccl/sink_kafka.go line 574 at r1 (raw file):

	s.throttler.RLock()
	throttleFor := time.Until(s.throttler.throttleUntil)
	s.throttler.RUnlock()

please add a comment here explaining why locking/unlocking is safe.
Namely, this whole logic works only because kafkaSink allows single EmitRow at a time (otherwise, as soon as we unlock, we'd get more messages coming in -- sleeping for different amount of time, and producing all sorts of out of order events).


pkg/ccl/changefeedccl/sink_kafka.go line 576 at r1 (raw file):

	s.throttler.RUnlock()
	if throttleFor > 0 {
		throttleFor *= time.Duration(1 + rand.Float64())

should we allow for smaller hysteresis (maybe 20-25% max)? doubling might be too aggressive.


pkg/ccl/changefeedccl/sink_kafka.go line 577 at r1 (raw file):

	if throttleFor > 0 {
		throttleFor *= time.Duration(1 + rand.Float64())
		log.Warningf(ctx, "throttling due to broker response for %v", throttleFor)

should we use log.Every here?


pkg/ccl/changefeedccl/sink_kafka.go line 580 at r1 (raw file):

		select {
		case <-ctx.Done():
		case <-time.After(throttleFor):

should we use timer instead? time.After will not be GCed until timer fires - (ctx.Done could happen before, leaving this timer around for a while). Also, timerI at least gives you an option to dep inject timer implementations for testing.


pkg/ccl/changefeedccl/sink_kafka.go line 582 at r1 (raw file):

		case <-time.After(throttleFor):
		}
	}

Took a second to understand why we have both wait above (throttleFor), plus potential wait below (rate limiter).
I think this deserves a comment (honor throttle requests form the broker; make sure we don't overwhelm in the future).


pkg/ccl/changefeedccl/sink_kafka.go line 585 at r1 (raw file):

	s.throttler.RLock()
	defer s.throttler.RUnlock()
	if s.throttler.rateLimiter != nil {

are we worried that when we start, we slam kafka with everything we got?
I think it was one of the concerns -- particularly applicable during initial scans.
I understand you effectively lower the throughput if you get throttle signal; but perhaps
we should also start from a non-zero configuration, and keep increasing it while we can...


pkg/ccl/changefeedccl/sink_kafka.go line 1218 at r1 (raw file):

type saramaMetricName string

const saramaMetricOutgoingBytes saramaMetricName = "outgoing-byte-rate"

I have a mild preference of not defining this constant here. Maybe make changes in your fork to expose this constant?


pkg/ccl/changefeedccl/sink_kafka.go line 1221 at r1 (raw file):

func addOnThrottleCallback(s *kafkaSink) {
	s.producer.AddCallback(func(pr *sarama.ProduceResponse, err error) {

What are the guarantees around this method being called many times?

I..e we emit bunch of messages to 1 or more topics. Will each message response carry throttle information
once we enter throttling regime? Will only the first message that put kafka (broker) over the limit carry this info? What
happens to the other messages ? Do they return an error (and how is this error handled)? Something else?


pkg/ccl/changefeedccl/sink_kafka.go line 1223 at r1 (raw file):

	s.producer.AddCallback(func(pr *sarama.ProduceResponse, err error) {
		if err == nil && pr != nil && pr.ThrottleTime > 0 {
			log.Warningf(s.ctx, "Honoring broker request to pause for %d ms", pr.ThrottleTime.Milliseconds())

I think we should be a bit defensive and print with log.Every -- I understand that log spam shouldn't happen
because we'd throttle ourselves, but i'd feel safer if we used log.Every.


pkg/ccl/changefeedccl/sink_kafka.go line 1224 at r1 (raw file):

		if err == nil && pr != nil && pr.ThrottleTime > 0 {
			log.Warningf(s.ctx, "Honoring broker request to pause for %d ms", pr.ThrottleTime.Milliseconds())
			s.throttleForMs(pr.ThrottleTime.Milliseconds())

do we want to add our own metric for throttling/pushback time?
nit: perhaps throttleForMs should be renamed? I read this method as something that unconditionally waits...
Maybe something like notifyThrottle or some such?


pkg/ccl/changefeedccl/sink_test.go line 421 at r1 (raw file):

	// Because the sink has now seen a throttle-for-1-second response, the next call
	// to EmitRow() should block for a second. We assert half a second here to avoid
	// having to synchronize.

if you used timerI as I suggested in the kafka_sink; you could inject a no-wait implementation, but still assert that we've waited for 1 second.

Sarama notes, but does not honor, throttling requests
from Kafka. The async producer doesn't give a simple
way of letting them apply backpressure to changefeeds,
so we end up buffering a bunch of messages until either
we decide the sink has become unavailable or we release
a burst of messages and promptly get throttled again.

This PR addresses that by injecting a hook into Sarama
to add a cdc.Throttler to the Kafka sink. This
results in backpressure in EmitRow, as with other slow
sink situations, which should lead to better behavior.

sarama.AsyncProducer makes it difficult to inspect
the ProduceResponse.ThrottleTime, so this PR includes
a temporary fork of Sarama to make it visible.

Release note (enterprise change): Changefeeds to a Kafka sink now honor the producer_byte_rate configuration if set in the Kafka cluster.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants