
Added support to read parquet asynchronously #260

Merged
merged 4 commits on Aug 11, 2021
10 changes: 10 additions & 0 deletions examples/s3/Cargo.toml
@@ -0,0 +1,10 @@
[package]
name = "s3"
version = "0.1.0"
edition = "2018"

[dependencies]
arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
rust-s3 = { version = "0.27.0-rc4", features = ["tokio"] }
futures = "0.3"
tokio = { version = "1.0.0", features = ["macros", "rt-multi-thread"] }
67 changes: 67 additions & 0 deletions examples/s3/src/main.rs
@@ -0,0 +1,67 @@
use arrow2::array::{Array, Int64Array};
use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::parquet::read::{
decompress, get_page_stream, page_stream_to_array, read_metadata_async,
};
use futures::{future::BoxFuture, StreamExt};
use s3::Bucket;

mod stream;
use stream::{RangedStreamer, SeekOutput};

#[tokio::main]
async fn main() -> Result<()> {
let bucket_name = "dev-jorgecardleitao";
let region = "eu-central-1".parse().unwrap();
let bucket = Bucket::new_public(bucket_name, region).unwrap();
let path = "benches_65536.parquet".to_string();

let (data, _) = bucket.head_object(&path).await.unwrap();
let length = data.content_length.unwrap() as usize;
println!("total size in bytes: {}", length);

let range_get = Box::new(move |start: u64, length: usize| {
let bucket = bucket.clone();
let path = path.clone();
Box::pin(async move {
// to get a sense of what is being queried in s3
println!("getting {} bytes starting at {}", length, start);
let (mut data, _) = bucket
// -1 because ranges are inclusive in `get_object_range`
.get_object_range(&path, start, Some(start + length as u64 - 1))
.await
.map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;
println!("got {}/{} bytes starting at {}", data.len(), length, start);
data.truncate(length);
Ok(SeekOutput { start, data })
}) as BoxFuture<'static, std::io::Result<SeekOutput>>
});

// request at least 4 KiB per S3 call; adjust as you like.
let mut reader = RangedStreamer::new(length, 4 * 1024, range_get);

let metadata = read_metadata_async(&mut reader).await?;

// e.g. print the total number of rows from the metadata
println!("{}", metadata.num_rows);

// pages of the first row group and first column
// This is IO-bound and SHOULD be done in a shared thread pool (e.g. Tokio)
let pages = get_page_stream(&metadata, 0, 0, &mut reader, vec![]).await?;

// decompress the pages. This is CPU-bound and SHOULD be done in a dedicated thread pool (e.g. Rayon)
let pages = pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));

// deserialize the pages. This is CPU-bound and SHOULD be done in a dedicated thread pool (e.g. Rayon)
let array =
page_stream_to_array(pages, &metadata.row_groups[0].columns()[0], DataType::Int64).await?;

let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
// ... and have fun with it.
println!("len: {}", array.len());
println!("null_count: {}", array.null_count());
Ok(())
}
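The comments above flag decompression and deserialization as CPU-bound work that belongs on a dedicated pool. As a minimal sketch (not part of this PR, and assuming `CompressedDataPage` is `Send`), the `pages.map(...)` line could be swapped for a `StreamExt::then` (already in scope via the `futures` import) that offloads decompression onto tokio's blocking pool:

// hypothetical drop-in for the `pages.map(...)` line above:
// run decompression on tokio's blocking pool instead of the IO thread
let pages = pages.then(|compressed_page| async {
    tokio::task::spawn_blocking(move || decompress(compressed_page?, &mut vec![]))
        .await
        .expect("decompression task panicked")
});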
113 changes: 113 additions & 0 deletions examples/s3/src/stream.rs
@@ -0,0 +1,113 @@
// Special thanks to Alice for the help: https://users.rust-lang.org/t/63019/6
use std::io::{Result, SeekFrom};
use std::pin::Pin;

use futures::{
future::BoxFuture,
io::{AsyncRead, AsyncSeek},
Future,
};

pub struct RangedStreamer {
pos: u64,
length: u64, // total size in bytes
state: State,
range_get: F,
min_request_size: usize, // requests have at least this size
}

enum State {
HasChunk(SeekOutput),
Seeking(BoxFuture<'static, std::io::Result<SeekOutput>>),
}

pub struct SeekOutput {
pub start: u64,
pub data: Vec<u8>,
}

pub type F = Box<
dyn Fn(u64, usize) -> BoxFuture<'static, std::io::Result<SeekOutput>> + Send + Sync,
>;

impl RangedStreamer {
pub fn new(length: usize, min_request_size: usize, range_get: F) -> Self {
let length = length as u64;
Self {
pos: 0,
length,
state: State::HasChunk(SeekOutput {
start: 0,
data: vec![],
}),
range_get,
min_request_size,
}
}
}
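Because `range_get` hides the byte source behind a plain callback, the same reader works against anything that can serve byte ranges. A minimal sketch (not part of this PR): a hypothetical helper that backs the streamer with an in-memory buffer, handy for unit tests:

// hypothetical helper: a RangedStreamer over an in-memory Vec<u8>
fn in_memory_streamer(data: Vec<u8>) -> RangedStreamer {
    let total = data.len();
    let range_get: F = Box::new(move |start: u64, len: usize| {
        let data = data.clone();
        Box::pin(async move {
            // clamp to the buffer so short reads at the end don't panic
            let begin = (start as usize).min(data.len());
            let end = (begin + len).min(data.len());
            Ok(SeekOutput {
                start,
                data: data[begin..end].to_vec(),
            })
        }) as BoxFuture<'static, Result<SeekOutput>>
    });
    RangedStreamer::new(total, 1024, range_get)
}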

// whether `test_interval` is fully contained in `a`; both are (start, length) pairs.
fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
if test_interval.0 < a.0 {
return false;
}
let test_end = test_interval.0 + test_interval.1;
let a_end = a.0 + a.1;
if test_end > a_end {
return false;
}
true
}
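A couple of illustrative cases (both arguments are (start, length) pairs):

// e.g. with a cached chunk covering bytes [0, 100):
assert!(range_includes((0, 100), (10, 20)));  // [10, 30) is fully inside
assert!(!range_includes((0, 100), (90, 20))); // [90, 110) overruns the chunk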

impl AsyncRead for RangedStreamer {
fn poll_read(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
buf: &mut [u8],
) -> std::task::Poll<Result<usize>> {
let requested_range = (self.pos as usize, buf.len());
let min_request_size = self.min_request_size;
match &mut self.state {
State::HasChunk(output) => {
let existing_range = (output.start as usize, output.data.len());
if range_includes(existing_range, requested_range) {
let offset = requested_range.0 - existing_range.0;
buf.copy_from_slice(&output.data[offset..offset + buf.len()]);
self.pos += buf.len() as u64;
std::task::Poll::Ready(Ok(buf.len()))
} else {
let start = requested_range.0 as u64;
let length = std::cmp::max(min_request_size, requested_range.1);
let future = (self.range_get)(start, length);
self.state = State::Seeking(Box::pin(future));
self.poll_read(cx, buf)
}
}
State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
std::task::Poll::Ready(v) => {
match v {
Ok(output) => self.state = State::HasChunk(output),
Err(e) => return std::task::Poll::Ready(Err(e)),
};
self.poll_read(cx, buf)
}
std::task::Poll::Pending => std::task::Poll::Pending,
},
}
}
}

impl AsyncSeek for RangedStreamer {
fn poll_seek(
mut self: std::pin::Pin<&mut Self>,
_: &mut std::task::Context<'_>,
pos: std::io::SeekFrom,
) -> std::task::Poll<Result<u64>> {
match pos {
SeekFrom::Start(pos) => self.pos = pos,
SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64,
SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
};
std::task::Poll::Ready(Ok(self.pos))
}
}
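`read_metadata_async` relies on exactly this seek-then-read pattern, because parquet keeps its metadata at the end of the file: a 4-byte little-endian footer length followed by the 4-byte "PAR1" magic. A minimal sketch (not part of this PR) of how a consumer pulls the footer length out of the 8 trailing bytes:

// sketch: read the 4-byte footer length stored 8 bytes from the end
use futures::io::{AsyncReadExt, AsyncSeekExt};

async fn read_footer_len(reader: &mut RangedStreamer) -> Result<u32> {
    reader.seek(SeekFrom::End(-8)).await?;
    let mut buf = [0u8; 4];
    reader.read_exact(&mut buf).await?;
    Ok(u32::from_le_bytes(buf))
}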
45 changes: 45 additions & 0 deletions src/io/parquet/read/binary/basic.rs
@@ -1,3 +1,4 @@
use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
@@ -353,3 +354,47 @@ where
_ => unreachable!(),
})
}

pub async fn stream_to_array<O, I, E>(
pages: I,
metadata: &ColumnChunkMetaData,
data_type: &DataType,
) -> Result<Box<dyn Array>>
where
ArrowError: From<E>,
O: Offset,
E: Clone,
I: Stream<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(0);
let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
offsets.push(O::default());
let mut validity = MutableBitmap::with_capacity(capacity);

pin_mut!(pages); // needed for iteration

while let Some(page) = pages.next().await {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
metadata.descriptor(),
&mut offsets,
&mut values,
&mut validity,
)?
}

Ok(match data_type {
DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data(
offsets.into(),
values.into(),
validity.into(),
)),
DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data(
offsets.into(),
values.into(),
validity.into(),
)),
_ => unreachable!(),
})
}
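The `O: Offset` parameter selects between 32-bit offsets (`Utf8`/`Binary`) and 64-bit offsets (`LargeUtf8`/`LargeBinary`). A hypothetical call site, assuming `pages` is a stream of decompressed pages and `metadata` is the file metadata, as in the s3 example above:

// e.g. a plain Utf8 column uses i32 offsets
let column = &metadata.row_groups[0].columns()[0];
let array = stream_to_array::<i32, _, _>(pages, column, &DataType::Utf8).await?;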
1 change: 1 addition & 0 deletions src/io/parquet/read/binary/mod.rs
@@ -2,4 +2,5 @@ mod basic;
mod nested;

pub use basic::iter_to_array;
pub use basic::stream_to_array;
pub use nested::iter_to_array as iter_to_array_nested;
26 changes: 26 additions & 0 deletions src/io/parquet/read/boolean/basic.rs
@@ -5,6 +5,8 @@ use crate::{
};

use super::super::utils;

use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{hybrid_rle, Encoding},
metadata::{ColumnChunkMetaData, ColumnDescriptor},
@@ -86,6 +88,30 @@ where
Ok(BooleanArray::from_data(values.into(), validity.into()))
}

pub async fn stream_to_array<I, E>(pages: I, metadata: &ColumnChunkMetaData) -> Result<BooleanArray>
where
ArrowError: From<E>,
E: Clone,
I: Stream<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBitmap::with_capacity(capacity);
let mut validity = MutableBitmap::with_capacity(capacity);

pin_mut!(pages); // needed for iteration

while let Some(page) = pages.next().await {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
metadata.descriptor(),
&mut values,
&mut validity,
)?
}

Ok(BooleanArray::from_data(values.into(), validity.into()))
}

fn extend_from_page(
page: &DataPage,
descriptor: &ColumnDescriptor,
1 change: 1 addition & 0 deletions src/io/parquet/read/boolean/mod.rs
@@ -2,4 +2,5 @@ mod basic;
mod nested;

pub use basic::iter_to_array;
pub use basic::stream_to_array;
pub use nested::iter_to_array as iter_to_array_nested;
34 changes: 34 additions & 0 deletions src/io/parquet/read/fixed_size_binary.rs
@@ -1,3 +1,4 @@
use futures::{pin_mut, Stream, StreamExt};
use parquet2::{
encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
page::{DataPage, DataPageHeader, DataPageHeaderExt, FixedLenByteArrayPageDict},
@@ -159,6 +160,39 @@ where
))
}

pub async fn stream_to_array<I, E>(
pages: I,
size: i32,
metadata: &ColumnChunkMetaData,
) -> Result<FixedSizeBinaryArray>
where
ArrowError: From<E>,
E: Clone,
I: Stream<Item = std::result::Result<DataPage, E>>,
{
let capacity = metadata.num_values() as usize;
let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
let mut validity = MutableBitmap::with_capacity(capacity);

pin_mut!(pages); // needed for iteration

while let Some(page) = pages.next().await {
extend_from_page(
page.as_ref().map_err(|x| x.clone())?,
size,
metadata.descriptor(),
&mut values,
&mut validity,
)?
}

Ok(FixedSizeBinaryArray::from_data(
DataType::FixedSizeBinary(size),
values.into(),
validity.into(),
))
}

pub(crate) fn extend_from_page(
page: &DataPage,
size: i32,