-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
changefeedccl: allow per changefeed kafka quota config #118643
Conversation
71a2db6
to
8dd6b33
Compare
Splitted the roachtest work into another PR here - #118428. We can revisit what the best way to test this work is there. |
First commit comes from #117693. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice work! Have a couple comments.
Reviewable status: complete! 0 of 0 LGTMs obtained (waiting on @wenyihu6)
pkg/ccl/changefeedccl/sink_kafka.go
line 838 at r2 (raw file):
kafka.Producer.Flush.Frequency = time.Duration(c.Flush.Frequency) kafka.Producer.Flush.MaxMessages = c.Flush.MaxMessages kafka.ClientID = c.ClientID
Are there any restrictions on client IDs, e.g., length, invalid characters, that we need to validate?
pkg/ccl/changefeedccl/sink_test.go
line 644 at r2 (raw file):
cfg, err = getSaramaConfig(opts) require.NoError(t, err) require.NoError(t, cfg.Validate())
You should add an extra check to make sure that clientID1
is reflected in the cfg
.
Is there also logic we should add toValidate
to make sure the client id is sensical? See my other comment.
Previously, rharding6373 (Rachael Harding) wrote…
Sarama provides an exported validation check in its library. I opened another PR to add this check #118890. |
8dd6b33
to
199555d
Compare
Previously, rharding6373 (Rachael Harding) wrote…
Done. |
199555d
to
66f33cb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @wenyihu6)
66f33cb
to
d841789
Compare
d841789
to
b8c166f
Compare
Previously, users were limited to setting a single kafka quota configuration for cockroachdb which was then applied and restricting all changefeeds. This patch introduces a new changefeed configuration option, allowing users to define client id for different changefeeds, allowing users to specify different kafka quota configurations for different changefeeds. To use it, users can specify a unique client ID using `kafka_sink_config` and configure different quota settings on kafka server based on https://kafka.apache.org/documentation/#quotas. ``` CREATE CHANGEFEED FOR foo WITH kafka_sink_config='{"ClientID": "clientID1"}' ``` Note that Fixes: cockroachdb#92290 Release note: `kafka_sink_config` now supports specifying a different client ID for different changefeeds, enabling users to define distinct kafka quota configurations for various changefeeds. For any kafka versions >= V1_0_0_0 ([KIP-190: Handle client-ids consistently between clients and brokers](https://cwiki.apache.org/confluence/display/KAFKA/KIP-190%3A+Handle+client-ids+consistently+between+clients+and+brokers)), any string can be used as client ID. For earlier kafka versions, clientID can only contain characters [A-Za-z0-9._-] are acceptable. For example, ``` CREATE CHANGEFEED FOR ... WITH kafka_sink_config='{"ClientID": "clientID1"}' ```
b8c166f
to
9a05851
Compare
TFTRs!! bors r=rharding6373 |
Build succeeded: |
Previously, users were limited to setting a single kafka quota configuration for
cockroachdb which was then applied and restricting all changefeeds. This patch
introduces a new changefeed configuration option, allowing users to define
client id for different changefeeds, allowing users to specify different kafka
quota configurations for different changefeeds. To use it, users can specify a
unique client ID using
kafka_sink_config
and configure different quotasettings on kafka server based on
https://kafka.apache.org/documentation/#quotas.
Fixes: #92290
Release note:
kafka_sink_config
now supports specifying a different client IDfor different changefeeds, enabling users to define distinct kafka quota
configurations for various changefeeds.
For any kafka versions >= V1_0_0_0 (KIP-190: Handle client-ids consistently
between clients and
brokers),
any string can be used as client ID. For earlier kafka versions, clientID can
only contain characters [A-Za-z0-9._-] are acceptable.
For example,