Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spooler): Add ability to partition the buffer into multiple ones #4291

Merged
merged 28 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))
- Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283))
- Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291))

**Internal**:

Expand Down
14 changes: 14 additions & 0 deletions relay-config/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -911,6 +911,11 @@ fn spool_max_backpressure_memory_percent() -> f32 {
0.9
}

/// Default number of partitions for the buffer.
fn spool_envelopes_partitions() -> u32 {
1
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
Expand Down Expand Up @@ -966,6 +971,9 @@ pub struct EnvelopeSpool {
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
max_backpressure_memory_percent: f32,
/// Number of partitions of the buffer.
#[serde(default = "spool_envelopes_partitions")]
partitions: u32,
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
Expand Down Expand Up @@ -1003,6 +1011,7 @@ impl Default for EnvelopeSpool {
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
partitions: spool_envelopes_partitions(),
version: EnvelopeSpoolVersion::default(),
}
}
Expand Down Expand Up @@ -2210,6 +2219,11 @@ impl Config {
self.values.spool.envelopes.max_backpressure_memory_percent
}

/// Returns the number of partitions for the buffer.
pub fn spool_partitions(&self) -> u32 {
self.values.spool.envelopes.partitions
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
Expand Down
4 changes: 2 additions & 2 deletions relay-server/benches/benches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker.clone())
.await
.unwrap();
for envelope in envelopes.into_iter() {
Expand Down Expand Up @@ -292,7 +292,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker.clone())
.await
.unwrap();
let n = envelopes.len();
Expand Down
5 changes: 3 additions & 2 deletions relay-server/src/endpoints/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use serde::Deserialize;

use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::service::ServiceState;
use crate::services::buffer::EnvelopeBuffer;
use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::legacy::ValidateEnvelope;
Expand Down Expand Up @@ -293,7 +293,8 @@ fn queue_envelope(
);
envelope.scope(scoping);

match state.envelope_buffer() {
let project_key_pair = ProjectKeyPair::from_envelope(envelope.envelope());
match state.envelope_buffer(project_key_pair) {
Some(buffer) => {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
Expand Down
58 changes: 39 additions & 19 deletions relay-server/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::buffer::{
self, EnvelopeBufferService, ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer,
ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
Expand Down Expand Up @@ -63,7 +66,7 @@ pub struct Registry {
pub global_config: Addr<GlobalConfigManager>,
pub legacy_project_cache: Addr<legacy::ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: Option<ObservableEnvelopeBuffer>,
pub partitioned_buffer: PartitionedEnvelopeBuffer,

pub project_cache_handle: ProjectCacheHandle,
}
Expand Down Expand Up @@ -260,22 +263,33 @@ impl ServiceState {
);

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
let envelope_buffer = EnvelopeBufferService::new(
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
buffer::Services {
envelopes_tx,
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
)
.map(|b| b.start_in(&mut runner));

let mut envelope_buffers = Vec::with_capacity(config.spool_partitions() as usize);
for partition_id in 0..config.spool_partitions() {
let envelope_buffer = EnvelopeBufferService::new(
partition_id,
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
buffer::Services {
envelopes_tx: envelopes_tx.clone(),
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
)
.map(|b| b.start_in(&mut runner));

if let Some(envelope_buffer) = envelope_buffer {
envelope_buffers.push(envelope_buffer);
}
}
let partitioned_buffer =
PartitionedEnvelopeBuffer::new(envelope_buffers, config.spool_partitions());
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved

// Keep all the services in one context.
let project_cache_services = legacy::Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
partitioned_buffer: partitioned_buffer.clone(),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I would still call this field envelope_buffer, the use does not really care whether it partitions internally or not.

aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
Expand All @@ -300,7 +314,7 @@ impl ServiceState {
MemoryChecker::new(memory_stat.clone(), config.clone()),
aggregator_handle,
upstream_relay.clone(),
envelope_buffer.clone(),
partitioned_buffer.clone(),
));

runner.start(RelayStats::new(
Expand All @@ -326,7 +340,7 @@ impl ServiceState {
legacy_project_cache,
project_cache_handle,
upstream_relay,
envelope_buffer,
partitioned_buffer,
};

let state = StateInner {
Expand Down Expand Up @@ -356,8 +370,14 @@ impl ServiceState {
}

/// Returns the V2 envelope buffer, if present.
pub fn envelope_buffer(&self) -> Option<&ObservableEnvelopeBuffer> {
self.inner.registry.envelope_buffer.as_ref()
pub fn envelope_buffer(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would change the interface of PartitionedEnvelopeBuffer to behave roughly like ObservableEnvelopeBuffer behaved before. I.e. give it a send(..) and a has_capacity() method, but not expose internals about partitioning.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this when designing the API, but it conflicted with how we access the buffer in the queue_envelope. I wanted to keep that logic like it is right now.

&self,
project_key_pair: ProjectKeyPair,
) -> Option<&ObservableEnvelopeBuffer> {
self.inner
.registry
.partitioned_buffer
.buffer(project_key_pair)
}

/// Returns the address of the [`legacy::ProjectCache`] service.
Expand Down
11 changes: 6 additions & 5 deletions relay-server/src/services/buffer/envelope_buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,13 @@ impl PolymorphicEnvelopeBuffer {
/// Creates either a memory-based or a disk-based envelope buffer,
/// depending on the given configuration.
pub async fn from_config(
partition_id: u32,
config: &Config,
memory_checker: MemoryChecker,
) -> Result<Self, EnvelopeBufferError> {
let buffer = if config.spool_envelopes_path().is_some() {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing sqlite envelope buffer");
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(config).await?;
let buffer = EnvelopeBuffer::<SqliteStackProvider>::new(partition_id, config).await?;
Self::Sqlite(buffer)
} else {
relay_log::trace!("PolymorphicEnvelopeBuffer: initializing memory envelope buffer");
Expand Down Expand Up @@ -232,11 +233,11 @@ impl EnvelopeBuffer<MemoryStackProvider> {
#[allow(dead_code)]
impl EnvelopeBuffer<SqliteStackProvider> {
/// Creates an empty sqlite-based buffer.
pub async fn new(config: &Config) -> Result<Self, EnvelopeBufferError> {
pub async fn new(partition_id: u32, config: &Config) -> Result<Self, EnvelopeBufferError> {
Ok(Self {
stacks_by_project: Default::default(),
priority_queue: Default::default(),
stack_provider: SqliteStackProvider::new(config).await?,
stack_provider: SqliteStackProvider::new(partition_id, config).await?,
total_count: Arc::new(AtomicI64::new(0)),
total_count_initialized: false,
})
Expand Down Expand Up @@ -1012,8 +1013,8 @@ mod tests {
.into_string()
.unwrap();
let config = mock_config(&path);
let mut store = SqliteEnvelopeStore::prepare(&config).await.unwrap();
let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(&config)
let mut store = SqliteEnvelopeStore::prepare(0, &config).await.unwrap();
let mut buffer = EnvelopeBuffer::<SqliteStackProvider>::new(0, &config)
.await
.unwrap();

Expand Down
30 changes: 28 additions & 2 deletions relay-server/src/services/buffer/envelope_store/sqlite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ pub enum SqliteEnvelopeStoreError {
#[error("no file path for the spool was provided")]
NoFilePath,

#[error("no file name for the spool was provided")]
NoFileName,

#[error("failed to migrate the database: {0}")]
MigrationError(MigrateError),

Expand Down Expand Up @@ -302,12 +305,35 @@ impl SqliteEnvelopeStore {

/// Prepares the [`SqliteEnvelopeStore`] by running all the necessary migrations and preparing
/// the folders where data will be stored.
pub async fn prepare(config: &Config) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
pub async fn prepare(
partition_id: u32,
config: &Config,
) -> Result<SqliteEnvelopeStore, SqliteEnvelopeStoreError> {
// If no path is provided, we can't do disk spooling.
let Some(path) = config.spool_envelopes_path() else {
let Some(mut path) = config.spool_envelopes_path() else {
return Err(SqliteEnvelopeStoreError::NoFilePath);
};

// Modify the filename to include the partition_id
let file_name = path
.file_name()
.and_then(|f| f.to_str())
.ok_or(SqliteEnvelopeStoreError::NoFileName)?;
if let Some(extension) = path.extension().and_then(|e| e.to_str()) {
let new_file_name = format!(
"{}_{}.{}",
file_name
.strip_suffix(&format!(".{}", extension))
.unwrap_or(file_name),
partition_id,
extension
);
path.set_file_name(new_file_name);
} else {
let new_file_name = format!("{}_{}", file_name, partition_id);
path.set_file_name(new_file_name);
}
iambriccardo marked this conversation as resolved.
Show resolved Hide resolved

relay_log::info!("buffer file {}", path.to_string_lossy());

Self::setup(&path).await?;
Expand Down
Loading
Loading