
Add a custom implementation of LocalFileSystem::list_with_offset #7019

Merged (10 commits) on Feb 8, 2025
object_store/src/local.rs (219 changed lines: 155 additions, 64 deletions)

@@ -483,71 +483,15 @@ impl ObjectStore for LocalFileSystem {
}

fn list(&self, prefix: Option<&Path>) -> BoxStream<'static, Result<ObjectMeta>> {
let config = Arc::clone(&self.config);

let root_path = match prefix {
Some(prefix) => match config.prefix_to_filesystem(prefix) {
Ok(path) => path,
Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
},
None => self.config.root.to_file_path().unwrap(),
};

let walkdir = WalkDir::new(root_path)
// Don't include the root directory itself
.min_depth(1)
.follow_links(true);

let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
Ok(entry) => entry,
Err(e) => return Some(Err(e)),
};

if !entry.path().is_file() {
return None;
}

match config.filesystem_to_path(entry.path()) {
Ok(path) => match is_valid_file_path(&path) {
true => convert_entry(entry, path).transpose(),
false => None,
},
Err(e) => Some(Err(e)),
}
});

// If no tokio context, return iterator directly as no
// need to perform chunked spawn_blocking reads
if tokio::runtime::Handle::try_current().is_err() {
return futures::stream::iter(s).boxed();
}

// Otherwise list in batches of CHUNK_SIZE
const CHUNK_SIZE: usize = 1024;

let buffer = VecDeque::with_capacity(CHUNK_SIZE);
futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
if buffer.is_empty() {
(s, buffer) = tokio::task::spawn_blocking(move || {
for _ in 0..CHUNK_SIZE {
match s.next() {
Some(r) => buffer.push_back(r),
None => break,
}
}
(s, buffer)
})
.await?;
}
self.list_with_maybe_offset(prefix, None)
}

match buffer.pop_front() {
Some(Err(e)) => Err(e),
Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
None => Ok(None),
}
})
.boxed()
fn list_with_offset(
&self,
prefix: Option<&Path>,
offset: &Path,
) -> BoxStream<'static, Result<ObjectMeta>> {
self.list_with_maybe_offset(prefix, Some(offset))
}
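
For orientation, a caller exercises the new method through the ObjectStore trait. The following is a minimal sketch, not code from this PR: it assumes a tokio runtime, an existing /tmp/data directory, and the futures::TryStreamExt trait in scope.

```rust
use futures::TryStreamExt;
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};

#[tokio::main]
async fn main() -> object_store::Result<()> {
    // Assumed example directory; any existing prefix works
    let store = LocalFileSystem::new_with_prefix("/tmp/data")?;
    let offset = Path::from("test1.parquet");

    // Stream only the objects whose path sorts strictly after `offset`
    let metas: Vec<_> = store.list_with_offset(None, &offset).try_collect().await?;
    for meta in metas {
        println!("{} ({} bytes)", meta.location, meta.size);
    }
    Ok(())
}
```

Only objects whose path sorts strictly after the offset are yielded; the ordering of the results themselves is not guaranteed.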

async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
@@ -678,6 +622,93 @@ impl ObjectStore for LocalFileSystem {
}
}

impl LocalFileSystem {
fn list_with_maybe_offset(
&self,
prefix: Option<&Path>,
maybe_offset: Option<&Path>,
) -> BoxStream<'static, Result<ObjectMeta>> {
let config = Arc::clone(&self.config);

let root_path = match prefix {
Some(prefix) => match config.prefix_to_filesystem(prefix) {
Ok(path) => path,
Err(e) => return futures::future::ready(Err(e)).into_stream().boxed(),
},
None => config.root.to_file_path().unwrap(),
};

let walkdir = WalkDir::new(root_path)
Contributor:
I did some digging and I don't think WalkDir guarantees anything about the sort order of the directory entries, so if the OS returns a different order on the next call to list_with_offset this may end up with a different result. However, the existing default impl of list_with_offset (which just calls list() and takes the offset) has the same problem, so I don't think it needs to be fixed in this PR.

Contributor:
It does look to me like you can explicitly set an order via https://docs.rs/walkdir/latest/walkdir/struct.WalkDir.html#method.sort_by

Contributor Author:
@alamb I agree with all of this, and thanks for the helpful comments! Your concern about not having a deterministic order is a good one, and I agree this is the existing behavior. I also think we should change this, but I propose we do that in a follow-up PR, since it changes behavior and could (potentially) break an assumption somewhere. I think we can just use https://docs.rs/walkdir/latest/walkdir/struct.WalkDir.html#method.sort_by_file_name, which should be fine and I believe matches what the offset filter expects. (A sketch of that change follows the list_with_maybe_offset function below.)

// Don't include the root directory itself
.min_depth(1)
.follow_links(true);

let maybe_offset = maybe_offset.cloned();

let s = walkdir.into_iter().flat_map(move |result_dir_entry| {
// Apply offset filter before proceeding, to reduce statx file system calls
// This matters for NFS mounts
if let (Some(offset), Ok(entry)) = (maybe_offset.as_ref(), result_dir_entry.as_ref()) {
let location = config.filesystem_to_path(entry.path());
match location {
Ok(path) if path <= *offset => return None,
Err(e) => return Some(Err(e)),
_ => {}
}
}

let entry = match convert_walkdir_result(result_dir_entry).transpose()? {
Ok(entry) => entry,
Err(e) => return Some(Err(e)),
};

if !entry.path().is_file() {
return None;
}

match config.filesystem_to_path(entry.path()) {
Ok(path) => match is_valid_file_path(&path) {
true => convert_entry(entry, path).transpose(),
false => None,
},
Err(e) => Some(Err(e)),
}
});

// If no tokio context, return iterator directly as no
// need to perform chunked spawn_blocking reads
if tokio::runtime::Handle::try_current().is_err() {
return futures::stream::iter(s).boxed();
}

// Otherwise list in batches of CHUNK_SIZE
const CHUNK_SIZE: usize = 1024;

let buffer = VecDeque::with_capacity(CHUNK_SIZE);
futures::stream::try_unfold((s, buffer), |(mut s, mut buffer)| async move {
if buffer.is_empty() {
(s, buffer) = tokio::task::spawn_blocking(move || {
for _ in 0..CHUNK_SIZE {
match s.next() {
Some(r) => buffer.push_back(r),
None => break,
}
}
(s, buffer)
})
.await?;
}

match buffer.pop_front() {
Some(Err(e)) => Err(e),
Some(Ok(meta)) => Ok(Some((meta, (s, buffer)))),
None => Ok(None),
}
})
.boxed()
}
}
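
For reference, the follow-up proposed in the review thread above would only touch the WalkDir construction. A minimal sketch of that change, assuming a walkdir version that provides sort_by_file_name (this is an illustration, not code from this PR):

```rust
use std::path::Path;
use walkdir::WalkDir;

// Hypothetical follow-up (not in this PR): build the walker with a
// deterministic ordering so that repeated calls to list_with_offset
// observe a stable listing.
fn sorted_walker(root_path: &Path) -> WalkDir {
    WalkDir::new(root_path)
        // Don't include the root directory itself
        .min_depth(1)
        .follow_links(true)
        // Sort entries by file name within each directory
        .sort_by_file_name()
}
```

Since the offset filter compares full object paths, whether per-directory file-name sorting is sufficient for every edge case would need to be confirmed in that follow-up.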

/// Creates the parent directories of `path` or returns an error based on `source` if no parent
fn create_parent_dirs(path: &std::path::Path, source: io::Error) -> Result<()> {
let parent = path.parent().ok_or_else(|| {
@@ -1401,6 +1432,66 @@ mod tests {
);
}

#[tokio::test]
async fn test_path_with_offset() {
let root = TempDir::new().unwrap();
let integration = LocalFileSystem::new_with_prefix(root.path()).unwrap();

let root_path = root.path();
for i in 0..5 {
let filename = format!("test{}.parquet", i);
let file = root_path.join(filename);
std::fs::write(file, "test").unwrap();
}
let filter_str = "test";
let filter = String::from(filter_str);
let offset_str = filter + "1";
let offset = Path::from(offset_str.clone());

// Use list_with_offset to retrieve files
let res = integration.list_with_offset(None, &offset);
let offset_paths: Vec<_> = res.map_ok(|x| x.location).try_collect().await.unwrap();
let mut offset_files: Vec<_> = offset_paths
.iter()
.map(|x| String::from(x.filename().unwrap()))
.collect();

// Check result with direct filesystem read
Contributor:
I think this depends on the file system returning the same order each time. Again, this is probably OK, but it seems like it could result in a non-deterministic test failure over time.

Contributor Author:
Actually, it does not depend on this, because later, in line 1485, we sort both result sets before comparing. So the initial sort order that gets returned does not matter.

Contributor:
I was thinking that if the underlying directory listing was different, then the values after skipping the first 10 (for example) might be different, even if we sorted the values that came out. But since this doesn't seem to happen in practice, I am not going to worry too much about it.

Contributor (@tustvold, Feb 7, 2025):
I've not had time to review this in detail, but a couple of points that may or may not be relevant. Or, to phrase it either way: we shouldn't need to sort, nor should we rely on data being sorted. For context on why we don't guarantee any ordering: many stores, e.g. S3 Express, don't.

Contributor:
Makes sense -- thank you. To be clear, I don't think this PR changes anything about the object_store operation in the face of inconsistent ordering between calls (but I was speculating on what would happen if the ordering returned by the underlying systems was different between calls).

let files = fs::read_dir(root_path).unwrap();
let filtered_files = files
.filter_map(Result::ok)
.filter_map(|d| {
d.file_name().to_str().and_then(|f| {
if f.contains(filter_str) {
Some(String::from(f))
} else {
None
}
})
})
.collect::<Vec<_>>();

let mut expected_offset_files: Vec<_> = filtered_files
.iter()
.filter(|s| **s > offset_str)
.cloned()
.collect();

fn do_vecs_match<T: PartialEq>(a: &[T], b: &[T]) -> bool {
let matching = a.iter().zip(b.iter()).filter(|&(a, b)| a == b).count();
matching == a.len() && matching == b.len()
}

offset_files.sort();
expected_offset_files.sort();

// println!("Expected Offset Files: {:?}", expected_offset_files);
// println!("Actual Offset Files: {:?}", offset_files);

assert_eq!(offset_files.len(), expected_offset_files.len());
assert!(do_vecs_match(&expected_offset_files, &offset_files));
}

#[tokio::test]
async fn filesystem_filename_with_percent() {
let temp_dir = TempDir::new().unwrap();