feat: buffered reading of transaction logs #1549
ACTION NEEDED: delta-rs follows the Conventional Commits specification for release automation. The PR title and description are used as the merge commit message. Please update your PR title and description to match the specification.
rust/src/delta.rs
Outdated
let buf_size = 50; // TODO

// why is mut needed here and is it OK?
To answer the question in the comment here: the `mut` is needed because of the `log_buffer.next()` call below, which consumes from the buffer.
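A minimal sketch of the point above, using a plain std iterator as a stand-in for the buffered log stream (this is not the PR's code, and `drain_first` is a hypothetical name). `Iterator::next`, like `StreamExt::next` in the `futures` crate, takes `&mut self`, so the binding that holds the buffer has to be declared `mut`:

```rust
/// Pulls up to `n` items from a stand-in "log buffer".
/// `next` takes `&mut self`, so `log_buffer` must be a `mut` binding;
/// removing the `mut` makes this function fail to compile.
fn drain_first(n: usize) -> Vec<u32> {
    // Stand-in for `log_stream.take(n).buffered(buf_size)`.
    let mut log_buffer = (0u32..10).take(n);
    let mut seen = Vec::new();
    while let Some(version) = log_buffer.next() {
        seen.push(version);
    }
    seen
}
```

So the `mut` is expected here: it marks the local handle as mutable and says nothing about the underlying log being modified.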
rust/src/delta.rs
Outdated
// why is mut needed here and is it OK?
let mut log_buffer = {
    match max_version {
        Some(n) => log_stream.take((n - self.version() + 2).try_into().unwrap()).buffered(buf_size),
- Some(n) => log_stream.take((n - self.version() + 2).try_into().unwrap()).buffered(buf_size),
+ Some(n) => log_stream.take((n - self.version() + 2).try_into()?).buffered(buf_size),
I'm not sure off the top of my head, but I think the `?` will work and help ensure that errors get propagated up to the `DeltaResult` returned by the function.
This didn't work directly, but I added explicit handling for the error cases
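The thread does not say why the bare `?` failed. One common cause, assumed here, is that the function's error type has no `From<TryFromIntError>` implementation, so the conversion error has to be mapped by hand. A minimal sketch of such explicit handling, where `TableError` and `commits_to_take` are hypothetical stand-ins rather than delta-rs names:

```rust
use std::convert::TryFrom;

/// Hypothetical stand-in for the crate's error type.
#[derive(Debug, PartialEq)]
enum TableError {
    Generic(String),
}

/// Converts a version delta into a `usize` count for `take`,
/// returning an error instead of panicking when the delta is negative.
fn commits_to_take(max_version: i64, current_version: i64) -> Result<usize, TableError> {
    usize::try_from(max_version - current_version).map_err(|e| {
        // No `From<TryFromIntError>` is assumed, so map the error explicitly.
        TableError::Generic(format!("invalid version range: {}", e))
    })
}
```

With this shape, `commits_to_take(1, 4)` yields an `Err` that the caller can propagate with `?`, where the original `unwrap()` would have panicked.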
rust/src/delta.rs
Outdated
};

while let Some((new_version, actions)) = {
    let next_commit = log_buffer.next().await.unwrap();
- let next_commit = log_buffer.next().await.unwrap();
+ let next_commit = log_buffer.next().await?;
This might be warranted as well: unwraps will panic the caller on errors, which is not something we want.
In this case the `await` returned an `Option`, so I just handled it as such in the `match` below. Now the case when the stream ends earlier than expected is explicitly an error.
rust/src/delta.rs
Outdated
/// TODO
pub async fn open_table_with_version_and_log_buffer(
    table_uri: impl AsRef<str>,
    version: i64,
    log_buffer_size: usize,
) -> Result<DeltaTable, DeltaTableError> {
    let table = DeltaTableBuilder::from_uri(table_uri)
        .with_version(version)
        .with_log_buffer_size(log_buffer_size)
        .load()
        .await?;
    Ok(table)
}
We have been discouraging further use of these top-level methods. I would prefer that this function not be added, and that users who are interested in buffering use the table builder method instead.
rust/src/builder.rs
Outdated
@@ -153,6 +169,12 @@ impl DeltaTableBuilder {
        self
    }

    /// Sets `log_buffer_size` to the builder
    pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> Self {
- pub fn with_log_buffer_size(mut self, log_buffer_size: usize) -> Self {
+ pub fn with_buffer(mut self, log_buffer_size: usize) -> Self {
IMHO this can be more concise.
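To illustrate the builder-style setter being discussed, here is a self-contained sketch. `TableBuilderSketch`, `LoadOptions`, `build`, and the default buffer size of 1 are all assumptions made for the example; only the `with_version` and `with_buffer` names and the consuming `mut self` signature come from the diff above.

```rust
/// Hypothetical options produced by the builder sketch.
#[derive(Debug, PartialEq)]
struct LoadOptions {
    version: Option<i64>,
    log_buffer_size: usize,
}

/// Hypothetical stand-in for a table builder; not the delta-rs type.
struct TableBuilderSketch {
    options: LoadOptions,
}

impl TableBuilderSketch {
    fn new() -> Self {
        // Assumed default: no pinned version, buffer of one request.
        Self { options: LoadOptions { version: None, log_buffer_size: 1 } }
    }

    /// Consuming setter: takes `mut self` and returns `Self` so calls chain.
    fn with_version(mut self, version: i64) -> Self {
        self.options.version = Some(version);
        self
    }

    /// Same pattern as the setter in the diff above.
    fn with_buffer(mut self, log_buffer_size: usize) -> Self {
        self.options.log_buffer_size = log_buffer_size;
        self
    }

    fn build(self) -> LoadOptions {
        self.options
    }
}
```

A caller would chain the setters, for example `TableBuilderSketch::new().with_version(3).with_buffer(50).build()`, which is why a separate top-level `open_table_with_version_and_log_buffer` function is not needed.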
e3a913c
to
6c64eb2
Compare
c8e5a79
to
e32d71f
Compare
Did some refactoring and revisions according to suggestions, and also exposed the parameter to Python. Confirmed the speed-up locally against a table in S3, |
@@ -365,7 +388,7 @@ lazy_static::lazy_static! {
 /// Extra slashes will be removed from the end path as well.
 ///
 /// Will return an error if the location is not valid. For example,
- pub(crate) fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
+ pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
This I figured might not be a good idea, but is there some other way of using the function in the test?
To keep it private, we can keep the tests within this file, in the tests section at the bottom.
Hmm, then I have some trouble with the `mod fs_common` where I put the `SlowStore` implementation. And the tests are not really testing the builder per se. Would it make sense to just make a copy of this function in `fs_common` for the purposes of the test?
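For context, the in-file option suggested above usually looks like the sketch below: a `pub(crate)` function stays crate-private, and a `#[cfg(test)]` module in the same file reaches it through `use super::*`. The helper `trim_trailing_slashes` is hypothetical and far simpler than `ensure_table_uri`; it only illustrates the visibility pattern.

```rust
/// Hypothetical crate-private helper, standing in for a function
/// that should not become part of the public API.
pub(crate) fn trim_trailing_slashes(uri: &str) -> &str {
    uri.trim_end_matches('/')
}

#[cfg(test)]
mod tests {
    // Unit tests in the same file can see crate-private items.
    use super::*;

    #[test]
    fn strips_extra_slashes() {
        assert_eq!(trim_trailing_slashes("s3://bucket/table//"), "s3://bucket/table");
    }
}
```

Integration tests under `tests/` compile as separate crates and only see `pub` items, which is the constraint behind the question here.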
rust/src/delta.rs
Outdated
Some((v, Ok(x))) => Ok(Some((v, self.get_actions(v, x.bytes().await?).await?))),
Some((_, Err(ObjectStoreError::NotFound { .. }))) => Ok(None),
Some((_, Err(err))) => Err(DeltaTableError::GenericError { source: Box::new(err) }), // TODO ??
None => Err(DeltaTableError::Generic(String::from("Log stream closed unexpectedly!")))
My understanding is perhaps a bit hazy, but isn't a `None` at the end of the stream an expected condition? Won't this always happen once the stream is at the end?
Yeah, you're right, this is expected. I made this an `Ok` case and also updated the `.take` call to take `max_version - self.version()` elements instead of +2. Had some confusion there earlier due to this error.
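The revised handling described above can be sketched with plain std types. `StoreError`, `TableError`, and `classify_commit` are hypothetical stand-ins for `ObjectStoreError`, `DeltaTableError`, and the match in the diff; the payload is kept as raw bytes rather than parsed actions.

```rust
/// Hypothetical stand-in for the object store error type.
#[derive(Debug, PartialEq)]
enum StoreError {
    NotFound,
    Other(String),
}

/// Hypothetical stand-in for the table error type.
#[derive(Debug, PartialEq)]
enum TableError {
    Store(String),
}

/// Maps one item pulled from the log stream to "next commit", "no more commits", or an error.
fn classify_commit(
    item: Option<(i64, Result<Vec<u8>, StoreError>)>,
) -> Result<Option<(i64, Vec<u8>)>, TableError> {
    match item {
        // A commit file was read successfully.
        Some((version, Ok(bytes))) => Ok(Some((version, bytes))),
        // The next commit file does not exist yet: we are up to date.
        Some((_, Err(StoreError::NotFound))) => Ok(None),
        // Any other store failure is a real error.
        Some((_, Err(StoreError::Other(msg)))) => Err(TableError::Store(msg)),
        // The stream is exhausted, which is the normal end condition.
        None => Ok(None),
    }
}
```

Treating `None` as `Ok(None)` matches the conclusion of this thread: an exhausted stream just means there is nothing left to apply.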
rust/tests/fs_common/mod.rs
Outdated
pub struct SlowStore {
    inner: DeltaObjectStore
}
😃
rust/tests/read_delta_test.rs
Outdated
let max_iter = 10;
let buf_size = 10;

let location = deltalake::builder::ensure_table_uri(path).unwrap();
I'm on the fence about letting this API leak out here, but let's roll with it for now
4490249
to
caa18f0
Compare
rust/src/delta.rs
Outdated
while let PeekCommit::New(new_version, actions) =
    self.peek_next_commit(self.version()).await?
let max_version = max_version.and_then(|x| {
    if x <= self.version() {
@rtyler I refactored this `max_version` handling a bit so it's clearer. Here I made explicit the behavior that's currently on `main`: the table will be updated to the latest version if `max_version` is smaller than or equal to the current version.
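The rule stated above can be written as a small pure function. This is a sketch with a hypothetical name (`effective_max_version`), where `None` means "no upper bound, update to the latest version":

```rust
/// Returns the upper bound to load up to, or `None` to load the latest version.
/// A `max_version` that is not larger than the current version is ignored.
fn effective_max_version(max_version: Option<i64>, current_version: i64) -> Option<i64> {
    max_version.and_then(|x| if x <= current_version { None } else { Some(x) })
}
```

The same logic could also be written as `max_version.filter(|&x| x > current_version)`; the `and_then` form mirrors the snippet in the diff.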
Did some refactoring and fixes, and added a test to check that results are correct with different buffer sizes. I will still think a bit about how to limit the number of requests, since there can currently be a lot of unnecessary ones if `max_version` is not set.
a7608d2
to
b187374
Compare
@@ -540,14 +550,78 @@ impl DeltaTable {
    self.version(),
);

while let PeekCommit::New(new_version, actions) =
    self.peek_next_commit(self.version()).await?
// update to latest version if given max_version is not larger than current version
@rtyler I ended up adding this bigger change to list the files to find out what the latest version is. The code is mostly copied over from another module, so it needs refactoring. It's somewhat orthogonal to the buffered reading changes, but unless I'm missing something:
- it's necessary in order to make sure the load actually completes
- it lets GET requests be buffered without creating any wasted requests (and thus the user doesn't need to be made aware of potential extra costs)
I understand, though, that this is a more major change, because listing can be costly with big logs, in particular with no prefix filtering. As far as I understand, the protocol doesn't prescribe how to do this, but it does hint at listing first being the way to read tables.
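A rough sketch of the "list first, then fetch" idea, under stated assumptions: the listing is represented as a plain slice of file names, and `commit_version` and `versions_to_fetch` are hypothetical helpers, not the PR's functions. It relies on the Delta log naming convention in which a commit file is the version zero-padded to 20 digits followed by `.json`.

```rust
/// Parses the version from a commit file name such as `00000000000000000010.json`.
/// Checkpoints and other files in the log directory yield `None`.
fn commit_version(file_name: &str) -> Option<i64> {
    let stem = file_name.strip_suffix(".json")?;
    if stem.len() == 20 && stem.bytes().all(|b| b.is_ascii_digit()) {
        stem.parse().ok()
    } else {
        None
    }
}

/// Given the listed file names, returns exactly the commit versions newer than
/// `current_version`, in ascending order. Fetching only these means every GET
/// targets a file known to exist, and the loop has a fixed end point.
fn versions_to_fetch(listed: &[&str], current_version: i64) -> Vec<i64> {
    let mut versions: Vec<i64> = listed
        .iter()
        .filter_map(|name| commit_version(name))
        .filter(|&v| v > current_version)
        .collect();
    versions.sort_unstable();
    versions
}
```

Because the set of versions is fixed at listing time, a writer that keeps appending commits cannot keep the reader looping, and buffered requests never probe past the last listed commit.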
6da464f
to
c591971
Compare
Head branch was pushed to by a user without write access
cfd41e5
to
41cfafb
Compare
Rebased on main
Description
This PR introduces two changes to how the commit log is read:
`list_with_offset`. This helps avoid a potential infinite loop in a slow reader/fast writer scenario, and makes it possible to buffer GET requests without making any unnecessary ones. This listing logic is now also used for the time-travel reading.
Related Issue(s)
Documentation