feat(processing): Support multiple kafka clusters #1101

Merged · 10 commits · Oct 27, 2021

Changes from 9 commits
CHANGELOG.md (4 changes: 2 additions & 2 deletions)

@@ -2,10 +2,10 @@

 ## Unreleased

-**Features**
+**Features**:

 - Add bucket width to bucket protocol. ([#1103](https://github.com/getsentry/relay/pull/1103))

+- Support multiple kafka cluster configurations ([#1101](https://github.com/getsentry/relay/pull/1101))

 ## 21.10.0

relay-config/src/config.rs (159 changes: 129 additions & 30 deletions)

@@ -136,6 +136,9 @@ pub enum ConfigErrorKind {
     /// compiled without the processing feature.
     #[fail(display = "was not compiled with processing, cannot enable processing")]
     ProcessingNotAvailable,
+    /// The user referenced a kafka config name that does not exist.
+    #[fail(display = "unknown kafka config name")]
+    UnknownKafkaConfigName,
 }

 enum ConfigFormat {
@@ -703,30 +706,97 @@ pub enum KafkaTopic {
 /// Configuration for topics.
 #[derive(Serialize, Deserialize, Debug)]
 #[serde(default)]
-pub struct TopicNames {
+pub struct TopicAssignments {
     /// Simple events topic name.
-    pub events: String,
+    pub events: TopicAssignment,
     /// Events with attachments topic name.
-    pub attachments: String,
+    pub attachments: TopicAssignment,
     /// Transaction events topic name.
-    pub transactions: String,
+    pub transactions: TopicAssignment,
     /// Event outcomes topic name.
-    pub outcomes: String,
+    pub outcomes: TopicAssignment,
     /// Session health topic name.
-    pub sessions: String,
+    pub sessions: TopicAssignment,
     /// Metrics topic name.
-    pub metrics: String,
+    pub metrics: TopicAssignment,
 }

-impl Default for TopicNames {
+impl TopicAssignments {
+    /// Get a topic assignment by KafkaTopic value
+    pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
+        match kafka_topic {
+            KafkaTopic::Attachments => &self.attachments,
+            KafkaTopic::Events => &self.events,
+            KafkaTopic::Transactions => &self.transactions,
+            KafkaTopic::Outcomes => &self.outcomes,
+            KafkaTopic::Sessions => &self.sessions,
+            KafkaTopic::Metrics => &self.metrics,
+        }
+    }
+}
+
+impl Default for TopicAssignments {
     fn default() -> Self {
         Self {
-            events: "ingest-events".to_owned(),
-            attachments: "ingest-attachments".to_owned(),
-            transactions: "ingest-transactions".to_owned(),
-            outcomes: "outcomes".to_owned(),
-            sessions: "ingest-sessions".to_owned(),
-            metrics: "ingest-metrics".to_owned(),
+            events: "ingest-events".to_owned().into(),
+            attachments: "ingest-attachments".to_owned().into(),
+            transactions: "ingest-transactions".to_owned().into(),
+            outcomes: "outcomes".to_owned().into(),
+            sessions: "ingest-sessions".to_owned().into(),
+            metrics: "ingest-metrics".to_owned().into(),
         }
     }
 }

+/// Configuration for a "logical" topic/datasink that Relay should forward data into.
+///
+/// Can be either a string containing the kafka topic name to produce into (using the default
+/// `kafka_config`), or an object containing keys `name` and `config` for using a
+/// custom kafka cluster.
+///
+/// See documentation for `secondary_kafka_configs` for more information.
+#[derive(Serialize, Deserialize, Debug)]
+#[serde(untagged)]
+pub enum TopicAssignment {
+    /// String containing the kafka topic name. In this case the default kafka cluster configured
+    /// in `kafka_config` will be used.
+    Primary(String),
+    /// Object containing topic name and string identifier of one of the clusters configured in
+    /// `secondary_kafka_configs`. In this case that custom kafka config will be used to produce
+    /// data to the given topic name.
+    Secondary {
+        /// The topic name to use.
+        #[serde(rename = "name")]
+        topic_name: String,
+        /// An identifier referencing one of the kafka configurations in `secondary_kafka_configs`.
+        #[serde(rename = "config")]
+        kafka_config_name: String,
+    },
+}
+
+impl From<String> for TopicAssignment {
+    fn from(topic_name: String) -> TopicAssignment {
+        TopicAssignment::Primary(topic_name)
+    }
+}
+
+impl TopicAssignment {
+    /// Get the topic name from this topic assignment.
+    fn topic_name(&self) -> &str {
+        match *self {
+            TopicAssignment::Primary(ref s) => s.as_str(),
+            TopicAssignment::Secondary { ref topic_name, .. } => topic_name.as_str(),
+        }
+    }
+
+    /// Get the name of the kafka config to use. `None` means default configuration.
+    fn kafka_config_name(&self) -> Option<&str> {
+        match *self {
+            TopicAssignment::Primary(_) => None,
+            TopicAssignment::Secondary {
+                ref kafka_config_name,
+                ..
+            } => Some(kafka_config_name.as_str()),
+        }
+    }
+}
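To make the two accepted shapes concrete, here is a minimal runnable sketch (not part of the PR) of how serde's `untagged` attribute resolves them. It assumes `serde` and `serde_yaml` as dependencies and mirrors only the pieces of `TopicAssignment` shown above.

```rust
use serde::Deserialize;

// Reduced mirror of the PR's `TopicAssignment`. With `untagged`, serde
// tries each variant in order until one matches the input's shape.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum TopicAssignment {
    Primary(String),
    Secondary {
        #[serde(rename = "name")]
        topic_name: String,
        #[serde(rename = "config")]
        kafka_config_name: String,
    },
}

fn main() {
    // A bare string selects the default `kafka_config` cluster.
    let primary: TopicAssignment = serde_yaml::from_str("ingest-events").unwrap();

    // A mapping routes the topic to a cluster from `secondary_kafka_configs`.
    let secondary: TopicAssignment =
        serde_yaml::from_str("{name: ingest-metrics, config: mycustomcluster}").unwrap();

    println!("{:?}\n{:?}", primary, secondary);
}
```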
@@ -784,9 +854,30 @@ pub struct Processing {
     pub max_session_secs_in_past: u32,
     /// Kafka producer configurations.
     pub kafka_config: Vec<KafkaConfigParam>,
+    /// Additional kafka producer configurations.
+    ///
+    /// The `kafka_config` is the default producer configuration used for all topics. A secondary
+    /// kafka config can be referenced in `topics:` like this:
+    ///
+    /// ```yaml
+    /// secondary_kafka_configs:
+    ///   mycustomcluster:
+    ///     - name: 'bootstrap.servers'
+    ///       value: 'sentry_kafka_metrics:9093'
+    ///
+    /// topics:
+    ///   transactions: ingest-transactions
+    ///   metrics:
+    ///     name: ingest-metrics
+    ///     config: mycustomcluster
+    /// ```
+    ///
+    /// Then metrics will be produced to an entirely different Kafka cluster.
+    #[serde(default)]
+    pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,

> **Member:** Kudos for the example!

     /// Kafka topic names.
     #[serde(default)]
-    pub topics: TopicNames,
+    pub topics: TopicAssignments,
     /// Redis hosts to connect to for storing state for rate limits.
     #[serde(default)]
     pub redis: Option<RedisConfig>,
@@ -811,7 +902,8 @@ impl Default for Processing {
             max_secs_in_past: default_max_secs_in_past(),
             max_session_secs_in_past: default_max_session_secs_in_past(),
             kafka_config: Vec::new(),
-            topics: TopicNames::default(),
+            secondary_kafka_configs: BTreeMap::new(),
+            topics: TopicAssignments::default(),
             redis: None,
             attachment_chunk_size: default_chunk_size(),
             projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
@@ -1592,21 +1684,28 @@ impl Config {
         self.values.processing.max_session_secs_in_past.into()
     }

-    /// The list of Kafka configuration parameters.
-    pub fn kafka_config(&self) -> &[KafkaConfigParam] {
-        self.values.processing.kafka_config.as_slice()
-    }
-
-    /// Returns the name of the specified Kafka topic.
+    /// Topic name and list of Kafka configuration parameters for a given topic.
     pub fn kafka_topic_name(&self, topic: KafkaTopic) -> &str {
-        let topics = &self.values.processing.topics;
-        match topic {
-            KafkaTopic::Attachments => topics.attachments.as_str(),
-            KafkaTopic::Events => topics.events.as_str(),
-            KafkaTopic::Transactions => topics.transactions.as_str(),
-            KafkaTopic::Outcomes => topics.outcomes.as_str(),
-            KafkaTopic::Sessions => topics.sessions.as_str(),
-            KafkaTopic::Metrics => topics.metrics.as_str(),
-        }
+        self.values.processing.topics.get(topic).topic_name()
     }

+    /// Topic name and list of Kafka configuration parameters for a given topic.
+    pub fn kafka_config(
+        &self,
+        topic: KafkaTopic,
+    ) -> Result<(Option<&str>, &[KafkaConfigParam]), ConfigErrorKind> {
+        if let Some(config_name) = self.values.processing.topics.get(topic).kafka_config_name() {
+            Ok((
+                Some(config_name),
+                self.values
+                    .processing
+                    .secondary_kafka_configs
+                    .get(config_name)
+                    .ok_or(ConfigErrorKind::UnknownKafkaConfigName)?
+                    .as_slice(),
+            ))
+        } else {
+            Ok((None, self.values.processing.kafka_config.as_slice()))
+        }
+    }

> **Member:** This does not return the topic name but rather the configuration name.
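For orientation, a hypothetical caller-side sketch (not code from the PR) combining the two accessors; it assumes `Config`, `KafkaTopic`, `KafkaConfigParam`, and `ConfigErrorKind` are exported from `relay_config` as shown in the diff.

```rust
use relay_config::{Config, ConfigErrorKind, KafkaTopic};

// Resolve where data for a topic goes: the topic name plus the producer
// parameters of either the default or a secondary cluster.
fn describe_route(config: &Config, topic: KafkaTopic) -> Result<String, ConfigErrorKind> {
    let topic_name = config.kafka_topic_name(topic);

    // `None` selects the default `kafka_config`; `Some(name)` selects an
    // entry from `secondary_kafka_configs`. A name with no matching entry
    // surfaces as `ConfigErrorKind::UnknownKafkaConfigName`.
    let (config_name, params) = config.kafka_config(topic)?;

    Ok(format!(
        "{} -> cluster {} ({} producer params)",
        topic_name,
        config_name.unwrap_or("<default>"),
        params.len()
    ))
}
```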
relay-server/src/actors/outcome.rs (6 changes: 5 additions & 1 deletion)

@@ -413,7 +413,11 @@ mod processing {
         pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
             let (future_producer, http_producer) = if config.processing_enabled() {
                 let mut client_config = ClientConfig::new();
-                for config_p in config.kafka_config() {
+                for config_p in config
+                    .kafka_config(KafkaTopic::Outcomes)
+                    .context(ServerErrorKind::KafkaError)?
+                    .1
+                {
                     client_config.set(config_p.name.as_str(), config_p.value.as_str());
                 }
                 let future_producer = client_config

> **Member:** nit: Please assign this to a variable rather than wrapping the `for` header.
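A sketch of the refactor this nit asks for, assuming the same surrounding `create` body and error plumbing as in the diff; this is not the merged code:

```rust
// Hypothetical rewrite: destructure the (config name, params) tuple into a
// variable first, so the `for` header stays flat.
let (_config_name, kafka_params) = config
    .kafka_config(KafkaTopic::Outcomes)
    .context(ServerErrorKind::KafkaError)?;
for config_p in kafka_params {
    client_config.set(config_p.name.as_str(), config_p.value.as_str());
}
```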
relay-server/src/actors/store.rs (92 changes: 76 additions & 16 deletions)

@@ -50,32 +50,89 @@ pub enum StoreError {
     NoEventId,
 }

+type Producer = Arc<ThreadedProducer>;
+
+struct Producers {
+    events: Producer,
+    attachments: Producer,
+    transactions: Producer,
+    sessions: Producer,
+    metrics: Producer,
+}
+
+impl Producers {
+    /// Get a producer by KafkaTopic value
+    pub fn get(&self, kafka_topic: KafkaTopic) -> Option<&Producer> {
+        match kafka_topic {
+            KafkaTopic::Attachments => Some(&self.attachments),
+            KafkaTopic::Events => Some(&self.events),
+            KafkaTopic::Transactions => Some(&self.transactions),
+            KafkaTopic::Outcomes => {
+                // should be unreachable
+                relay_log::error!("attempted to send data to outcomes topic from store forwarder. there is another actor for that.");
+                None
+            }
+            KafkaTopic::Sessions => Some(&self.sessions),
+            KafkaTopic::Metrics => Some(&self.metrics),
+        }
+    }
+}
+
 /// Actor for publishing events to Sentry through kafka topics.
 pub struct StoreForwarder {
     config: Arc<Config>,
-    producer: Arc<ThreadedProducer>,
+    producers: Producers,
 }

 fn make_distinct_id(s: &str) -> Uuid {
     s.parse()
         .unwrap_or_else(|_| Uuid::new_v5(&NAMESPACE_DID, s.as_bytes()))
 }

-impl StoreForwarder {
-    pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
-        let mut client_config = ClientConfig::new();
-        for config_p in config.kafka_config() {
-            client_config.set(config_p.name.as_str(), config_p.value.as_str());
-        }
+/// Temporary map used to deduplicate kafka producers
+type ReusedProducersMap<'a> = BTreeMap<Option<&'a str>, Producer>;
+
+fn make_producer<'a>(
+    config: &'a Config,
+    reused_producers: &mut ReusedProducersMap<'a>,
+    kafka_topic: KafkaTopic,
+) -> Result<Producer, ServerError> {
+    let (config_name, kafka_config) = config
+        .kafka_config(kafka_topic)
+        .context(ServerErrorKind::KafkaError)?;
+
+    if let Some(producer) = reused_producers.get(&config_name) {
+        return Ok(Arc::clone(producer));
+    }

-        let producer = client_config
+    let mut client_config = ClientConfig::new();
+
+    for config_p in kafka_config {
+        client_config.set(config_p.name.as_str(), config_p.value.as_str());
+    }
+
+    let producer = Arc::new(
+        client_config
             .create_with_context(CaptureErrorContext)
-            .context(ServerErrorKind::KafkaError)?;
+            .context(ServerErrorKind::KafkaError)?,
+    );

-        Ok(Self {
-            config,
-            producer: Arc::new(producer),
-        })
+    reused_producers.insert(config_name, Arc::clone(&producer));
+    Ok(producer)
+}
+
+impl StoreForwarder {
+    pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
+        let mut reused_producers = BTreeMap::new();
+        let producers = Producers {
+            attachments: make_producer(&*config, &mut reused_producers, KafkaTopic::Attachments)?,
+            events: make_producer(&*config, &mut reused_producers, KafkaTopic::Events)?,
+            transactions: make_producer(&*config, &mut reused_producers, KafkaTopic::Transactions)?,
+            sessions: make_producer(&*config, &mut reused_producers, KafkaTopic::Sessions)?,
+            metrics: make_producer(&*config, &mut reused_producers, KafkaTopic::Metrics)?,
+        };
+
+        Ok(Self { config, producers })
     }

     fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> {

@@ -86,10 +143,13 @@ impl StoreForwarder {
             .key(&key)
             .payload(&serialized);

-        match self.producer.send(record) {
-            Ok(_) => Ok(()),
-            Err((kafka_error, _message)) => Err(StoreError::SendFailed(kafka_error)),
+        if let Some(producer) = self.producers.get(topic) {
+            producer
+                .send(record)
+                .map_err(|(kafka_error, _message)| StoreError::SendFailed(kafka_error))?;
         }
+
+        Ok(())
     }

     fn produce_attachment_chunks(
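The deduplication contract of `make_producer` can be demonstrated in isolation. Below is a self-contained sketch (not from the PR) with `String` standing in for `ThreadedProducer`: topics that resolve to the same config name share one `Arc`, mirroring the `ReusedProducersMap` above.

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Return the cached producer for a config name, creating it on first use.
fn get_or_create<'a>(
    map: &mut BTreeMap<Option<&'a str>, Arc<String>>,
    config_name: Option<&'a str>,
) -> Arc<String> {
    if let Some(producer) = map.get(&config_name) {
        return Arc::clone(producer);
    }
    let producer = Arc::new(format!("producer for {:?}", config_name));
    map.insert(config_name, Arc::clone(&producer));
    producer
}

fn main() {
    let mut map = BTreeMap::new();
    let a = get_or_create(&mut map, None); // default cluster
    let b = get_or_create(&mut map, None); // reused, no second client
    let c = get_or_create(&mut map, Some("mycustomcluster")); // new client

    assert!(Arc::ptr_eq(&a, &b));
    assert!(!Arc::ptr_eq(&a, &c));
    println!("created {} distinct producers", map.len());
}
```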
tests/integration/fixtures/processing.py (5 changes: 0 additions & 5 deletions)

@@ -74,11 +74,6 @@ def relay_with_processing(relay, mini_sentry, processing_config):

     def inner(options=None):
         options = processing_config(options)
-
-        kafka_config = {}
-        for elm in options["processing"]["kafka_config"]:
-            kafka_config[elm["name"]] = elm["value"]
-
         return relay(mini_sentry, options=options)

     return inner