ref(kafka): Remove ShardedProducer #3415

Merged · 9 commits · Apr 15, 2024
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -10,6 +10,7 @@
**Features**:

- **Breaking change:** Kafka topic configuration keys now support the default topic name. The previous aliases `metrics` and `metrics_transactions` are no longer supported if configuring topics manually. Use `ingest-metrics` or `metrics_sessions` instead of `metrics`, and `ingest-performance-metrics` or `metrics_generic` instead of `metrics_transactions`. ([#3361](https://github.com/getsentry/relay/pull/3361))
- **Breaking change:** Remove `ShardedProducer` and related code. The sharded configuration for Kafka is no longer supported. ([#3415](https://github.com/getsentry/relay/pull/3415))
- Add support for continuous profiling. ([#3270](https://github.com/getsentry/relay/pull/3270))
- Add support for Reporting API for CSP reports ([#3277](https://github.com/getsentry/relay/pull/3277))
- Extract op and description while converting opentelemetry spans to sentry spans. ([#3287](https://github.com/getsentry/relay/pull/3287))
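For operators affected by the breaking change above, here is a minimal sketch of the two topic-assignment forms that remain after this PR. The types below are simplified stand-ins for the real ones in `relay-kafka/src/config.rs`, and `serde`/`serde_yaml` are assumed as dependencies; each formerly sharded topic entry must collapse to exactly one of these forms.

```rust
use std::collections::BTreeMap;

use serde::Deserialize;

/// Simplified stand-ins for the two assignment forms that remain after this PR;
/// the real types live in `relay-kafka/src/config.rs`.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum TopicAssignment {
    /// A plain string picks a topic on the default Kafka cluster.
    Primary(String),
    /// A `name`/`config` map picks a topic on a secondary cluster.
    Secondary { name: String, config: String },
}

fn main() {
    // A previously sharded topic collapses to a single assignment, e.g. the
    // shard-0 entry; any fan-out across clusters now happens outside Relay.
    let yaml = r#"
ingest-events: "ingest-events"
ingest-metrics:
  name: "ingest-metrics-1"
  config: "metrics_1"
"#;
    let topics: BTreeMap<String, TopicAssignment> = serde_yaml::from_str(yaml).unwrap();
    println!("{topics:#?}");
}
```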
146 changes: 32 additions & 114 deletions relay-kafka/src/config.rs
@@ -3,8 +3,6 @@
//! The configuration can be either:
//! - [`TopicAssignment::Primary`] - the main and default kafka configuration,
//! - [`TopicAssignment::Secondary`] - used to configure any additional kafka topic,
//! - [`TopicAssignment::Sharded`] - if we want to configure multiple kafka clusters,
//! we can create a mapping of the range of logical shards to the kafka configuration.

use std::collections::BTreeMap;

@@ -160,9 +158,6 @@ pub enum TopicAssignment {
/// `secondary_kafka_configs`. In this case that custom kafka config will be used to produce
/// data to the given topic name.
Secondary(KafkaTopicConfig),
/// If we want to configure multiple kafka clusters, we can create a mapping of the
/// range of logical shards to the kafka configuration.
Sharded(Sharded),
}

/// Configuration for topic
@@ -176,37 +171,6 @@ pub struct KafkaTopicConfig {
kafka_config_name: String,
}

/// Configuration for logical shards -> kafka configuration mapping.
///
/// The configuration for this should look like:
///
/// ```ignore
/// metrics:
/// shards: 65000
/// mapping:
/// 0:
/// name: "ingest-metrics-1"
/// config: "metrics_1"
/// 25000:
/// name: "ingest-metrics-2"
/// config: "metrics_2"
/// 45000:
/// name: "ingest-metrics-3"
/// config: "metrics_3"
/// ```
///
/// where `shards` defines how many logical shards are created and `mapping`
/// describes the per-shard configuration. Each key in `mapping` is the inclusive
/// starting index of a shard range, which extends until the next key or the
/// maximum defined by the `shards` option. The first key must always be 0.
#[derive(Serialize, Deserialize, Debug)]
pub struct Sharded {
/// The number of shards used for this topic.
shards: u64,
/// The Kafka configuration assigned to the specific shard range.
mapping: BTreeMap<u64, KafkaTopicConfig>,
}
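The removed range semantics are easier to see in a small standalone sketch, using the hypothetical boundaries from the doc comment above (this is illustrative code, not code from the PR):

```rust
use std::collections::BTreeMap;

fn main() {
    // Hypothetical shard boundaries matching the removed doc example.
    let mapping = BTreeMap::from([
        (0_u64, "ingest-metrics-1"),
        (25_000, "ingest-metrics-2"),
        (45_000, "ingest-metrics-3"),
    ]);
    let shards = 65_000_u64;

    // A sharding key selects the entry with the largest start index <= key % shards.
    for key in [10_u64, 24_999, 25_000, 70_000] {
        let shard = key % shards;
        let (start, topic) = mapping.range(..=shard).next_back().unwrap();
        println!("key {key} -> shard {shard} -> {topic} (range starts at {start})");
    }
}
```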

/// Describes Kafka config, with all the parameters extracted, which will be used for creating the
/// kafka producer.
#[derive(Debug)]
@@ -216,14 +180,6 @@ pub enum KafkaConfig<'a> {
/// Kafka parameters to create the kafka producer.
params: KafkaParams<'a>,
},

/// The list of the Kafka configs with related shard configs.
Sharded {
/// The maximum number of logical shards for this set of configs.
shards: u64,
/// The list of the sharded Kafka configs.
configs: BTreeMap<u64, KafkaParams<'a>>,
},
}

/// Sharded Kafka config.
@@ -273,27 +229,6 @@ impl TopicAssignment {
.ok_or(ConfigError::UnknownKafkaConfigName)?,
},
},
Self::Sharded(Sharded { shards, mapping }) => {
// quick fail if the config does not contain shard 0
if !mapping.contains_key(&0) {
return Err(ConfigError::InvalidShard);
}
let mut kafka_params = BTreeMap::new();
for (shard, kafka_config) in mapping {
let config = KafkaParams {
topic_name: kafka_config.topic_name.as_str(),
config_name: Some(kafka_config.kafka_config_name.as_str()),
params: secondary_configs
.get(kafka_config.kafka_config_name.as_str())
.ok_or(ConfigError::UnknownKafkaConfigName)?,
};
kafka_params.insert(*shard, config);
}
KafkaConfig::Sharded {
shards: *shards,
configs: kafka_params,
}
}
};

Ok(kafka_config)
Expand Down Expand Up @@ -321,18 +256,7 @@ ingest-events: "ingest-events-kafka-topic"
profiles:
name: "ingest-profiles"
config: "profiles"
ingest-metrics:
shards: 65000
mapping:
0:
name: "ingest-metrics-1"
config: "metrics_1"
25000:
name: "ingest-metrics-2"
config: "metrics_2"
45000:
name: "ingest-metrics-3"
config: "metrics_3"
ingest-metrics: "ingest-metrics-3"
transactions: "ingest-transactions-kafka-topic"
"#;

Expand All @@ -348,41 +272,17 @@ transactions: "ingest-transactions-kafka-topic"
value: "test-value".to_string(),
}],
);
second_config.insert(
"metrics_1".to_string(),
vec![KafkaConfigParam {
name: "test".to_string(),
value: "test-value".to_string(),
}],
);
second_config.insert(
"metrics_2".to_string(),
vec![KafkaConfigParam {
name: "test".to_string(),
value: "test-value".to_string(),
}],
);
second_config.insert(
"metrics_3".to_string(),
vec![KafkaConfigParam {
name: "test".to_string(),
value: "test-value".to_string(),
}],
);

let topics: TopicAssignments = serde_yaml::from_str(yaml).unwrap();
let events = topics.events;
let profiles = topics.profiles;
let metrics = topics.metrics_sessions;
let metrics_sessions = topics.metrics_sessions;
let transactions = topics.transactions;

assert!(matches!(events, TopicAssignment::Primary(_)));
assert!(matches!(profiles, TopicAssignment::Secondary { .. }));
assert!(matches!(metrics, TopicAssignment::Sharded { .. }));

let events_config = metrics
.kafka_config(&def_config, &second_config)
.expect("Kafka config for metrics topic");
assert!(matches!(events_config, KafkaConfig::Sharded { .. }));
assert!(matches!(metrics_sessions, TopicAssignment::Primary(_)));
assert!(matches!(transactions, TopicAssignment::Primary(_)));

let events_config = events
.kafka_config(&def_config, &second_config)
Expand All @@ -397,6 +297,33 @@ transactions: "ingest-transactions-kafka-topic"
}
));

let events_config = profiles
.kafka_config(&def_config, &second_config)
.expect("Kafka config for profiles topic");
assert!(matches!(
events_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-profiles",
config_name: Some("profiles"),
..
}
}
));

let events_config = metrics_sessions
.kafka_config(&def_config, &second_config)
.expect("Kafka config for metrics topic");
assert!(matches!(
events_config,
KafkaConfig::Single {
params: KafkaParams {
topic_name: "ingest-metrics-3",
..
}
}
));

// Legacy keys are still supported
let transactions_config = transactions
.kafka_config(&def_config, &second_config)
@@ -410,15 +337,6 @@
}
}
));

let (shards, mapping) =
if let TopicAssignment::Sharded(Sharded { shards, mapping }) = metrics {
(shards, mapping)
} else {
unreachable!()
};
assert_eq!(shards, 65000);
assert_eq!(3, mapping.len());
}

#[test]
99 changes: 2 additions & 97 deletions relay-kafka/src/producer/mod.rs
@@ -2,9 +2,6 @@
//!
//! There are two different producers that are supported in Relay right now:
//! - [`SingleProducer`] - which sends all the messages to the defined kafka [`KafkaTopic`],
//! - [`ShardedProducer`] - which expects at least one shard to be configured; depending on
//! the shard number, messages are sent to different topics using the configured
//! producer for that exact shard.

use std::borrow::Cow;
use std::cell::Cell;
@@ -99,54 +96,6 @@ impl fmt::Debug for SingleProducer {
}
}

/// Sharded producer configuration.
struct ShardedProducer {
/// The maximum number of shards for this producer.
shards: u64,
/// The actual Kafka producer assigned to the range of logical shards, where the `u64` in the map is
/// the inclusive beginning of the range.
producers: BTreeMap<u64, (String, Arc<ThreadedProducer>)>,
}

impl ShardedProducer {
/// Returns topic name and the Kafka producer based on the provided sharding key.
/// Returns error [`ClientError::InvalidShard`] if the shard range for the provided sharding
/// key could not be found.
///
/// # Errors
/// Returns [`ClientError::InvalidShard`] error if the provided `sharding_key` could not be
/// placed in any configured shard ranges.
pub fn get_producer(
&self,
sharding_key: u64,
) -> Result<(&str, &ThreadedProducer), ClientError> {
let shard = sharding_key % self.shards;
let (topic_name, producer) = self
.producers
.iter()
.take_while(|(k, _)| *k <= &shard)
.last()
.map(|(_, v)| v)
.ok_or(ClientError::InvalidShard)?;

Ok((topic_name, producer))
}
}
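As an aside on the removed lookup: the `take_while(...).last()` chain scans the map linearly, while `BTreeMap::range` expresses the same "largest start index at or below the shard" query directly. A standalone sketch of that equivalence, not taken from the PR:

```rust
use std::collections::BTreeMap;

// Equivalent to the removed `take_while(..).last()` walk, but using the
// tree's ordered lookup instead of a linear scan.
fn producer_for_shard(producers: &BTreeMap<u64, String>, shard: u64) -> Option<&String> {
    producers.range(..=shard).next_back().map(|(_, name)| name)
}

fn main() {
    let producers = BTreeMap::from([(0_u64, "p0".to_string()), (25_000, "p1".to_string())]);
    assert_eq!(producer_for_shard(&producers, 30_000).map(String::as_str), Some("p1"));
}
```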

impl fmt::Debug for ShardedProducer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let producers = &self
.producers
.iter()
.map(|(shard, (topic, _))| (shard, topic))
.collect::<BTreeMap<_, _>>();
f.debug_struct("ShardedProducer")
.field("shards", &self.shards)
.field("producers", producers)
.finish()
}
}

/// Keeps all the configured kafka producers and is responsible for routing the messages.
#[derive(Debug)]
pub struct KafkaClient {
@@ -167,7 +116,6 @@ impl KafkaClient {
pub fn send_message(
&self,
topic: KafkaTopic,
organization_id: u64,
message: &impl Message,
) -> Result<&str, ClientError> {
let serialized = message.serialize()?;
@@ -179,7 +127,6 @@
let key = message.key();
self.send(
topic,
organization_id,
&key,
message.headers(),
message.variant(),
@@ -193,7 +140,6 @@
pub fn send(
&self,
topic: KafkaTopic,
organization_id: u64,
key: &[u8; 16],
headers: Option<&BTreeMap<String, String>>,
variant: &str,
@@ -205,7 +151,7 @@
);
ClientError::InvalidTopicName
})?;
producer.send(organization_id, key, headers, variant, payload)
producer.send(key, headers, variant, payload)
}
}
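With sharding gone, `send_message` no longer takes an `organization_id`; the topic alone picks the producer. A hedged sketch of the new call shape, assuming the `relay_kafka` exports suggested by this diff (`produce` itself is a hypothetical helper):

```rust
use relay_kafka::{ClientError, KafkaClient, KafkaTopic, Message};

/// Hypothetical call site: no organization_id is needed to route the
/// message anymore; the topic alone selects the producer.
fn produce(client: &KafkaClient, message: &impl Message) -> Result<(), ClientError> {
    // `send_message` serializes the payload and returns the name of the
    // topic it was produced to.
    let topic_name = client.send_message(KafkaTopic::Events, message)?;
    let _ = topic_name;
    Ok(())
}
```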

@@ -268,34 +214,6 @@ impl KafkaClientBuilder {
.insert(topic, Producer::single((*topic_name).to_string(), producer));
Ok(self)
}
KafkaConfig::Sharded { shards, configs } => {
let mut producers = BTreeMap::new();
for (shard, kafka_params) in configs {
let config_name = kafka_params.config_name.map(str::to_string);
if let Some(producer) = self.reused_producers.get(&config_name) {
let cached_producer = Arc::clone(producer);
producers.insert(
*shard,
(kafka_params.topic_name.to_string(), cached_producer),
);
continue;
}
for config_p in kafka_params.params {
client_config.set(config_p.name.as_str(), config_p.value.as_str());
}
let producer = Arc::new(
client_config
.create_with_context(Context)
.map_err(ClientError::InvalidConfig)?,
);
self.reused_producers
.insert(config_name, Arc::clone(&producer));
producers.insert(*shard, (kafka_params.topic_name.to_string(), producer));
}
self.producers
.insert(topic, Producer::sharded(*shards, producers));
Ok(self)
}
}
}
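For context, a sketch of how topics could be wired into the builder after this change. This assumes `KafkaClient::builder()`, `add_kafka_topic_config`, and `build()` keep the shapes suggested by this diff, and that `KafkaTopic` is `Copy`; `build_client` is a hypothetical helper:

```rust
use relay_kafka::{ClientError, KafkaClient, KafkaConfig, KafkaTopic};

/// Hypothetical wiring helper: every topic now resolves to exactly one
/// producer, so there is no sharded branch to configure.
fn build_client(configs: &[(KafkaTopic, KafkaConfig<'_>)]) -> Result<KafkaClient, ClientError> {
    let mut builder = KafkaClient::builder();
    for (topic, config) in configs {
        // Producers are cached by config name, so topics that share a
        // cluster reuse the same underlying ThreadedProducer.
        builder = builder.add_kafka_topic_config(*topic, config)?;
    }
    Ok(builder.build())
}
```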

@@ -318,14 +236,11 @@ impl fmt::Debug for KafkaClientBuilder {
}
}

/// This object contains the Kafka producer variants for single and sharded configurations.
/// This object contains the Kafka producer variant for a single configuration.
#[derive(Debug)]
enum ProducerInner {
/// Configuration variant for the single kafka producer.
Single(SingleProducer),
/// Configuration variant for sharded kafka producer, when one topic has different producers
/// dedicated to the range of the shards.
Sharded(ShardedProducer),
}

#[derive(Debug)]
@@ -345,17 +260,9 @@ impl Producer {
}
}

fn sharded(shards: u64, producers: BTreeMap<u64, (String, Arc<ThreadedProducer>)>) -> Self {
Self {
last_report: Instant::now().into(),
inner: ProducerInner::Sharded(ShardedProducer { shards, producers }),
}
}

/// Sends the payload to the correct producer for the current topic.
fn send(
&self,
organization_id: u64,
key: &[u8; 16],
headers: Option<&BTreeMap<String, String>>,
variant: &str,
@@ -370,8 +277,6 @@ impl Producer {
topic_name,
producer,
}) => (topic_name.as_str(), producer.as_ref()),

ProducerInner::Sharded(sharded) => sharded.get_producer(organization_id)?,
};
let mut record = BaseRecord::to(topic_name).key(key).payload(payload);
