fix(spooler): Proactively move on-disk spool to memory when it fits #2949
I think we should have a changelog entry for this; people in ops may notice the different behaviour.
relay_statsd::metric!(counter(RelayCounters::BufferReads) += 1);

// Stream is empty, we can break the loop, since we read everything by now.
if Pin::new(&mut envelopes).peek().await.is_none() {
Should be able to pin on the stack here via pin!
done in b2b35f2
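For reference, a minimal sketch of stack pinning, assuming the standard library `std::pin::pin!` macro is what is meant here. `read_count` and `NoopWaker` are hypothetical stand-ins and not part of this PR; the real code pins a peekable stream rather than a plain future.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future that is ready immediately.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical stand-in for an async read (assumption, not Relay code).
async fn read_count() -> u32 {
    3
}

/// Pins a future to the stack with `pin!` and polls it once.
fn poll_once_on_stack() -> Poll<u32> {
    // `pin!` yields a `Pin<&mut impl Future>` without a heap allocation,
    // so no `Box::pin` and no manual `Pin::new(&mut ...)` is needed.
    let mut fut = pin!(read_count());
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}
```

The same pattern should apply to a stream: pin it once after construction, then call the async helpers on the pinned handle.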
.fetch(&self.db)
.peekable();
let mut envelopes =
sql::delete_and_fetch(key, BATCH_SIZE.min(self.buffer_guard.available() as i64))
Maybe move BATCH_SIZE.min(self.buffer_guard.available() as i64) into a separate method; it's used twice, and then we can also give it a short doc string.
done in b2b35f2
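A rough sketch of what such a helper could look like. The method name `batch_size`, the `BufferGuard` stand-in, and the value of `BATCH_SIZE` are assumptions for illustration; only the expression itself comes from the diff.

```rust
/// Upper bound on rows fetched per query (value assumed for this sketch).
const BATCH_SIZE: i64 = 200;

/// Hypothetical stand-in for the buffer guard; only `available` is modelled.
struct BufferGuard {
    available: usize,
}

impl BufferGuard {
    fn available(&self) -> usize {
        self.available
    }
}

struct OnDisk {
    buffer_guard: BufferGuard,
}

impl OnDisk {
    /// Returns the number of envelopes to read from disk in one query.
    ///
    /// This is `BATCH_SIZE`, capped by the number of free buffer guard
    /// permits, so a read never takes more envelopes than memory can hold.
    fn batch_size(&self) -> i64 {
        BATCH_SIZE.min(self.buffer_guard.available() as i64)
    }
}
```

Both call sites can then pass `self.batch_size()` as the limit.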
error = &err as &dyn Error,
"failed to read the buffer stream from the disk",
);
self.track_count(-count);
How does this work here, if the stream contains an error:
- does it end or can we just skip over the error?
- can there be more rows deleted than returned?
The count may not be correct here then. Also, maybe we shouldn't stop on an error but just skip over it?
IIUC, not skipping a stream error may result in losing all envelopes?
Skipping the error now and trying to read out all the envelopes instead, done in b2b35f2
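To illustrate the skip-on-error behaviour discussed in this thread, here is a minimal sketch. It uses a plain iterator of `Result`s instead of the async sqlx stream, and `drain_skipping_errors` is a hypothetical name, so treat it as a model of the idea rather than the change in b2b35f2.

```rust
/// Drains a batch of fetch results, skipping failed rows instead of stopping.
///
/// Returns the successfully read items and the number of skipped errors, so
/// the caller can adjust its bookkeeping by what was actually read.
fn drain_skipping_errors<T, E: std::fmt::Display>(
    rows: impl IntoIterator<Item = Result<T, E>>,
) -> (Vec<T>, usize) {
    let mut items = Vec::new();
    let mut skipped = 0;
    for row in rows {
        match row {
            Ok(item) => items.push(item),
            Err(err) => {
                // Log and continue: one bad row should not drop the rest.
                eprintln!("failed to read the buffer stream from the disk: {err}");
                skipped += 1;
            }
        }
    }
    (items, skipped)
}
```

Returning the skipped count separately keeps the question about the tracked count visible: rows that were deleted but failed to decode are not silently folded into the successes.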
loop {
let mut envelopes =
sql::delete_and_fetch_all(BATCH_SIZE.min(self.buffer_guard.available() as i64))
What if buffer_guard.available() keeps returning 0, aren't we just spinning in this loop?
I assume this can't happen because we check for the watermark before calling this function?
added another check to make sure we won't have this situation, done in b2b35f2
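A small model of the guard against spinning, assuming the check amounts to leaving the loop when the computed limit is zero. The `unspool` function and the `Vec`-based disk and memory are hypothetical and only stand in for the SQL queries and the buffer guard.

```rust
/// Moves items from `disk` to `memory` in batches bounded by free capacity.
/// Returns the number of items moved.
fn unspool(
    disk: &mut Vec<u32>,
    memory: &mut Vec<u32>,
    capacity: usize,
    batch_size: usize,
) -> usize {
    let mut moved = 0;
    loop {
        let available = capacity.saturating_sub(memory.len());
        let limit = batch_size.min(available);
        // Without this check a limit of 0 would fetch nothing on every
        // iteration and the loop would never terminate.
        if limit == 0 || disk.is_empty() {
            break;
        }
        let take = limit.min(disk.len());
        memory.extend(disk.drain(..take));
        moved += take;
    }
    moved
}
```

With a capacity of 4 and a batch size of 3, the loop moves 3 items, then 1, then stops because nothing is available anymore.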
error = &err as &dyn Error,
"failed to read the buffer stream from the disk",
);
self.track_count(-count);
IIUC, not skipping a stream error may result in losing all envelopes?
@@ -396,10 +425,19 @@ impl OnDisk {
.try_get("received_at")
.map_err(BufferError::FetchFailed)?;
let start_time = StartTime::from_timestamp_millis(received_at as u64);
Isn't it better to fetch received_at as u64 from the DB? I guess that would return an error rather than converting to a huge number if negative.
I assume this is because SQLite does not know the u64 datatype, only i64
this is not supported by sqlx, only i64 in this case
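Since the column can only be read as i64, one option for the concern above is a checked conversion after fetching. This is a sketch only; `received_at_millis` is a hypothetical helper and the PR keeps the plain `as u64` cast. Note that `-1i64 as u64` wraps to `u64::MAX`, which is the "huge number" case mentioned earlier.

```rust
/// Converts a timestamp read from SQLite as `i64` into `u64` milliseconds.
///
/// Returns `None` for negative values instead of wrapping around, which is
/// what a plain `as u64` cast would do.
fn received_at_millis(raw: i64) -> Option<u64> {
    u64::try_from(raw).ok()
}
```

The caller could map `None` to a fetch error or fall back to the current time, depending on how strict the spooler wants to be.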
/// * fit into memory and take not more than 30% of the configured space
/// * the used buffer guards also must be under the low watermark.
async fn is_below_low_mem_watermark(config: &Config, disk: &OnDisk) -> bool {
((config.spool_envelopes_max_memory_size() as f64 * 0.3) as i64)
maybe ratio could be a const
done in b2b35f2
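A sketch of the ratio pulled out into a constant. The names `LOW_SPOOL_MEMORY_WATERMARK` and `low_watermark_bytes` are assumptions for illustration and may differ from what b2b35f2 uses.

```rust
/// Fraction of the configured in-memory spool size that on-disk data may
/// occupy for it to be moved back into memory (assumed name).
const LOW_SPOOL_MEMORY_WATERMARK: f64 = 0.3;

/// Computes the low watermark in bytes from the configured maximum size.
fn low_watermark_bytes(max_memory_size: usize) -> i64 {
    (max_memory_size as f64 * LOW_SPOOL_MEMORY_WATERMARK) as i64
}
```

The doc comment on the constant then carries the "30%" explanation, so the function doc no longer needs to repeat the number.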
@@ -396,10 +425,19 @@ impl OnDisk {
.try_get("received_at")
.map_err(BufferError::FetchFailed)?;
let start_time = StartTime::from_timestamp_millis(received_at as u64);
let own_key: &str = row.try_get("own_key").map_err(BufferError::FetchFailed)?;
let sampling_key: &str = row
.try_get("sampling_key")
Not really this PR, but thoughts on an enum that represents the columns? It could be nice documentation for what the DB looks like. I assume try_get would accept it if it implements Display.
maybe we can look into this in the followups 👍
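For the follow-up, a possible shape of such an enum. The three columns are the ones visible in this diff; the full schema is not shown here, so the list may be incomplete. As far as I know, sqlx's `try_get` takes a column index such as `&str` rather than any `Display` type, so the sketch exposes `as_str()` for that purpose; that part is an assumption to check against the sqlx docs.

```rust
use std::fmt;

/// Columns of the spool table as they appear in this diff (possibly incomplete).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
enum Column {
    ReceivedAt,
    OwnKey,
    SamplingKey,
}

impl Column {
    /// Returns the column name as used in SQL queries and row lookups.
    const fn as_str(self) -> &'static str {
        match self {
            Column::ReceivedAt => "received_at",
            Column::OwnKey => "own_key",
            Column::SamplingKey => "sampling_key",
        }
    }
}

impl fmt::Display for Column {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str(self.as_str())
    }
}
```

A lookup would then read something like `row.try_get(Column::OwnKey.as_str())`, keeping the column names in one documented place.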
To make sure that we switch to the in-memory spool faster, we can proactively check whether the on-disk data fits into memory and, if so, move the spooled envelopes in one go.
related to https://github.com/getsentry/team-ingest/issues/257