Skip to content

Commit

Permalink
Add basic test
Browse files Browse the repository at this point in the history
  • Loading branch information
tustvold committed Jan 28, 2022
1 parent c8069a5 commit 819913a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 4 deletions.
2 changes: 1 addition & 1 deletion parquet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ brotli = "3.3"
flate2 = "1.0"
lz4 = "1.23"
serde_json = { version = "1.0", features = ["preserve_order"] }
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils"] }
arrow = { path = "../arrow", version = "8.0.0", default-features = false, features = ["test_utils", "prettyprint"] }

[features]
default = ["arrow", "snap", "brotli", "flate2", "lz4", "zstd", "base64"]
Expand Down
39 changes: 36 additions & 3 deletions parquet/src/arrow/async_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,24 +427,57 @@ impl PageIterator for ColumnChunkIterator {

#[cfg(test)]
mod tests {
use arrow::util::pretty::pretty_format_batches;
use futures::TryStreamExt;
use tokio::fs::File;

use super::*;

fn assert_batches_eq(batches: &[RecordBatch], expected_lines: &[&str]) {
let formatted = pretty_format_batches(batches).unwrap().to_string();
let actual_lines: Vec<_> = formatted.trim().lines().collect();
assert_eq!(
&actual_lines, expected_lines,
"\n\nexpected:\n\n{:#?}\nactual:\n\n{:#?}\n\n",
expected_lines, actual_lines
);
}

#[tokio::test]
async fn test_parquet_stream() {
let testdata = arrow::util::test_util::parquet_test_data();
let path = format!("{}/nested_structs.rust.parquet", testdata);
let path = format!("{}/alltypes_plain.parquet", testdata);
let file = File::open(path).await.unwrap();

let stream = ParquetRecordBatchStreamBuilder::new(file)
let builder = ParquetRecordBatchStreamBuilder::new(file)
.await
.unwrap()
.with_projection(vec![1, 2, 6])
.with_batch_size(3);

let stream = builder
.build()
.unwrap();

let results = stream.try_collect::<Vec<_>>().await.unwrap();
println!("{:?}", results);
assert_eq!(results.len(), 3);

assert_batches_eq(
&results,
&[
"+----------+-------------+-----------+",
"| bool_col | tinyint_col | float_col |",
"+----------+-------------+-----------+",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"| true | 0 | 0 |",
"| false | 1 | 1.1 |",
"+----------+-------------+-----------+",
],
);
}
}

0 comments on commit 819913a

Please sign in to comment.