-
Notifications
You must be signed in to change notification settings - Fork 94
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(spooler): Add ability to partition the buffer into multiple ones #4291
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice!
relay-server/src/service.rs
Outdated
|
||
// Keep all the services in one context. | ||
let project_cache_services = legacy::Services { | ||
envelope_buffer: envelope_buffer.as_ref().map(ObservableEnvelopeBuffer::addr), | ||
partitioned_buffer: partitioned_buffer.clone(), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would still call this field envelope_buffer
, the use does not really care whether it partitions internally or not.
@@ -356,8 +370,14 @@ impl ServiceState { | |||
} | |||
|
|||
/// Returns the V2 envelope buffer, if present. | |||
pub fn envelope_buffer(&self) -> Option<&ObservableEnvelopeBuffer> { | |||
self.inner.registry.envelope_buffer.as_ref() | |||
pub fn envelope_buffer( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would change the interface of PartitionedEnvelopeBuffer
to behave roughly like ObservableEnvelopeBuffer
behaved before. I.e. give it a send(..)
and a has_capacity()
method, but not expose internals about partitioning.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this when designing the API, but it conflicted with how we access the buffer in the queue_envelope
. I wanted to keep that logic like it is right now.
Co-authored-by: Joris Bayer <[email protected]>
Co-authored-by: Joris Bayer <[email protected]>
let mut envelope_buffers = Vec::with_capacity(partitions.get() as usize); | ||
for partition_id in 0..partitions.get() { | ||
let envelope_buffer = EnvelopeBufferService::new( | ||
partition_id, | ||
config.clone(), | ||
memory_stat.clone(), | ||
global_config_rx.clone(), | ||
Services { | ||
envelopes_tx: envelopes_tx.clone(), | ||
project_cache_handle: project_cache_handle.clone(), | ||
outcome_aggregator: outcome_aggregator.clone(), | ||
test_store: test_store.clone(), | ||
}, | ||
) | ||
.map(|b| b.start_in(runner)); | ||
|
||
if let Some(envelope_buffer) = envelope_buffer { | ||
envelope_buffers.push(envelope_buffer); | ||
} | ||
} | ||
|
||
Self { | ||
buffers: Arc::new(envelope_buffers), | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can do this with collect
, though it's probably a matter of taste which is better.
let mut envelope_buffers = Vec::with_capacity(partitions.get() as usize); | |
for partition_id in 0..partitions.get() { | |
let envelope_buffer = EnvelopeBufferService::new( | |
partition_id, | |
config.clone(), | |
memory_stat.clone(), | |
global_config_rx.clone(), | |
Services { | |
envelopes_tx: envelopes_tx.clone(), | |
project_cache_handle: project_cache_handle.clone(), | |
outcome_aggregator: outcome_aggregator.clone(), | |
test_store: test_store.clone(), | |
}, | |
) | |
.map(|b| b.start_in(runner)); | |
if let Some(envelope_buffer) = envelope_buffer { | |
envelope_buffers.push(envelope_buffer); | |
} | |
} | |
Self { | |
buffers: Arc::new(envelope_buffers), | |
} | |
} | |
let envelope_buffers = (0..partitions.get()) | |
.filter_map(|partition_id| { | |
EnvelopeBufferService::new( | |
partition_id, | |
config.clone(), | |
memory_stat.clone(), | |
global_config_rx.clone(), | |
Services { | |
envelopes_tx: envelopes_tx.clone(), | |
project_cache_handle: project_cache_handle.clone(), | |
outcome_aggregator: outcome_aggregator.clone(), | |
test_store: test_store.clone(), | |
}, | |
) | |
}) | |
.map(|b| b.start_in(runner)) | |
.collect(); |
This reverts commit d59258b.
This PR allows the buffer to be partitioned across several partitions. For simplicity, partitioning is achieved at the highest level, meaning that Relay will run
x
independent partitions of the sameEnvelopeBufferService
each with its own database file (if the database config is used).