Skip to content

Commit

Permalink
feat(spooler): Add ability to partition the buffer into multiple ones (
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo authored Nov 26, 2024
1 parent a2f9e11 commit 35e9bff
Show file tree
Hide file tree
Showing 14 changed files with 382 additions and 114 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))
- Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283))
- Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291))

**Internal**:

Expand Down
35 changes: 32 additions & 3 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::io::Write;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::num::NonZeroU8;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
Expand Down Expand Up @@ -911,6 +912,11 @@ fn spool_max_backpressure_memory_percent() -> f32 {
0.9
}

/// Default number of partitions for the buffer.
fn spool_envelopes_partitions() -> NonZeroU8 {
NonZeroU8::new(1).unwrap()
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
Expand Down Expand Up @@ -966,6 +972,9 @@ pub struct EnvelopeSpool {
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
max_backpressure_memory_percent: f32,
/// Number of partitions of the buffer.
#[serde(default = "spool_envelopes_partitions")]
partitions: NonZeroU8,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
Expand Down Expand Up @@ -1003,6 +1012,7 @@ impl Default for EnvelopeSpool {
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
partitions: spool_envelopes_partitions(),
version: EnvelopeSpoolVersion::default(),
}
}
Expand Down Expand Up @@ -2142,13 +2152,27 @@ impl Config {
}

/// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
pub fn spool_envelopes_path(&self) -> Option<PathBuf> {
self.values
///
/// In case a partition with id > 0 is supplied, the filename of the envelopes path will be
/// suffixed with `.{partition_id}`.
pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
let mut path = self
.values
.spool
.envelopes
.path
.as_ref()
.map(|path| path.to_owned())
.map(|path| path.to_owned())?;

if partition_id == 0 {
return Some(path);
}

let file_name = path.file_name().and_then(|f| f.to_str())?;
let new_file_name = format!("{}.{}", file_name, partition_id);
path.set_file_name(new_file_name);

Some(path)
}

/// Maximum number of connections to create to buffer file.
Expand Down Expand Up @@ -2210,6 +2234,11 @@ impl Config {
self.values.spool.envelopes.max_backpressure_memory_percent
}

/// Returns the number of partitions for the buffer.
pub fn spool_partitions(&self) -> NonZeroU8 {
self.values.spool.envelopes.partitions
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
Expand Down
4 changes: 2 additions & 2 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker.clone())
.await
.unwrap();
for envelope in envelopes.into_iter() {
Expand Down Expand Up @@ -292,7 +292,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker.clone())
.await
.unwrap();
let n = envelopes.len();
Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::Deserialize;

use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::service::ServiceState;
use crate::services::buffer::EnvelopeBuffer;
use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::legacy::ValidateEnvelope;
Expand Down Expand Up @@ -293,7 +293,8 @@ fn queue_envelope(
);
envelope.scope(scoping);

match state.envelope_buffer() {
let project_key_pair = ProjectKeyPair::from_envelope(envelope.envelope());
match state.envelope_buffer(project_key_pair) {
Some(buffer) => {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
Expand Down
33 changes: 19 additions & 14 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@ use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::buffer::{
ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
Expand Down Expand Up @@ -63,7 +65,7 @@ pub struct Registry {
pub global_config: Addr<GlobalConfigManager>,
pub legacy_project_cache: Addr<legacy::ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: Option<ObservableEnvelopeBuffer>,
pub envelope_buffer: PartitionedEnvelopeBuffer,

pub project_cache_handle: ProjectCacheHandle,
}
Expand Down Expand Up @@ -260,22 +262,22 @@ impl ServiceState {
);

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
let envelope_buffer = EnvelopeBufferService::new(

let envelope_buffer = PartitionedEnvelopeBuffer::create(
config.spool_partitions(),
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
buffer::Services {
envelopes_tx,
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
)
.map(|b| b.start_in(&mut runner));
envelopes_tx.clone(),
project_cache_handle.clone(),
outcome_aggregator.clone(),
test_store.clone(),
&mut runner,
);

// Keep all the services in one context.
let project_cache_services = legacy::Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
envelope_buffer: envelope_buffer.clone(),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
Expand Down Expand Up @@ -356,8 +358,11 @@ impl ServiceState {
}

/// Returns the V2 envelope buffer, if present.
pub fn envelope_buffer(&self) -> Option<&ObservableEnvelopeBuffer> {
self.inner.registry.envelope_buffer.as_ref()
pub fn envelope_buffer(
&self,
project_key_pair: ProjectKeyPair,
) -> Option<&ObservableEnvelopeBuffer> {
self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

/// Returns the address of the [`legacy::ProjectCache`] service.
Expand Down
Loading

0 comments on commit 35e9bff

Please sign in to comment.