changefeedccl: handle Kafka quota messages #103226

Open

HonoreDB wants to merge 1 commit into master from kafka_quota_handling_via_metric_spy
Conversation

HonoreDB
Contributor

Fixes #92290

Sarama notes, but does not honor, throttling requests from Kafka. The async producer doesn't offer a simple way to turn those requests into backpressure on changefeeds, so we end up buffering a large number of messages until either we decide the sink has become unavailable, or we release a burst of messages and promptly get throttled again.

This PR addresses that by injecting a hook into Sarama metrics to add a cdc.Throttler to the Kafka sink. This results in backpressure in EmitRow, as with other slow sink situations, which should lead to better behavior.
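For illustration, here is a minimal sketch of how a metrics-based hook could feed backpressure, assuming Sarama reports broker throttle time under a metric name like "throttle-time-in-ms" and that the sink exposes some pause hook; the names below are illustrative, not this PR's actual code:

    // Sketch only: poll the metrics registry Sarama writes into and turn
    // reported broker throttle time into a pause on the producer path.
    package main

    import (
        "time"

        "github.com/Shopify/sarama"
        metrics "github.com/rcrowley/go-metrics"
    )

    func newConfig() *sarama.Config {
        cfg := sarama.NewConfig()
        cfg.MetricRegistry = metrics.NewRegistry() // a registry we can observe
        return cfg
    }

    // watchThrottle periodically scans the registry and, when a broker has
    // reported a non-zero throttle time, asks the caller to pause.
    // The metric name below is an assumption.
    func watchThrottle(reg metrics.Registry, pause func(time.Duration)) {
        for range time.Tick(time.Second) {
            reg.Each(func(name string, m interface{}) {
                if name != "throttle-time-in-ms" {
                    return
                }
                if h, ok := m.(metrics.Histogram); ok && h.Max() > 0 {
                    pause(time.Duration(h.Max()) * time.Millisecond)
                }
            })
        }
    }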

Ready for review, but not ready to merge yet, as I haven't tested the roachtest and have a few unit tests I'd like to add.

Release note (enterprise change): Changefeeds to a Kafka sink now honor the producer_byte_rate configuration if set in the Kafka cluster.

@HonoreDB HonoreDB requested review from a team as code owners May 12, 2023 20:35
@HonoreDB HonoreDB requested review from herkolategan, smg260 and miretskiy and removed request for a team May 12, 2023 20:35
@cockroach-teamcity
Member

This change is Reviewable

@shermanCRL
Contributor

Nice. Probably want a feature flag as it’s on a hot path, and probably metamorphic testing.

@miretskiy
Contributor

This implementation, and its reliance on an unexported, metrics-based mechanism, makes me sad.
The Sarama library does provide a way to retrieve quota information:

	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil { /* handle error */ }
	// A nil filter should return all configured quotas.
	quotas, err := admin.DescribeClientQuotas(nil, false /* strict */)
	...

The question is: can we figure out the correct args to DescribeClientQuotas (I think nil would return all configured quotas -- whatever that means), and then use this information to throttle.
Just like we periodically refresh metadata, we could also periodically refresh quota information.
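As a rough sketch of that refresh loop (field names follow Sarama's client-quotas API as I understand it; the interval and limiter wiring are assumptions, not CockroachDB code):

    // Sketch only: periodically ask the brokers for client quotas and feed
    // producer_byte_rate into a local rate limiter the sink can wait on.
    package main

    import (
        "log"
        "time"

        "github.com/Shopify/sarama"
        "golang.org/x/time/rate"
    )

    func refreshQuota(admin sarama.ClusterAdmin, limiter *rate.Limiter) {
        // A nil filter asks for all configured quotas; strict=false also
        // matches default entities.
        entries, err := admin.DescribeClientQuotas(nil, false)
        if err != nil {
            log.Printf("describe client quotas: %v", err)
            return
        }
        for _, e := range entries {
            if v, ok := e.Values["producer_byte_rate"]; ok {
                limiter.SetLimit(rate.Limit(v)) // bytes per second
            }
        }
    }

    // startQuotaRefresher runs alongside the existing metadata refresh.
    func startQuotaRefresher(admin sarama.ClusterAdmin, limiter *rate.Limiter) {
        go func() {
            for range time.Tick(time.Minute) {
                refreshQuota(admin, limiter)
            }
        }()
    }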

@HonoreDB HonoreDB force-pushed the kafka_quota_handling_via_metric_spy branch from fdd4df5 to 40d51cf on May 23, 2023 18:10
@shermanCRL
Contributor

shermanCRL commented Jun 1, 2023

Nice!

I see two regimes here. The first is the “dumb” version, where we simply do what the protocol says -- it says pause for X, we pause for X.

The second is the dynamic version where we seek the right byte rate. This is likely to be helpful in the real world -- CRDB adapts, and we optimize for not requiring operator intervention.

My 2¢ is that the latter should be opt-in? I think it is likely to take a while to get right, and its behavior is harder for operators to predict.

We also talked about how to ramp back up. “Seeking” needs to be ongoing, and both up and down.

@miretskiy
Contributor

> I see two regimes here. […] The second is the dynamic version where we seek the right byte rate. […] My 2¢ is that the latter should be opt-in? […] “Seeking” needs to be ongoing, and both up and down.

I'm pretty sure we want to support this dynamic version. We already have a fixed-rate version via the node throttler.
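To picture the "seeking, both up and down" idea, an AIMD-style adjuster would keep probing upward while the broker stays quiet and back off sharply on a throttle signal. This is purely illustrative, not what this PR implements:

    // Sketch only: additive-increase / multiplicative-decrease rate seeker.
    package main

    type rateSeeker struct {
        bytesPerSec float64
        min, max    float64
    }

    // observe is called once per interval with whether the broker throttled
    // us, and returns the rate to use for the next interval.
    func (s *rateSeeker) observe(throttled bool) float64 {
        if throttled {
            s.bytesPerSec *= 0.5 // multiplicative decrease
        } else {
            s.bytesPerSec += 64 << 10 // additive increase: +64 KiB/s
        }
        if s.bytesPerSec < s.min {
            s.bytesPerSec = s.min
        }
        if s.bytesPerSec > s.max {
            s.bytesPerSec = s.max
        }
        return s.bytesPerSec
    }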
