
changefeedccl: better handle kafka quotas #92290

Closed
amruss opened this issue Nov 21, 2022 · 12 comments · Fixed by #118643 · May be fixed by #103226

Comments

@amruss
Contributor

amruss commented Nov 21, 2022

Kafka users can set data quotas in kafka. As a high-throughput, high-scale database, we can easily run up against those quotas, and customers cannot provision an arbitrarily large kafka cluster to absorb traffic bursts.

Likely what we want to do is:

  • exponentially back off when we see a quota-exceeded error from kafka (for the length of time kafka tells us to?)
  • implement pushback in this case ^

We should also add end-to-end testing for the scenario where kafka is purposely slow.

Jira issue: CRDB-21692

Epic CRDB-21691

@amruss amruss added the C-enhancement Solution expected to add code/behavior + preserve backward-compat (pg compat issues are exception) label Nov 21, 2022
@blathers-crl blathers-crl bot added the T-cdc label Nov 21, 2022
@blathers-crl

blathers-crl bot commented Nov 21, 2022

cc @cockroachdb/cdc

@exalate-issue-sync exalate-issue-sync bot added T-cdc and removed T-cdc labels Nov 21, 2022
@blathers-crl

blathers-crl bot commented Nov 21, 2022

cc @cockroachdb/cdc

@shermanCRL
Contributor

Overall, the answer seems to be first to design a throttling system for changefeeds, similar in spirit to memory monitoring / admission control / quota pools.

Consider the scope per pool/bucket: is it per-cluster, per-database, or per-changefeed? Perhaps changefeeds could even be grouped into arbitrary pools.
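A minimal sketch of the pool idea, assuming a simple semaphore-style quota pool keyed by scope (all names here are hypothetical illustrations, not an actual CockroachDB API):

```go
package main

import "fmt"

// quotaPool is a toy shared quota pool: changefeeds grouped into the same
// pool compete for a fixed number of in-flight slots.
type quotaPool struct {
	sem chan struct{}
}

func newQuotaPool(slots int) *quotaPool {
	return &quotaPool{sem: make(chan struct{}, slots)}
}

// acquire blocks when the pool is exhausted; that blocking is how pushback
// would propagate to a producer loop.
func (p *quotaPool) acquire() { p.sem <- struct{}{} }
func (p *quotaPool) release() { <-p.sem }

// tryAcquire reports whether quota was available without blocking.
func (p *quotaPool) tryAcquire() bool {
	select {
	case p.sem <- struct{}{}:
		return true
	default:
		return false
	}
}

func main() {
	// One pool per scope; here, per-changefeed.
	pools := map[string]*quotaPool{"feed-1": newQuotaPool(2)}
	p := pools["feed-1"]
	p.acquire()
	p.acquire()
	fmt.Println(p.tryAcquire()) // pool exhausted: false
	p.release()
	fmt.Println(p.tryAcquire()) // slot freed: true
}
```

The open design question in the comment is only the key of the `pools` map: cluster name, database name, changefeed ID, or an arbitrary user-defined pool name.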

@miretskiy
Contributor

@HonoreDB , I believe you're working on this right now.

@miretskiy
Contributor

Not going to do this in 23.2.
Assigning to myself to attempt to create an upstream sarama patch to add the capability required for quota pushback implementation.

@wenyihu6
Contributor

wenyihu6 commented Jan 8, 2024

Creating a list of sub-issues to track the work that still needs to be done:

@wenyihu6
Contributor

wenyihu6 commented Jan 30, 2024

I discussed this with Yev, and my understanding of this issue is:

  • The newer version of sarama recognizes the throttling response from kafka and starts throttling upon receiving it: https://github.com/IBM/sarama/commit/5ac5dc0e90b2317118b193cf1aa03dc5f2122539/
  • My understanding is that when sarama starts throttling after receiving the kafka response, the effect propagates along the changefeed pipeline and causes the rangefeed to slow down and emit fewer messages until the throttling is done. So no work should be needed on our client side to react to the throttling, since sarama's effect gets propagated.

The work that needs to be done in this area is:

  1. Upgrade the sarama version - done: deps: upgrade Shopify/sarama v1.38.1 to IBM/sarama v1.42.1 #117544
  2. Add observability metrics into sarama so that we know when sarama starts throttling and can tell customers why their changefeed is slow and that they need to adjust their kafka quota configuration accordingly: changefeedccl: add observability metrics into sarama code #117693
  3. Add a per-changefeed configuration so that users can specify a different kafka quota configuration for each changefeed: changefeedccl: allow per changefeed kafka quota config #118643. This is done by changing the sarama ClientID. Users can specify different quotas based on the client ID according to https://kafka.apache.org/documentation/#quotas. I'm still doing more testing on this PR.

@miretskiy Could you help me confirm my understanding here?

cc:@rharding6373

@miretskiy
Contributor

Your understanding is exactly right. It is possible that the above solution might not be enough (I still worry about the possibility of changefeeds constantly getting pushback signal; respecting this signal; and then restarting with a blast of messages -- which would cause more pushback)... However, I think this gives a good first step, and it's not clear yet if we'd need to do anything else.

@wenyihu6
Contributor

Your understanding is exactly right. It is possible that the above solution might not be enough (I still worry about the possibility of changefeeds constantly getting pushback signal; respecting this signal; and then restarting with a blast of messages -- which would cause more pushback)... However, I think this gives a good first step, and it's not clear yet if we'd need to do anything else.

Thanks for the confirmation!

@wenyihu6
Contributor

Your understanding is exactly right. It is possible that the above solution might not be enough (I still worry about the possibility of changefeeds constantly getting pushback signal; respecting this signal; and then restarting with a blast of messages -- which would cause more pushback)... However, I think this gives a good first step, and it's not clear yet if we'd need to do anything else.

For my education, I wanted to check how this effect gets propagated. Are we just busy-waiting here https://github.com/cockroachdb/cockroach/blob/edd9e94cfd8489492816617200aae8c12946b83f/pkg/ccl/changefeedccl/sink_kafka.go#L558-L577, since the producer will be blocked while waiting for the throttle https://github.com/IBM/sarama/blob/2767191b19b2e190f7095f21cac2a014de80e92c/broker.go#L1009-L1010? And since sarama is waiting, we also wait in workerLoop(), which in turn slows down emitting messages. If this is correct, could you point me to where emitting fewer messages in turn slows down the rangefeed in the code?

@wenyihu6
Contributor

@miretskiy ^ : ) last question 😬

@miretskiy
Contributor

Let's discuss offline...

wenyihu6 added a commit to wenyihu6/cockroach that referenced this issue Feb 2, 2024
Previously, users were limited to a single kafka quota configuration for
cockroachdb, which applied to and restricted all changefeeds. This patch
introduces a new changefeed option that lets users define a client ID for each
changefeed, allowing them to specify different kafka quota configurations for
different changefeeds. To use it, specify a unique client ID via
`kafka_sink_config` and configure different quota settings on the kafka server
based on https://kafka.apache.org/documentation/#quotas.

```
CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Fixes: cockroachdb#92290

Release note: `kafka_sink_config` now supports specifying a different client ID
for each changefeed, enabling users to define distinct kafka quota
configurations for different changefeeds.
wenyihu6 added a commit to wenyihu6/cockroach that referenced this issue Feb 7, 2024
Previously, users were limited to a single kafka quota configuration for
cockroachdb, which applied to and restricted all changefeeds. This patch
introduces a new changefeed option that lets users define a client ID for each
changefeed, allowing them to specify different kafka quota configurations for
different changefeeds. To use it, specify a unique client ID via
`kafka_sink_config` and configure different quota settings on the kafka server
based on https://kafka.apache.org/documentation/#quotas.

```
CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Fixes: cockroachdb#92290

Release note: `kafka_sink_config` now supports specifying a different client ID
for each changefeed, enabling users to define distinct kafka quota
configurations for different changefeeds.

For any kafka version >= V1_0_0_0 ([KIP-190: Handle client-ids consistently
between clients and
brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)),
any string can be used as the client ID. For earlier kafka versions, the client
ID may only contain the characters [A-Za-z0-9._-].

For example,
```
CREATE CHANGEFEED FOR ... WITH kafka_sink_config='{"ClientID": "clientID1"}'
```
craig bot pushed a commit that referenced this issue Feb 9, 2024
118643: changefeedccl: allow per changefeed kafka quota config r=rharding6373 a=wenyihu6

Previously, users were limited to a single kafka quota configuration for
cockroachdb, which applied to and restricted all changefeeds. This patch
introduces a new changefeed option that lets users define a client ID for each
changefeed, allowing them to specify different kafka quota configurations for
different changefeeds. To use it, specify a unique client ID via
`kafka_sink_config` and configure different quota settings on the kafka server
based on https://kafka.apache.org/documentation/#quotas.

```
CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Fixes: #92290

Release note: `kafka_sink_config` now supports specifying a different client ID
for each changefeed, enabling users to define distinct kafka quota
configurations for different changefeeds.

For any kafka version >= V1_0_0_0 ([KIP-190: Handle client-ids consistently
between clients and
brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)),
any string can be used as the client ID. For earlier kafka versions, the client
ID may only contain the characters [A-Za-z0-9._-].

For example,
```
CREATE CHANGEFEED FOR ... WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Co-authored-by: Wenyi Hu <[email protected]>
@craig craig bot closed this as completed in 9a05851 Feb 9, 2024
wenyihu6 added a commit to wenyihu6/cockroach that referenced this issue Feb 21, 2024
Previously, users were limited to a single kafka quota configuration for
cockroachdb, which applied to and restricted all changefeeds. This patch
introduces a new changefeed option that lets users define a client ID for each
changefeed, allowing them to specify different kafka quota configurations for
different changefeeds. To use it, specify a unique client ID via
`kafka_sink_config` and configure different quota settings on the kafka server
based on https://kafka.apache.org/documentation/#quotas.

```
CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}'
```

Fixes: cockroachdb#92290

Release note: `kafka_sink_config` now supports specifying a different client ID
for each changefeed, enabling users to define distinct kafka quota
configurations for different changefeeds.

For any kafka version >= V1_0_0_0 ([KIP-190: Handle client-ids consistently
between clients and
brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)),
any string can be used as the client ID. For earlier kafka versions, the client
ID may only contain the characters [A-Za-z0-9._-].

For example,
```
CREATE CHANGEFEED FOR ... WITH kafka_sink_config='{"ClientID": "clientID1"}'
```