From c08e54c63a9728181ec01e69ae3f434445b133dc Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sat, 16 Oct 2021 07:13:06 +0000
Subject: [PATCH] Improved bench reporting.

---
 benches/read_parquet.rs              | 18 +++++-----
 benchmarks/.gitignore                |  1 +
 benchmarks/bench_read.py             | 49 ++++++++++++++++++++++++++++++
 benchmarks/run.py                    | 20 +++++++++++
 benchmarks/summarize.py              | 51 ++++++++++++++++++++++++++++++++
 parquet_integration/bench_read.py    | 26 --------------
 parquet_integration/write_parquet.py |  7 ++--
 7 files changed, 133 insertions(+), 39 deletions(-)
 create mode 100644 benchmarks/.gitignore
 create mode 100644 benchmarks/bench_read.py
 create mode 100644 benchmarks/run.py
 create mode 100644 benchmarks/summarize.py
 delete mode 100644 parquet_integration/bench_read.py

diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs
index 3e8584e0f6e..8f536ed6842 100644
--- a/benches/read_parquet.rs
+++ b/benches/read_parquet.rs
@@ -42,36 +42,36 @@ fn add_benchmark(c: &mut Criterion) {
         let size = 2usize.pow(i);
         let buffer = to_buffer(size, false, false, false);
         let a = format!("read i64 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
         let a = format!("read utf8 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let a = format!("read utf8 large 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 6).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap()));
 
         let a = format!("read bool 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 3).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap()));
 
         let buffer = to_buffer(size, true, false, false);
         let a = format!("read utf8 dict 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, false, false, true);
         let a = format!("read i64 snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
 
         let buffer = to_buffer(size, false, true, false);
         let a = format!("read utf8 multi 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, false, true, true);
         let a = format!("read utf8 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 2).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap()));
 
         let buffer = to_buffer(size, false, true, true);
         let a = format!("read i64 multi snappy 2^{}", i);
-        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size * 8, 0).unwrap()));
+        c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap()));
     });
 }
 
diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore
new file mode 100644
index 00000000000..344f079e498
--- /dev/null
+++ b/benchmarks/.gitignore
@@ -0,0 +1 @@
+runs
diff --git a/benchmarks/bench_read.py b/benchmarks/bench_read.py
new file mode 100644
index 00000000000..cab190fe2cd
--- /dev/null
+++ b/benchmarks/bench_read.py
@@ -0,0 +1,49 @@
+import timeit
+import io
+import os
+import json
+
+import pyarrow.parquet
+
+
+def _bench_single(log2_size: int, column: str, use_dict: bool) -> float:
+    if use_dict:
+        path = f"fixtures/pyarrow3/v1/dict/benches_{2**log2_size}.parquet"
+    else:
+        path = f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet"
+    with open(path, "rb") as f:
+        data = f.read()
+    data = io.BytesIO(data)
+
+    def f():
+        pyarrow.parquet.read_table(data, columns=[column])
+
+    seconds = timeit.Timer(f).timeit(number=512) / 512
+    ns = seconds * 1000 * 1000 * 1000
+    return ns
+
+
+def _report(name: str, result: float):
+    path = f"benchmarks/runs/{name}/new"
+    os.makedirs(path, exist_ok=True)
+    with open(f"{path}/estimates.json", "w") as f:
+        json.dump({"mean": {"point_estimate": result}}, f)
+
+
+def _bench(size, ty):
+    column, use_dict = {
+        "i64": ("int64", False),
+        "bool": ("bool", False),
+        "utf8": ("string", False),
+        "utf8 dict": ("string", True),
+    }[ty]
+
+    result = _bench_single(size, column, use_dict)
+    print(result)
+    _report(f"read {ty} 2_{size}", result)
+
+
+for size in range(10, 22, 2):
+    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
+        print(size, ty)
+        _bench(size, ty)
diff --git a/benchmarks/run.py b/benchmarks/run.py
new file mode 100644
index 00000000000..a707f23f1bd
--- /dev/null
+++ b/benchmarks/run.py
@@ -0,0 +1,20 @@
+import subprocess
+
+
+# run pyarrow
+subprocess.call(["python", "benchmarks/bench_read.py"])
+
+
+for ty in ["i64", "bool", "utf8", "utf8 dict"]:
+    args = [
+        "cargo",
+        "bench",
+        "--features",
+        "io_parquet,io_parquet_compression",
+        "--bench",
+        "read_parquet",
+        "--",
+        f"{ty} 2",
+    ]
+
+    subprocess.call(args)
diff --git a/benchmarks/summarize.py b/benchmarks/summarize.py
new file mode 100644
index 00000000000..a44c0ac182f
--- /dev/null
+++ b/benchmarks/summarize.py
@@ -0,0 +1,51 @@
+import json
+import os
+
+
+def _read_reports(engine: str):
+    root = {
+        "arrow2": "target/criterion",
+        "pyarrow": "benchmarks/runs",
+    }[engine]
+
+    result = []
+    for item in os.listdir(root):
+        if item == "report":
+            continue
+
+        with open(os.path.join(root, item, "new", "estimates.json")) as f:
+            data = json.load(f)
+
+        ms = data["mean"]["point_estimate"] / 1000
+        task = item.split()[0]
+        type = " ".join(item.split()[1:-1])
+        size = int(item.split()[-1].split("_")[1])
+        result.append(
+            {
+                "engine": engine,
+                "task": task,
+                "type": type,
+                "size": size,
+                "time": ms,
+            }
+        )
+    return result
+
+
+def _print_report(result):
+    for ty in ["i64", "bool", "utf8", "utf8 dict"]:
+        print(ty)
+        r = filter(lambda x: x["type"] == ty, result)
+        r = sorted(r, key=lambda x: x["size"])
+        for row in r:
+            print(row["time"])
+
+
+def print_report():
+    for engine in ["arrow2", "pyarrow"]:
+        print(engine)
+        result = _read_reports(engine)
+        _print_report(result)
+
+
+print_report()
diff --git a/parquet_integration/bench_read.py b/parquet_integration/bench_read.py
deleted file mode 100644
index f1db81addee..00000000000
--- a/parquet_integration/bench_read.py
+++ /dev/null
@@ -1,26 +0,0 @@
-import timeit
-import io
-
-import pyarrow.parquet
-
-
-def bench(log2_size: int, datatype: str):
-    with open(f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet", "rb") as f:
-        data = f.read()
-        data = io.BytesIO(data)
-
-    def f():
-        pyarrow.parquet.read_table(data, columns=[datatype])
-
-    seconds = timeit.Timer(f).timeit(number=512) / 512
-    microseconds = seconds * 1000 * 1000
-    print(f"read {datatype} 2^{log2_size} time: {microseconds:.2f} us")
-
-#for i in range(10, 22, 2):
-#    bench(i, "int64")
-
-for i in range(10, 22, 2):
-    bench(i, "string")
-
-for i in range(10, 22, 2):
-    bench(i, "bool")
diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py
index 165d916939f..d97ff1edc9d 100644
--- a/parquet_integration/write_parquet.py
+++ b/parquet_integration/write_parquet.py
@@ -210,15 +210,14 @@ def write_pyarrow(
 
 def case_benches(size):
     assert size % 8 == 0
-    size //= 8
-    data, schema, path = case_basic_nullable(1)
+    data, schema, _ = case_basic_nullable(1)
     for k in data:
-        data[k] = data[k][:8] * size
+        data[k] = data[k][:8] * (size // 8)
     return data, schema, f"benches_{size}.parquet"
 
 
 # for read benchmarks
-for i in range(3 + 10, 3 + 22, 2):
+for i in range(10, 22, 2):
     # two pages (dict)
     write_pyarrow(case_benches, 2 ** i, 1, True, False, False)
     # single page