Merge branch 'master' into release/try-guard-decompression
* master:
  meta(vscode): Update python extension settings (#1109)
  ci: Bump sentry-integration python to 3.8 (#1110)
  feat(processing): Support multiple kafka clusters (#1101)
jan-auer committed Oct 28, 2021
2 parents 662514d + bc38521 commit d837444
Showing 9 changed files with 277 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
@@ -230,7 +230,7 @@ jobs:
- name: Run tests
run: pytest tests -n auto -v
env:
RELAY_VERSION_CHAIN: '20.6.0,latest'
RELAY_VERSION_CHAIN: "20.6.0,latest"

sentry-relay-integration-tests:
name: Sentry-Relay Integration Tests
@@ -267,7 +267,7 @@ jobs:
with:
workdir: sentry
cache-files-hash: ${{ hashFiles('sentry/requirements**.txt') }}
python-version: 3.6
python-version: 3.8
snuba: true
kafka: true

2 changes: 2 additions & 0 deletions .vscode/extensions.json
@@ -6,6 +6,8 @@
"bungcip.better-toml",
// Rust language server
"matklad.rust-analyzer",
// Python including Pylance
"ms-python.python",
// Crates.io dependency versions
"serayuzgur.crates",
// Debugger support for Rust and native
4 changes: 2 additions & 2 deletions .vscode/settings.json
@@ -10,8 +10,8 @@
"files.insertFinalNewline": true,

// Python configuration
"python.pythonPath": ".venv/bin/python",
"python.linting.enabled": false,
"python.defaultInterpreterPath": ".venv/bin/python",
"python.linting.enabled": true,
"python.formatting.provider": "black",

// Language-specific overrides
4 changes: 2 additions & 2 deletions CHANGELOG.md
@@ -2,10 +2,10 @@

## Unreleased

**Features**
**Features**:

- Add bucket width to bucket protocol. ([#1103](https://github.com/getsentry/relay/pull/1103))

- Support multiple kafka cluster configurations ([#1101](https://github.com/getsentry/relay/pull/1101))

## 21.10.0

159 changes: 129 additions & 30 deletions relay-config/src/config.rs
@@ -136,6 +136,9 @@ pub enum ConfigErrorKind {
/// compiled without the processing feature.
#[fail(display = "was not compiled with processing, cannot enable processing")]
ProcessingNotAvailable,
/// The user referenced a kafka config name that does not exist.
#[fail(display = "unknown kafka config name")]
UnknownKafkaConfigName,
}

enum ConfigFormat {
@@ -703,30 +706,97 @@ pub enum KafkaTopic {
/// Configuration for topics.
#[derive(Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct TopicNames {
pub struct TopicAssignments {
/// Simple events topic name.
pub events: String,
pub events: TopicAssignment,
/// Events with attachments topic name.
pub attachments: String,
pub attachments: TopicAssignment,
/// Transaction events topic name.
pub transactions: String,
pub transactions: TopicAssignment,
/// Event outcomes topic name.
pub outcomes: String,
pub outcomes: TopicAssignment,
/// Session health topic name.
pub sessions: String,
pub sessions: TopicAssignment,
/// Metrics topic name.
pub metrics: String,
pub metrics: TopicAssignment,
}

impl Default for TopicNames {
impl TopicAssignments {
/// Get a topic assignment by KafkaTopic value
pub fn get(&self, kafka_topic: KafkaTopic) -> &TopicAssignment {
match kafka_topic {
KafkaTopic::Attachments => &self.attachments,
KafkaTopic::Events => &self.events,
KafkaTopic::Transactions => &self.transactions,
KafkaTopic::Outcomes => &self.outcomes,
KafkaTopic::Sessions => &self.sessions,
KafkaTopic::Metrics => &self.metrics,
}
}
}

impl Default for TopicAssignments {
fn default() -> Self {
Self {
events: "ingest-events".to_owned(),
attachments: "ingest-attachments".to_owned(),
transactions: "ingest-transactions".to_owned(),
outcomes: "outcomes".to_owned(),
sessions: "ingest-sessions".to_owned(),
metrics: "ingest-metrics".to_owned(),
events: "ingest-events".to_owned().into(),
attachments: "ingest-attachments".to_owned().into(),
transactions: "ingest-transactions".to_owned().into(),
outcomes: "outcomes".to_owned().into(),
sessions: "ingest-sessions".to_owned().into(),
metrics: "ingest-metrics".to_owned().into(),
}
}
}

/// Configuration for a "logical" topic/datasink that Relay should forward data into.
///
/// Can be either a string containing the kafka topic name to produce into (using the default
/// `kafka_config`), or an object containing keys `topic_name` and `kafka_config_name` for using a
/// custom kafka cluster.
///
/// See documentation for `secondary_kafka_configs` for more information.
#[derive(Serialize, Deserialize, Debug)]
#[serde(untagged)]
pub enum TopicAssignment {
/// String containing the kafka topic name. In this case the default kafka cluster configured
/// in `kafka_config` will be used.
Primary(String),
/// Object containing topic name and string identifier of one of the clusters configured in
/// `secondary_kafka_configs`. In this case that custom Kafka config will be used to produce
/// data to the given topic name.
Secondary {
/// The topic name to use.
#[serde(rename = "name")]
topic_name: String,
/// An identifier referencing one of the kafka configurations in `secondary_kafka_configs`.
#[serde(rename = "config")]
kafka_config_name: String,
},
}

impl From<String> for TopicAssignment {
fn from(topic_name: String) -> TopicAssignment {
TopicAssignment::Primary(topic_name)
}
}

impl TopicAssignment {
/// Get the topic name from this topic assignment.
fn topic_name(&self) -> &str {
match *self {
TopicAssignment::Primary(ref s) => s.as_str(),
TopicAssignment::Secondary { ref topic_name, .. } => topic_name.as_str(),
}
}

/// Get the name of the kafka config to use. `None` means default configuration.
fn kafka_config_name(&self) -> Option<&str> {
match *self {
TopicAssignment::Primary(_) => None,
TopicAssignment::Secondary {
ref kafka_config_name,
..
} => Some(kafka_config_name.as_str()),
}
}
}
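
Because `TopicAssignment` is an untagged serde enum, a topic entry in the YAML config can be written either as a bare string or as a `name`/`config` mapping. Below is a minimal standalone sketch of that behavior, re-declaring a stripped-down copy of the enum (the real type lives in relay-config) and assuming the `serde` crate with the derive feature plus `serde_yaml` are available:

```rust
use serde::Deserialize;

// Stripped-down copy of the enum added in this diff, so the example compiles
// on its own. The real `TopicAssignment` lives in relay-config/src/config.rs.
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum TopicAssignment {
    // Bare string: topic on the default `kafka_config` cluster.
    Primary(String),
    // Mapping: topic on a cluster from `secondary_kafka_configs`.
    Secondary {
        #[serde(rename = "name")]
        topic_name: String,
        #[serde(rename = "config")]
        kafka_config_name: String,
    },
}

fn main() -> Result<(), serde_yaml::Error> {
    // "transactions: ingest-transactions" style entry.
    let primary: TopicAssignment = serde_yaml::from_str("ingest-transactions")?;

    // "metrics: {name: ..., config: ...}" style entry.
    let secondary: TopicAssignment =
        serde_yaml::from_str("name: ingest-metrics\nconfig: mycustomcluster")?;

    println!("{:?}", primary);
    println!("{:?}", secondary);
    Ok(())
}
```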
@@ -784,9 +854,30 @@ pub struct Processing {
pub max_session_secs_in_past: u32,
/// Kafka producer configurations.
pub kafka_config: Vec<KafkaConfigParam>,
/// Additional kafka producer configurations.
///
/// The `kafka_config` is the default producer configuration used for all topics. A secondary
/// kafka config can be referenced in `topics:` like this:
///
/// ```yaml
/// secondary_kafka_configs:
/// mycustomcluster:
/// - name: 'bootstrap.servers'
/// value: 'sentry_kafka_metrics:9093'
///
/// topics:
/// transactions: ingest-transactions
/// metrics:
/// name: ingest-metrics
/// config: mycustomcluster
/// ```
///
/// Then metrics will be produced to an entirely different Kafka cluster.
#[serde(default)]
pub secondary_kafka_configs: BTreeMap<String, Vec<KafkaConfigParam>>,
/// Kafka topic names.
#[serde(default)]
pub topics: TopicNames,
pub topics: TopicAssignments,
/// Redis hosts to connect to for storing state for rate limits.
#[serde(default)]
pub redis: Option<RedisConfig>,
@@ -811,7 +902,8 @@ impl Default for Processing {
max_secs_in_past: default_max_secs_in_past(),
max_session_secs_in_past: default_max_session_secs_in_past(),
kafka_config: Vec::new(),
topics: TopicNames::default(),
secondary_kafka_configs: BTreeMap::new(),
topics: TopicAssignments::default(),
redis: None,
attachment_chunk_size: default_chunk_size(),
projectconfig_cache_prefix: default_projectconfig_cache_prefix(),
@@ -1592,21 +1684,28 @@ impl Config {
self.values.processing.max_session_secs_in_past.into()
}

/// The list of Kafka configuration parameters.
pub fn kafka_config(&self) -> &[KafkaConfigParam] {
self.values.processing.kafka_config.as_slice()
}

/// Returns the name of the specified Kafka topic.
/// Topic name and list of Kafka configuration parameters for a given topic.
pub fn kafka_topic_name(&self, topic: KafkaTopic) -> &str {
let topics = &self.values.processing.topics;
match topic {
KafkaTopic::Attachments => topics.attachments.as_str(),
KafkaTopic::Events => topics.events.as_str(),
KafkaTopic::Transactions => topics.transactions.as_str(),
KafkaTopic::Outcomes => topics.outcomes.as_str(),
KafkaTopic::Sessions => topics.sessions.as_str(),
KafkaTopic::Metrics => topics.metrics.as_str(),
self.values.processing.topics.get(topic).topic_name()
}

/// Topic name and list of Kafka configuration parameters for a given topic.
pub fn kafka_config(
&self,
topic: KafkaTopic,
) -> Result<(Option<&str>, &[KafkaConfigParam]), ConfigErrorKind> {
if let Some(config_name) = self.values.processing.topics.get(topic).kafka_config_name() {
Ok((
Some(config_name),
self.values
.processing
.secondary_kafka_configs
.get(config_name)
.ok_or(ConfigErrorKind::UnknownKafkaConfigName)?
.as_slice(),
))
} else {
Ok((None, self.values.processing.kafka_config.as_slice()))
}
}

6 changes: 5 additions & 1 deletion relay-server/src/actors/outcome.rs
@@ -413,7 +413,11 @@ mod processing {
pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
let (future_producer, http_producer) = if config.processing_enabled() {
let mut client_config = ClientConfig::new();
for config_p in config.kafka_config() {
for config_p in config
.kafka_config(KafkaTopic::Outcomes)
.context(ServerErrorKind::KafkaError)?
.1
{
client_config.set(config_p.name.as_str(), config_p.value.as_str());
}
let future_producer = client_config
92 changes: 76 additions & 16 deletions relay-server/src/actors/store.rs
@@ -50,32 +50,89 @@ pub enum StoreError {
NoEventId,
}

type Producer = Arc<ThreadedProducer>;

struct Producers {
events: Producer,
attachments: Producer,
transactions: Producer,
sessions: Producer,
metrics: Producer,
}

impl Producers {
/// Get a producer by KafkaTopic value
pub fn get(&self, kafka_topic: KafkaTopic) -> Option<&Producer> {
match kafka_topic {
KafkaTopic::Attachments => Some(&self.attachments),
KafkaTopic::Events => Some(&self.events),
KafkaTopic::Transactions => Some(&self.transactions),
KafkaTopic::Outcomes => {
// should be unreachable
relay_log::error!("attempted to send data to outcomes topic from store forwarder. there is another actor for that.");
None
}
KafkaTopic::Sessions => Some(&self.sessions),
KafkaTopic::Metrics => Some(&self.metrics),
}
}
}

/// Actor for publishing events to Sentry through kafka topics.
pub struct StoreForwarder {
config: Arc<Config>,
producer: Arc<ThreadedProducer>,
producers: Producers,
}

fn make_distinct_id(s: &str) -> Uuid {
s.parse()
.unwrap_or_else(|_| Uuid::new_v5(&NAMESPACE_DID, s.as_bytes()))
}

impl StoreForwarder {
pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
let mut client_config = ClientConfig::new();
for config_p in config.kafka_config() {
client_config.set(config_p.name.as_str(), config_p.value.as_str());
}
/// Temporary map used to deduplicate kafka producers
type ReusedProducersMap<'a> = BTreeMap<Option<&'a str>, Producer>;

fn make_producer<'a>(
config: &'a Config,
reused_producers: &mut ReusedProducersMap<'a>,
kafka_topic: KafkaTopic,
) -> Result<Producer, ServerError> {
let (config_name, kafka_config) = config
.kafka_config(kafka_topic)
.context(ServerErrorKind::KafkaError)?;

if let Some(producer) = reused_producers.get(&config_name) {
return Ok(Arc::clone(producer));
}

let producer = client_config
let mut client_config = ClientConfig::new();

for config_p in kafka_config {
client_config.set(config_p.name.as_str(), config_p.value.as_str());
}

let producer = Arc::new(
client_config
.create_with_context(CaptureErrorContext)
.context(ServerErrorKind::KafkaError)?;
.context(ServerErrorKind::KafkaError)?,
);

Ok(Self {
config,
producer: Arc::new(producer),
})
reused_producers.insert(config_name, Arc::clone(&producer));
Ok(producer)
}

impl StoreForwarder {
pub fn create(config: Arc<Config>) -> Result<Self, ServerError> {
let mut reused_producers = BTreeMap::new();
let producers = Producers {
attachments: make_producer(&*config, &mut reused_producers, KafkaTopic::Attachments)?,
events: make_producer(&*config, &mut reused_producers, KafkaTopic::Events)?,
transactions: make_producer(&*config, &mut reused_producers, KafkaTopic::Transactions)?,
sessions: make_producer(&*config, &mut reused_producers, KafkaTopic::Sessions)?,
metrics: make_producer(&*config, &mut reused_producers, KafkaTopic::Metrics)?,
};

Ok(Self { config, producers })
}

fn produce(&self, topic: KafkaTopic, message: KafkaMessage) -> Result<(), StoreError> {
@@ -86,10 +143,13 @@ impl StoreForwarder {
.key(&key)
.payload(&serialized);

match self.producer.send(record) {
Ok(_) => Ok(()),
Err((kafka_error, _message)) => Err(StoreError::SendFailed(kafka_error)),
if let Some(producer) = self.producers.get(topic) {
producer
.send(record)
.map_err(|(kafka_error, _message)| StoreError::SendFailed(kafka_error))?;
}

Ok(())
}

fn produce_attachment_chunks(
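
The `make_producer` helper above deduplicates producers by keying a temporary map on the Kafka config name, so every topic assigned to the same cluster (including the default `None` config) shares one producer. The following is a self-contained sketch of that reuse pattern, with a placeholder struct standing in for rdkafka's `ThreadedProducer`:

```rust
use std::collections::BTreeMap;
use std::sync::Arc;

// Placeholder for rdkafka's ThreadedProducer used in the real code.
#[derive(Debug)]
struct FakeProducer {
    config_name: Option<String>,
}

type Producer = Arc<FakeProducer>;

/// Returns the producer for `config_name`, creating it only on first use.
fn producer_for<'a>(
    cache: &mut BTreeMap<Option<&'a str>, Producer>,
    config_name: Option<&'a str>,
) -> Producer {
    if let Some(existing) = cache.get(&config_name) {
        return Arc::clone(existing);
    }
    let producer = Arc::new(FakeProducer {
        config_name: config_name.map(str::to_owned),
    });
    cache.insert(config_name, Arc::clone(&producer));
    producer
}

fn main() {
    let mut cache = BTreeMap::new();

    // Events and transactions use the default config (None) and share a producer.
    let events = producer_for(&mut cache, None);
    let transactions = producer_for(&mut cache, None);

    // Metrics point at a hypothetical secondary cluster and get their own producer.
    let metrics = producer_for(&mut cache, Some("mycustomcluster"));

    assert!(Arc::ptr_eq(&events, &transactions));
    assert!(!Arc::ptr_eq(&events, &metrics));
    println!("created {} producer(s)", cache.len()); // prints 2
}
```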
(Diffs for the remaining two changed files were not loaded on this page.)
