feat(spooler): Add ability to partition the buffer into multiple ones #4291

Merged · 28 commits · Nov 26, 2024 · Changes from 20 commits

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@

- Implement zstd http encoding for Relay to Relay communication. ([#4266](https://github.com/getsentry/relay/pull/4266))
- Support empty branches in Pattern alternations. ([#4283](https://github.com/getsentry/relay/pull/4283))
- Add support for partitioning of the `EnvelopeBufferService`. ([#4291](https://github.com/getsentry/relay/pull/4291))

**Internal**:

35 changes: 32 additions & 3 deletions relay-config/src/config.rs
Expand Up @@ -2,6 +2,7 @@ use std::collections::{BTreeMap, HashMap};
use std::error::Error;
use std::io::Write;
use std::net::{IpAddr, SocketAddr, ToSocketAddrs};
use std::num::NonZeroU8;
use std::path::{Path, PathBuf};
use std::str::FromStr;
use std::time::Duration;
@@ -911,6 +912,11 @@ fn spool_max_backpressure_memory_percent() -> f32 {
0.9
}

/// Default number of partitions for the buffer.
fn spool_envelopes_partitions() -> NonZeroU8 {
NonZeroU8::new(1).unwrap()
}

/// Persistent buffering configuration for incoming envelopes.
#[derive(Debug, Serialize, Deserialize)]
pub struct EnvelopeSpool {
@@ -966,6 +972,9 @@ pub struct EnvelopeSpool {
/// Defaults to 90% (5% less than max memory).
#[serde(default = "spool_max_backpressure_memory_percent")]
max_backpressure_memory_percent: f32,
/// Number of partitions of the buffer.
#[serde(default = "spool_envelopes_partitions")]
partitions: NonZeroU8,
/// Version of the spooler.
#[serde(default)]
version: EnvelopeSpoolVersion,
@@ -1003,6 +1012,7 @@ impl Default for EnvelopeSpool {
disk_usage_refresh_frequency_ms: spool_disk_usage_refresh_frequency_ms(),
max_backpressure_envelopes: spool_max_backpressure_envelopes(),
max_backpressure_memory_percent: spool_max_backpressure_memory_percent(),
partitions: spool_envelopes_partitions(),
version: EnvelopeSpoolVersion::default(),
}
}
@@ -2142,13 +2152,27 @@ impl Config {
}

/// Returns the path of the buffer file if the `cache.persistent_envelope_buffer.path` is configured.
pub fn spool_envelopes_path(&self) -> Option<PathBuf> {
self.values
///
/// If a partition id greater than 0 is supplied, the file name of the envelopes path is
/// suffixed with `.{partition_id}`.
pub fn spool_envelopes_path(&self, partition_id: u8) -> Option<PathBuf> {
let mut path = self
.values
.spool
.envelopes
.path
.as_ref()
.map(|path| path.to_owned())
.map(|path| path.to_owned())?;

if partition_id == 0 {
return Some(path);
}

let file_name = path.file_name().and_then(|f| f.to_str())?;
let new_file_name = format!("{}.{}", file_name, partition_id);
path.set_file_name(new_file_name);

Some(path)
}

/// Maximum number of connections to create to buffer file.
@@ -2210,6 +2234,11 @@ impl Config {
self.values.spool.envelopes.max_backpressure_memory_percent
}

/// Returns the number of partitions for the buffer.
pub fn spool_partitions(&self) -> NonZeroU8 {
self.values.spool.envelopes.partitions
}

/// Returns the maximum size of an event payload in bytes.
pub fn max_event_size(&self) -> usize {
self.values.limits.max_event_size.as_bytes()
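As a worked example of the suffixing rule above, here is a minimal standalone sketch that mirrors the logic of `Config::spool_envelopes_path`. The `partitioned_path` helper and the example path are illustrative only; the real method reads the configured spool path from the config values.

```rust
use std::path::PathBuf;

/// Hypothetical helper mirroring `Config::spool_envelopes_path`: partition 0
/// reuses the configured path as-is, partition N appends `.N` to the file name.
fn partitioned_path(configured: &str, partition_id: u8) -> Option<PathBuf> {
    let mut path = PathBuf::from(configured);
    if partition_id == 0 {
        return Some(path);
    }

    let file_name = path.file_name().and_then(|f| f.to_str())?;
    let new_file_name = format!("{file_name}.{partition_id}");
    path.set_file_name(new_file_name);

    Some(path)
}

fn main() {
    // Example spool path only; the real value comes from the envelope spool config.
    assert_eq!(
        partitioned_path("/var/lib/relay/spool.db", 0),
        Some(PathBuf::from("/var/lib/relay/spool.db"))
    );
    assert_eq!(
        partitioned_path("/var/lib/relay/spool.db", 2),
        Some(PathBuf::from("/var/lib/relay/spool.db.2"))
    );
}
```

Keeping partition 0 on the unsuffixed path means a single-partition setup (the default) continues to use the same spool file as before this change.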
4 changes: 2 additions & 2 deletions relay-server/benches/benches.rs
@@ -260,7 +260,7 @@ fn benchmark_envelope_buffer(c: &mut Criterion) {
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker.clone())
.await
.unwrap();
for envelope in envelopes.into_iter() {
@@ -292,7 +292,7 @@
|envelopes| {
runtime.block_on(async {
let mut buffer =
PolymorphicEnvelopeBuffer::from_config(&config, memory_checker.clone())
PolymorphicEnvelopeBuffer::from_config(0, &config, memory_checker.clone())
.await
.unwrap();
let n = envelopes.len();
5 changes: 3 additions & 2 deletions relay-server/src/endpoints/common.rs
@@ -10,7 +10,7 @@ use serde::Deserialize;

use crate::envelope::{AttachmentType, Envelope, EnvelopeError, Item, ItemType, Items};
use crate::service::ServiceState;
use crate::services::buffer::EnvelopeBuffer;
use crate::services::buffer::{EnvelopeBuffer, ProjectKeyPair};
use crate::services::outcome::{DiscardReason, Outcome};
use crate::services::processor::{BucketSource, MetricData, ProcessMetrics, ProcessingGroup};
use crate::services::projects::cache::legacy::ValidateEnvelope;
@@ -293,7 +293,8 @@ fn queue_envelope(
);
envelope.scope(scoping);

match state.envelope_buffer() {
let project_key_pair = ProjectKeyPair::from_envelope(envelope.envelope());
match state.envelope_buffer(project_key_pair) {
Some(buffer) => {
if !buffer.has_capacity() {
return Err(BadStoreRequest::QueueFailed);
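The buffer lookup in queue_envelope is now keyed by a ProjectKeyPair taken from the envelope. The pair itself is not defined in this diff; the following is a rough sketch of what it plausibly bundles, assuming it pairs the envelope's own project key with the key used for dynamic sampling and falls back to the own key when no sampling key is present. The `key_pair` helper and the field names are assumptions, not Relay's confirmed API.

```rust
/// Stand-in for a Sentry project key (the public key from a DSN).
type ProjectKey = &'static str;

/// Assumed shape of `ProjectKeyPair`: the envelope's own project key plus the
/// project key used for dynamic sampling decisions. Derives `Hash` because the
/// pair is what the partitioned buffer routes on.
#[derive(Hash)]
struct ProjectKeyPair {
    own_key: ProjectKey,
    sampling_key: ProjectKey,
}

/// Sketch of a `from_envelope`-style constructor: if the envelope carries no
/// sampling key, the own key is used for both halves (assumption).
fn key_pair(own_key: ProjectKey, sampling_key: Option<ProjectKey>) -> ProjectKeyPair {
    ProjectKeyPair {
        own_key,
        sampling_key: sampling_key.unwrap_or(own_key),
    }
}

fn main() {
    let pair = key_pair("abc123", None);
    assert_eq!(pair.own_key, pair.sampling_key);
}
```

Routing on the full pair rather than on the own key alone presumably keeps an envelope and its sampling-related counterpart in the same partition.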
33 changes: 19 additions & 14 deletions relay-server/src/service.rs
@@ -3,7 +3,9 @@ use std::sync::Arc;
use std::time::Duration;

use crate::metrics::{MetricOutcomes, MetricStats};
use crate::services::buffer::{self, EnvelopeBufferService, ObservableEnvelopeBuffer};
use crate::services::buffer::{
ObservableEnvelopeBuffer, PartitionedEnvelopeBuffer, ProjectKeyPair,
};
use crate::services::cogs::{CogsService, CogsServiceRecorder};
use crate::services::global_config::{GlobalConfigManager, GlobalConfigService};
use crate::services::health_check::{HealthCheck, HealthCheckService};
@@ -63,7 +65,7 @@ pub struct Registry {
pub global_config: Addr<GlobalConfigManager>,
pub legacy_project_cache: Addr<legacy::ProjectCache>,
pub upstream_relay: Addr<UpstreamRelay>,
pub envelope_buffer: Option<ObservableEnvelopeBuffer>,
pub envelope_buffer: PartitionedEnvelopeBuffer,

pub project_cache_handle: ProjectCacheHandle,
}
@@ -260,22 +262,22 @@ impl ServiceState {
);

let (envelopes_tx, envelopes_rx) = mpsc::channel(config.spool_max_backpressure_envelopes());
let envelope_buffer = EnvelopeBufferService::new(

let envelope_buffer = PartitionedEnvelopeBuffer::create(
config.spool_partitions(),
config.clone(),
memory_stat.clone(),
global_config_rx.clone(),
buffer::Services {
envelopes_tx,
project_cache_handle: project_cache_handle.clone(),
outcome_aggregator: outcome_aggregator.clone(),
test_store: test_store.clone(),
},
)
.map(|b| b.start_in(&mut runner));
envelopes_tx.clone(),
project_cache_handle.clone(),
outcome_aggregator.clone(),
test_store.clone(),
&mut runner,
);

// Keep all the services in one context.
let project_cache_services = legacy::Services {
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr),
envelope_buffer: envelope_buffer.clone(),
aggregator: aggregator.clone(),
envelope_processor: processor.clone(),
outcome_aggregator: outcome_aggregator.clone(),
@@ -356,8 +358,11 @@ impl ServiceState {
}

/// Returns the V2 envelope buffer, if present.
pub fn envelope_buffer(&self) -> Option<&ObservableEnvelopeBuffer> {
self.inner.registry.envelope_buffer.as_ref()
pub fn envelope_buffer(
&self,
project_key_pair: ProjectKeyPair,
) -> Option<&ObservableEnvelopeBuffer> {
self.inner.registry.envelope_buffer.buffer(project_key_pair)
}

Review comment (Member): I would change the interface of PartitionedEnvelopeBuffer to behave roughly like ObservableEnvelopeBuffer behaved before, i.e. give it a send(..) and a has_capacity() method, but not expose internals about partitioning.

Reply (Member, author): I thought about this when designing the API, but it conflicted with how we access the buffer in queue_envelope. I wanted to keep that logic as it is now.

/// Returns the address of the [`legacy::ProjectCache`] service.
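To make the wiring in service.rs concrete, here is a rough sketch of what a partitioned wrapper could look like: `create` builds one buffer handle per partition (the real code also spawns an `EnvelopeBufferService` per partition on the service runner), and `buffer` routes a key pair to one handle by hashing it modulo the partition count. All types and fields below are illustrative stand-ins, not the implementation from this PR.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
use std::num::NonZeroU8;

/// Stand-in for the (own key, sampling key) pair an envelope is routed by.
#[derive(Hash)]
struct ProjectKeyPair {
    own_key: String,
    sampling_key: String,
}

/// Stand-in for the handle of one running buffer service.
struct ObservableEnvelopeBuffer {
    partition_id: u8,
}

/// Illustrative partitioned wrapper: one handle per partition, selected by
/// hashing the project key pair. Not the implementation from this PR.
struct PartitionedEnvelopeBuffer {
    buffers: Vec<ObservableEnvelopeBuffer>,
}

impl PartitionedEnvelopeBuffer {
    /// Build one buffer per partition. The real `create` would also construct
    /// an `EnvelopeBufferService` per partition, each with its own
    /// partition-suffixed spool path, and start it on the service runner.
    fn create(partitions: NonZeroU8) -> Self {
        let buffers = (0..partitions.get())
            .map(|partition_id| ObservableEnvelopeBuffer { partition_id })
            .collect();
        Self { buffers }
    }

    /// Route a key pair to the buffer of its partition.
    fn buffer(&self, pair: &ProjectKeyPair) -> &ObservableEnvelopeBuffer {
        let mut hasher = DefaultHasher::new();
        pair.hash(&mut hasher);
        let index = (hasher.finish() % self.buffers.len() as u64) as usize;
        &self.buffers[index]
    }
}

fn main() {
    let partitioned = PartitionedEnvelopeBuffer::create(NonZeroU8::new(4).unwrap());
    let pair = ProjectKeyPair {
        own_key: "abc123".into(),
        sampling_key: "abc123".into(),
    };
    println!("routed to partition {}", partitioned.buffer(&pair).partition_id);
}
```

In a scheme like this, the hash depends only on the key pair, so envelopes for the same pair always land in the same partition, which lets each partition own its spool file and service loop independently.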