diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs
index 55ce25fc96d..3e8584e0f6e 100644
--- a/benches/read_parquet.rs
+++ b/benches/read_parquet.rs
@@ -6,9 +6,18 @@ use criterion::{criterion_group, criterion_main, Criterion};
 use arrow2::error::Result;
 use arrow2::io::parquet::read;
 
-fn to_buffer(size: usize) -> Vec<u8> {
+fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec<u8> {
     let dir = env!("CARGO_MANIFEST_DIR");
-    let path = PathBuf::from(dir).join(format!("fixtures/pyarrow3/v1/benches_{}.parquet", size));
+
+    let dict = if dict { "dict/" } else { "" };
+    let multi_page = if multi_page { "multi/" } else { "" };
+    let compressed = if compressed { "snappy/" } else { "" };
+
+    let path = PathBuf::from(dir).join(format!(
+        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
+        dict, multi_page, compressed, size
+    ));
+
     let metadata = fs::metadata(&path).expect("unable to read metadata");
     let mut file = fs::File::open(path).unwrap();
     let mut buffer = vec![0; metadata.len() as usize];
@@ -16,7 +25,7 @@ fn to_buffer(size: usize) -> Vec<u8> {
     buffer
 }
 
-fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> {
+fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> {
     let file = Cursor::new(buffer);
 
     let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;
@@ -31,26 +40,38 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<
 fn add_benchmark(c: &mut Criterion) {
     (10..=20).step_by(2).for_each(|i| {
         let size = 2usize.pow(i);
-        let buffer = to_buffer(size);
+        let buffer = to_buffer(size, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 0).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 2).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 6).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 6).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| {
-            b.iter(|| read_decompressed_pages(&buffer, size * 8, 3).unwrap())
-        });
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 3).unwrap()));
+
+        let buffer = to_buffer(size, true, false, false);
+        let a = format!("read utf8 dict 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, false, true);
+        let a = format!("read i64 snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+
+        let buffer = to_buffer(size, false, true, false);
+        let a = format!("read utf8 multi 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read utf8 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+
+        let buffer = to_buffer(size, false, true, true);
+        let a = format!("read i64 multi snappy 2^{}", i);
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
     });
 }
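For orientation, here is a minimal sketch of what one bench iteration does outside of criterion, using the same `read::RecordReader::try_new` call as `read_batch` above. The fixture path and column index are illustrative, and the assumption that the reader iterates over `Result`-wrapped record batches follows the usage implied by the benchmark:

use std::fs;
use std::io::Cursor;

use arrow2::error::Result;
use arrow2::io::parquet::read;

fn main() -> Result<()> {
    // Illustrative path; the real ones are assembled by `to_buffer` above.
    let buffer = fs::read("fixtures/pyarrow3/v1/benches_1024.parquet").unwrap();
    let file = Cursor::new(buffer.as_slice());

    // Same constructor as the benchmark: project a single column (0 is the i64 column).
    let reader = read::RecordReader::try_new(file, Some(vec![0]), None, None, None)?;

    // Assumed: iterating yields Result<RecordBatch>; consuming each batch forces
    // the pages to be read, decompressed, and decoded into arrow arrays.
    for maybe_batch in reader {
        let _batch = maybe_batch?;
    }
    Ok(())
}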
diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py
index 0d9e556216d..165d916939f 100644
--- a/parquet_integration/write_parquet.py
+++ b/parquet_integration/write_parquet.py
@@ -11,7 +11,9 @@ def case_basic_nullable(size=1):
     float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0]
     string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"]
     boolean = [True, None, False, False, None, True, None, None, True, True]
-    string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDšŸ˜ƒšŸŒššŸ•³šŸ‘Š"] * 10
+    string_large = [
+        "ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDšŸ˜ƒšŸŒššŸ•³šŸ‘Š"
+    ] * 10
     decimal = [Decimal(e) if e is not None else None for e in int64]
 
     fields = [
@@ -23,9 +25,9 @@ def case_basic_nullable(size=1):
         pa.field("uint32", pa.uint32()),
         pa.field("string_large", pa.utf8()),
         # decimal testing
-        pa.field("decimal_9", pa.decimal128(9,0)),
-        pa.field("decimal_18", pa.decimal128(18,0)),
-        pa.field("decimal_26", pa.decimal128(26,0)),
+        pa.field("decimal_9", pa.decimal128(9, 0)),
+        pa.field("decimal_18", pa.decimal128(18, 0)),
+        pa.field("decimal_26", pa.decimal128(26, 0)),
     ]
     schema = pa.schema(fields)
 
@@ -67,9 +69,9 @@ def case_basic_required(size=1):
             nullable=False,
         ),
         pa.field("uint32", pa.uint32(), nullable=False),
-        pa.field("decimal_9", pa.decimal128(9,0), nullable=False),
-        pa.field("decimal_18", pa.decimal128(18,0), nullable=False),
-        pa.field("decimal_26", pa.decimal128(26,0), nullable=False),
+        pa.field("decimal_9", pa.decimal128(9, 0), nullable=False),
+        pa.field("decimal_18", pa.decimal128(18, 0), nullable=False),
+        pa.field("decimal_26", pa.decimal128(26, 0), nullable=False),
     ]
     schema = pa.schema(fields)
 
@@ -156,13 +158,36 @@ def case_nested(size):
     )
 
 
-def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
+def write_pyarrow(
+    case,
+    size: int,
+    page_version: int,
+    use_dictionary: bool,
+    multiple_pages: bool,
+    compression: bool,
+):
     data, schema, path = case(size)
 
     base_path = f"{PYARROW_PATH}/v{page_version}"
     if use_dictionary:
         base_path = f"{base_path}/dict"
 
+    if multiple_pages:
+        base_path = f"{base_path}/multi"
+
+    if compression:
+        base_path = f"{base_path}/snappy"
+
+    if compression:
+        compression = "snappy"
+    else:
+        compression = None
+
+    if multiple_pages:
+        data_page_size = 2 ** 10  # i.e. a small number to ensure multiple pages
+    else:
+        data_page_size = 2 ** 40  # i.e. a large number to ensure a single page
+
     t = pa.table(data, schema=schema)
     os.makedirs(base_path, exist_ok=True)
     pa.parquet.write_table(
@@ -170,9 +195,9 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
         f"{base_path}/{path}",
         row_group_size=2 ** 40,
         use_dictionary=use_dictionary,
-        compression=None,
+        compression=compression,
         write_statistics=True,
-        data_page_size=2 ** 40,  # i.e. a large number to ensure a single page
+        data_page_size=data_page_size,
         data_page_version=f"{page_version}.0",
     )
 
 
@@ -180,7 +205,7 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False):
 for case in [case_basic_nullable, case_basic_required, case_nested]:
     for version in [1, 2]:
         for use_dict in [True, False]:
-            write_pyarrow(case, 1, version, use_dict)
+            write_pyarrow(case, 1, version, use_dict, False, False)
 
 
 def case_benches(size):
@@ -194,4 +219,13 @@ def case_benches(size):
 
 # for read benchmarks
 for i in range(3 + 10, 3 + 22, 2):
-    write_pyarrow(case_benches, 2 ** i, 1)  # V1
+    # two pages (dict)
+    write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
+    # single page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, False)
+    # multiple pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, False)
+    # multiple compressed pages
+    write_pyarrow(case_benches, 2 ** i, 1, False, True, True)
+    # single compressed page
+    write_pyarrow(case_benches, 2 ** i, 1, False, False, True)
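Taken together, the writer's directory flags and `to_buffer`'s format string assume the fixture layout sketched below. The helper name `fixture_path` is hypothetical; it simply mirrors the path logic of `to_buffer` above, and only the flag combinations exercised by the benches are actually generated, with `multi` and `snappy` nesting in that order:

// (dict, multi_page, compressed) -> location under fixtures/pyarrow3/v1/
//   (false, false, false) -> benches_{size}.parquet              single plain page
//   (true,  false, false) -> dict/benches_{size}.parquet         dictionary-encoded
//   (false, true,  false) -> multi/benches_{size}.parquet        multiple pages
//   (false, false, true ) -> snappy/benches_{size}.parquet       single snappy page
//   (false, true,  true ) -> multi/snappy/benches_{size}.parquet multiple snappy pages
fn fixture_path(size: usize, dict: bool, multi_page: bool, compressed: bool) -> String {
    let dict = if dict { "dict/" } else { "" };
    let multi_page = if multi_page { "multi/" } else { "" };
    let compressed = if compressed { "snappy/" } else { "" };
    format!(
        "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet",
        dict, multi_page, compressed, size
    )
}

fn main() {
    // The deepest nesting: multiple snappy-compressed pages.
    assert_eq!(
        fixture_path(1024, false, true, true),
        "fixtures/pyarrow3/v1/multi/snappy/benches_1024.parquet"
    );
}

Keeping the directory encoding identical on both sides is what lets the benches select a fixture variant with three booleans instead of hard-coded paths.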