Is there a good way to make max_bytes_per_file used in Stable data storage version #3393

Open

SaintBacchus opened this issue Jan 18, 2025 · 5 comments · May be fixed by #3435
Labels
bug Something isn't working

Comments

@SaintBacchus
Collaborator

With the Stable data storage version, the max_bytes_per_file option does not affect the data file size.

In the example below, the resulting data file is 9000569 bytes.

import lance
import pyarrow as pa

def generate_large_table():
    data = []
    for i in range(1000000):
        if i % 2 == 0:
            data.append({"name": "Alice", "age": 20})
        else:
            data.append({"name": "Bob", "age": 30})
    table = pa.Table.from_pylist(data)
    return table

table = generate_large_table()

# Requests files of at most 512 bytes, but the limit is not enforced with the stable format
lance.write_dataset(table, "./alice_and_bob.lance", max_bytes_per_file=512)

@westonpace is there a good way to limit the data file size?

SaintBacchus added the bug label on Jan 18, 2025
@wjones127
Contributor

So the logic for enforcing max_bytes_per_file looks like:

for batch in data:
    writer.write(batch)
    bytes_written = writer.tell()
    if bytes_written > max_bytes_per_file:
        writer.close()
        writer = new_file()

while let Some(batch_chunk) = buffered_reader.next().await {
    let batch_chunk = batch_chunk?;
    if writer.is_none() {
        let (new_writer, new_fragment) = writer_generator.new_writer().await?;
        params.progress.begin(&new_fragment).await?;
        writer = Some(new_writer);
        fragments.push(new_fragment);
    }
    writer.as_mut().unwrap().write(&batch_chunk).await?;
    for batch in batch_chunk {
        num_rows_in_current_file += batch.num_rows() as u32;
    }
    if num_rows_in_current_file >= params.max_rows_per_file as u32
        || writer.as_mut().unwrap().tell().await? >= params.max_bytes_per_file as u64
    {
        let (num_rows, data_file) = writer.take().unwrap().finish().await?;
        debug_assert_eq!(num_rows, num_rows_in_current_file);
        params.progress.complete(fragments.last().unwrap()).await?;
        let last_fragment = fragments.last_mut().unwrap();
        last_fragment.physical_rows = Some(num_rows as usize);
        last_fragment.files.push(data_file);
        num_rows_in_current_file = 0;
    }
}

So I think the problem you are encountering is that if the input data is one large batch, there is only one iteration of that loop, so by the time we realize we've written too much data, it's too late.

To enforce max_rows_per_file, we split the input data into batches of at most max_rows_per_file rows each.

break_stream(data, params.max_rows_per_file)
    .map_ok(|batch| vec![batch])
    .boxed()

// Given a stream of record batches, and a desired break point, this will
// make sure that a new record batch is emitted every time `break_point` rows
// have passed.
//
// This method will not combine record batches in any way. For example, if
// the input lengths are [3, 5, 8, 3, 5], and the break point is 10 then the
// output batches will be [3, 5, 2 (break inserted) 6, 3, 1 (break inserted) 4]
pub fn break_stream(
    stream: SendableRecordBatchStream,
    max_chunk_size: usize,
) -> Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>> {

To do something equivalent for bytes, I think the best we could do is attempt to split the in-memory stream into batches based on a computed average bytes per row, targeting some increment like 10 MB batches, so that we don't ever overshoot by too much. What do you think of that?
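
A minimal sketch of that idea, assuming a hypothetical helper break_batch_by_bytes (not part of Lance) that slices a single Arrow RecordBatch into zero-copy chunks of roughly some target byte size, estimated from its average in-memory bytes per row:

use arrow_array::RecordBatch;

// Hypothetical helper (not Lance code): split one batch into zero-copy slices
// of roughly `target_bytes` each, estimated from average bytes per row.
fn break_batch_by_bytes(batch: &RecordBatch, target_bytes: usize) -> Vec<RecordBatch> {
    let num_rows = batch.num_rows();
    if num_rows == 0 {
        return vec![batch.clone()];
    }
    // Use the in-memory size as a proxy for the eventual on-disk size.
    let bytes_per_row = (batch.get_array_memory_size() / num_rows).max(1);
    let rows_per_chunk = (target_bytes / bytes_per_row).max(1);

    let mut chunks = Vec::new();
    let mut offset = 0;
    while offset < num_rows {
        let len = rows_per_chunk.min(num_rows - offset);
        // `slice` is zero-copy: each chunk shares buffers with the input batch.
        chunks.push(batch.slice(offset, len));
        offset += len;
    }
    chunks
}

Since the on-disk size differs from the in-memory estimate, files could still overshoot the limit a little; the goal is only to avoid overshooting by too much rather than to enforce a hard cap.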

@SaintBacchus
Collaborator Author

Yes, right now break_stream only depends on max_rows_per_file, which can allocate a lot of memory.

It would be better to limit by both max_rows_per_file and max_bytes_per_file.

@wjones127
Contributor

wjones127 commented Jan 21, 2025

Yes, right now break_stream only depends on max_rows_per_file, which can allocate a lot of memory.

It would be better to limit by both max_rows_per_file and max_bytes_per_file.

I'm not sure you understood me. break_stream is how we successfully enforce max_rows_per_file. To enforce max_bytes_per_file, we'll want to modify break_stream() to also attempt to break the batches up based on approximate byte size.

And it doesn't allocate significantly more memory. The smaller batches are zero-copy views into the larger batches.
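
For instance, slicing an Arrow batch only creates a view over the parent's buffers; a standalone illustration (not Lance code):

use std::sync::Arc;
use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_schema::{DataType, Field, Schema};

fn main() {
    let schema = Arc::new(Schema::new(vec![Field::new("age", DataType::Int32, false)]));
    let ages: ArrayRef = Arc::new(Int32Array::from(vec![20, 30, 20, 30]));
    let batch = RecordBatch::try_new(schema, vec![ages]).unwrap();

    // The slice is a view over the same buffers; no values are copied.
    let first_half = batch.slice(0, 2);
    assert_eq!(first_half.num_rows(), 2);
}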

@SaintBacchus
Collaborator Author

OK, I get it now.

@westonpace
Contributor

Having a good "bytes chunker" would also allow us to remove a somewhat ugly hack further down in the writer, so I'd be in favor of the idea.

SaintBacchus linked a pull request on Feb 7, 2025 that will close this issue