This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Parquet writer stalls at a certain column size for Utf8 dtypes. #1292

Closed
ritchie46 opened this issue Nov 5, 2022 · 1 comment · Fixed by #1293

Comments

ritchie46 (Collaborator) commented Nov 5, 2022

Continued from pola-rs/polars#3845, but worked out here in arrow2-only code, extracted from the examples.

Invoking the example below with cargo run --release 150_000_000 finishes in 6 seconds.

But when we invoke it with cargo run --release 175_000_000, the program grinds to a halt. Memory still increases slowly, and it ends up consuming far more than we'd expect by extrapolating from the previous run (some sort of user stack growth?).

I killed the process after 5 minutes as it didn't seem to finish.

Two things I found:

  • It happens with Utf8 columns, not with numerical types.
  • If we don't slice in chunks, memory explodes when writing.

There is a stacktrace in the issue upstream.

Reproducible example

use std::fs::File;

use arrow2::{
    array::{Array, Utf8Array},
    chunk::Chunk,
    datatypes::{Field, Schema},
    error::Result,
    io::parquet::write::{
        transverse, CompressionOptions, Encoding, FileWriter, RowGroupIterator, Version,
        WriteOptions,
    },
};

fn write_chunks(path: &str, schema: Schema, chunks: Vec<Chunk<Box<dyn Array>>>) -> Result<()> {
    let options = WriteOptions {
        write_statistics: true,
        compression: CompressionOptions::Zstd(None),
        version: Version::V2,
    };


    let encodings = schema
        .fields
        .iter()
        .map(|f| transverse(&f.data_type, |_| Encoding::Plain))
        .collect();

    let row_groups = RowGroupIterator::try_new(chunks.into_iter().map(Ok), &schema, options, encodings)?;

    // Create a new empty file
    let file = File::create(path)?;

    let mut writer = FileWriter::try_new(file, schema, options)?;

    for group in row_groups {
        writer.write(group?)?;
    }
    let _size = writer.end(None)?;
    Ok(())
}

fn main() -> Result<()> {
    let args: Vec<String> = std::env::args().collect();

    let n = args[1].replace("_", "").parse::<usize>().unwrap();

    let iter = std::iter::repeat("40240gh32152n").take(n);
    let array = Utf8Array::<i64>::from_iter_values(iter).boxed();

    let field = Field::new("c1", array.data_type().clone(), true);
    let schema = Schema::from(vec![field]);

    let chunk_size = 1_000_000;
    let mut chunks = Vec::with_capacity(n / chunk_size + 1);
    let mut offset = 0;
    while offset < n {
        // Cap the last chunk so we never slice past the end of the array.
        let len = chunk_size.min(n - offset);
        chunks.push(Chunk::new(vec![array.slice(offset, len)]));
        offset += chunk_size;
    }

    write_chunks("test.parquet", schema, chunks)
}
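To make the two bullet points above concrete: the second one ("don't slice in chunks") corresponds to replacing the slicing loop in main with a single Chunk holding the whole array, as sketched below. This sketch is not part of the original report; it assumes the write_chunks function, array, and schema from the example above, and the output file name is made up for illustration. The first bullet is the same repro with the Utf8 column swapped for a numeric array such as an Int32Array.

// Sketch only: write the whole Utf8 array as one Chunk instead of slicing it
// into 1_000_000-row chunks. This replaces the slicing loop and final call in
// `main` above, and is the case where memory exploded while writing.
let single_chunk = vec![Chunk::new(vec![array])];
write_chunks("test_single_chunk.parquet", schema, single_chunk)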

GDB stacktrace

If we ask gdb for a stacktrace at the moment the program freezes, we get the following trace. It agrees with the culprits found in the flamegraph.

0x00007ffff7f832f7 in __libc_write (fd=4, buf=0x7fffffffb31f, nbytes=1)
    at ../sysdeps/unix/sysv/linux/write.c:26
26	../sysdeps/unix/sysv/linux/write.c: No such file or directory.
(gdb) backtrace
#0  0x00007ffff7f832f7 in __libc_write (fd=4, buf=0x7fffffffb31f, nbytes=1)
    at ../sysdeps/unix/sysv/linux/write.c:26
#1  0x00005555556cbec3 in std::sys::unix::fd::FileDesc::write ()
    at library/std/src/sys/unix/fd.rs:152
#2  std::sys::unix::net::Socket::write () at library/std/src/sys/unix/net.rs:303
#3  std::os::unix::net::datagram::UnixDatagram::send ()
    at library/std/src/os/unix/net/datagram.rs:572
#4  0x00005555555884fc in std::io::impls::<impl std::io::Write for &mut W>::write ()
    at /rustc/7fcf850d7942804990a1d2e3fe036622a0fe4c74/library/std/src/io/impls.rs:57
#5  std::io::impls::<impl std::io::Write for &mut W>::write ()
    at /rustc/7fcf850d7942804990a1d2e3fe036622a0fe4c74/library/std/src/io/impls.rs:57
#6  <parquet_format_safe::thrift::protocol::compact_write::TCompactOutputProtocol<T> as parquet_format_safe::thrift::protocol::TOutputProtocol>::write_byte ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet-format-safe-0.2.4/src/thrift/protocol/compact_write.rs:247
#7  parquet_format_safe::thrift::protocol::compact_write::TCompactOutputProtocol<T>::write_field_header ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet-format-safe-0.2.4/src/thrift/protocol/compact_write.rs:50
#8  0x00005555555880b9 in <parquet_format_safe::thrift::protocol::compact_write::TCompactOutputProtocol<T> as parquet_format_safe::thrift::protocol::TOutputProtocol>::write_field_begin ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet-format-safe-0.2.4/src/thrift/protocol/compact_write.rs:130
#9  0x000055555556a8fc in parquet_format_safe::parquet_format::Statistics::write_to_out_protocol ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet-format-safe-0.2.4/src/parquet_format.rs:804
#10 0x0000555555572954 in parquet_format_safe::parquet_format::DataPageHeaderV2::write_to_out_protocol ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet-format-safe-0.2.4/src/parquet_format.rs:4067
#11 0x000055555556a275 in parquet_format_safe::parquet_format::PageHeader::write_to_out_protocol ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet-format-safe-0.2.4/src/parquet_format.rs:5295
#12 0x000055555557d36e in parquet2::write::page::write_page_header ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.16.3/src/write/page.rs:203
#13 parquet2::write::page::write_page ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.16.3/src/write/page.rs:67
#14 0x000055555558d6c4 in parquet2::write::column_chunk::write_column_chunk ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.16.3/src/write/column_chunk.rs:46
#15 0x000055555557fd59 in parquet2::write::row_group::write_row_group::{{closure}} ()
    at /home/ritchie46/.cargo/registry/src/github.com-1ecc6299db9ec823/parquet2-0.16.3/src/write/row_group.rs:100
#16 core::iter::adapters::map::map_try_fold::{{closure}} ()
    at /rustc/7fcf850d7942804990a1d2e3fe036622a0fe4c74/library/core/src/iter/adapters/map.rs:91
#17 core::iter::traits::iterator::Iterator::try_fold ()
    at /rustc/7fcf850d7942804990a1d2e3fe036622a0fe4c74/library/core/src/iter/traits/iterator.rs:2238
#18 <core::iter::adapters::map::Map<I,F> as core::iter::traits::iterator::Iterator>::try_fold ()
    at /rustc/7fcf850d7942804990a1d2e3fe036622a0fe4c74/library/core/src/iter/adapters/map.rs:117
#19 0x0000555555586e1a in <core::iter::adapters::GenericShunt<I,R> as core::iter::traits::iterator::Iterator>::try_fold ()
    at /rustc/7fcf850d7942804990a1d2e3fe036622a0fe4c74/library/core/src/iter/adapters/mod.rs:195
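The snapshot lands in Statistics::write_to_out_protocol, reached while parquet2's write_page_header serializes a DataPageHeaderV2 through the thrift compact protocol, so the page-level statistics are on the hot path. As a quick diagnostic only (a sketch, not a fix), one can check whether statistics serialization is involved by flipping the existing flag in the repro's WriteOptions:

// Diagnostic toggle only: disable statistics in the repro's WriteOptions to
// see whether the stall depends on statistics serialization.
let options = WriteOptions {
    write_statistics: false, // `true` in the repro above
    compression: CompressionOptions::Zstd(None),
    version: Version::V2,
};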
ritchie46 changed the title from "Parquet writer stalls at a certain column size." to "Parquet writer stalls at a certain column size for Utf8 dtypes." on Nov 5, 2022
ritchie46 (Collaborator, Author) commented:

Found the culprit. I think I can fix it.

jorgecarleitao added the no-changelog label (issues whose changes are covered by a PR and thus should not be shown in the changelog) on Dec 13, 2022