-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(processing): Support multiple kafka clusters #1101
Conversation
relay-config/src/config.rs
Outdated
topic_name: String, | ||
/// An identifier referencing one of the kafka configurations in `secondary_kafka_configs`. | ||
kafka_config_name: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be named differently as per spec.
topic_name: String, | |
/// An identifier referencing one of the kafka configurations in `secondary_kafka_configs`. | |
kafka_config_name: String, | |
name: String, | |
/// An identifier referencing one of the kafka configurations in `secondary_kafka_configs`. | |
broker: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wanted to change this because I found our use of "kafka config" vs "broker" inconsistent. The example in the spec where we write secondary_kafka_config: some_broker: ...
only looks good because the user named their kafka config "broker", but it can be any string
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I prefer @untitaker's proposal (I vote for changing the spec).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, thanks. I'm 👍 with this.
In an attempt to keep it short, a last suggestion on the proposal: name
and config
. Given that this is in a key called topics
, I think name
is unambiguous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
applied
/// The `kafka_config` is the default producer configuration used for all topics. A secondary | ||
/// kafka config can be referenced in `topics:` like this: | ||
/// | ||
/// ```yaml |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Kudos for the example!
relay-config/src/config.rs
Outdated
/// Kafka topic names. | ||
#[serde(default)] | ||
pub topics: TopicNames, | ||
pub topics: TopicMap<TopicAssignment>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would add a type alias for TopicMap<TopicAssignment>
, and maybe rename the generic type to something more that indicates its generic structure.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you have a suggestion for rename? I am not sure how to convey that via name
for config_p in config.kafka_config() { | ||
for config_p in config | ||
.kafka_config(config.kafka_topics().get(KafkaTopic::Outcomes)) | ||
.context(ServerErrorKind::KafkaError)? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Please assign this to a variable rather than wrapping the for header.
relay-server/src/actors/store.rs
Outdated
let mut reused_producers: BTreeMap<_, Arc<_>> = BTreeMap::new(); | ||
|
||
let producers = config | ||
.kafka_topics() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This elegantly comes with the advantage that you never connect to a broker that's unreferenced from the topic assignments. It's a bit of a stretch that you need to now expose a generic topic map from relay-config
just for that, though.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, but I already expose that map for the purpose of initializing the store actor and it's "producer map"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, that's what I was hinting at -- this is the store actor's initialization function.
It's a bit of a stretch to export such a generic type from the config crate to create a topic to producer map. It's elegant that you then have the producers mapped by topic name, but would overall be less code and probably easier to follow if you just do a lookup by name in here.
@RaduW is raising a similar point in #1101 (comment).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, that's what I was hinting at -- this is the store actor's initialization function.
Sorry yes I was thinking that you were commenting on the wrong line.
To be honest I didn't really understand what you meant by "lookup by name", but I removed TopicMap such that the interface between relay-config and relay-server gets smaller (which is what I think you wanted to achieve)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks good to me.
relay-config/src/config.rs
Outdated
#[derive(Serialize, Deserialize, Debug)] | ||
#[serde(default)] | ||
pub struct TopicNames { | ||
pub struct TopicMap<T> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think we are saving very little in terms of avoiding redefining/maintaining 6 fields in two places
while paying for the extra complication of making TopicMap generic.
relay-config/src/config.rs
Outdated
topic_name: String, | ||
/// An identifier referencing one of the kafka configurations in `secondary_kafka_configs`. | ||
kafka_config_name: String, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, I prefer @untitaker's proposal (I vote for changing the spec).
Instructions and example for changelogFor changes exposed to the Python package, please add an entry to For changes to the Relay server, please add an entry to
To the changelog entry, please add a link to this PR (consider a more descriptive message): - Support multiple kafka clusters. ([#1101](https://github.com/getsentry/relay/pull/1101)) If none of the above apply, you can opt out by adding #skip-changelog to the PR description. |
self.values.processing.topics.get(topic).topic_name() | ||
} | ||
|
||
/// Topic name and list of Kafka configuration parameters for a given topic. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not return the topic name but rather the configuration name.
This adds additional config to Relay to allow for using dedicated kafka clusters for, in our usecase, the metrics topic. See https://www.notion.so/sentry/Configure-multiple-Kafka-brokers-in-Relay-36fc942b889b429589e87097a02b16e4 for design doc.
Points of improvement (for the future, not sure if relevant right now)
Unused kafka configs are not warned about in any way (but then we also don't warn about unrecognized keys anywhere in the config)
kafka configs are generally very lazily validated. that already included the actual values we pass to rdkafka, but it now also includes the names of the secondary configs. so for example you can reference unknown kafka configs, and it doesn't matter for relay as long as you don't turn on processing mode
the error messages on invalid topic assignments are atrocious because untagged enums in serde only provide generic error messages
you can't do this:
i.e. use the "verbose" form of metrics assignment without explicitly specifying both a topic name and a kafka config name. I think that's okay! if you don't want to specify a custom kafka config, simply write
metrics: foo
like before, if you don't want to specify a custom topic name while explicitly specifying a custom kafka config I guess you're out of luck.