diff --git a/examples/s3/Cargo.toml b/examples/s3/Cargo.toml
new file mode 100644
index 00000000000..a238257f904
--- /dev/null
+++ b/examples/s3/Cargo.toml
@@ -0,0 +1,10 @@
+[package]
+name = "s3"
+version = "0.1.0"
+edition = "2018"
+
+[dependencies]
+arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] }
+rust-s3 = { version = "0.27.0-rc4", features = ["tokio"] }
+futures = "0.3"
+tokio = { version = "1.0.0", features = ["macros", "rt-multi-thread"] }
diff --git a/examples/s3/src/main.rs b/examples/s3/src/main.rs
new file mode 100644
index 00000000000..db98240f408
--- /dev/null
+++ b/examples/s3/src/main.rs
@@ -0,0 +1,67 @@
+use arrow2::array::{Array, Int64Array};
+use arrow2::datatypes::DataType;
+use arrow2::error::Result;
+use arrow2::io::parquet::read::{
+    decompress, get_page_stream, page_stream_to_array, read_metadata_async,
+};
+use futures::{future::BoxFuture, StreamExt};
+use s3::Bucket;
+
+mod stream;
+use stream::{RangedStreamer, SeekOutput};
+
+#[tokio::main]
+async fn main() -> Result<()> {
+    let bucket_name = "dev-jorgecardleitao";
+    let region = "eu-central-1".parse().unwrap();
+    let bucket = Bucket::new_public(bucket_name, region).unwrap();
+    let path = "benches_65536.parquet".to_string();
+
+    let (data, _) = bucket.head_object(&path).await.unwrap();
+    let length = data.content_length.unwrap() as usize;
+    println!("total size in bytes: {}", length);
+
+    let range_get = Box::new(move |start: u64, length: usize| {
+        let bucket = bucket.clone();
+        let path = path.clone();
+        Box::pin(async move {
+            let bucket = bucket.clone();
+            let path = path.clone();
+            // to get a sense of what is being queried in s3
+            println!("getting {} bytes starting at {}", length, start);
+            let (mut data, _) = bucket
+                // -1 because ranges are inclusive in `get_object_range`
+                .get_object_range(&path, start, Some(start + length as u64 - 1))
+                .await
+                .map_err(|x| std::io::Error::new(std::io::ErrorKind::Other, x.to_string()))?;
+            println!("got {}/{} bytes starting at {}", data.len(), length, start);
+            data.truncate(length);
+            Ok(SeekOutput { start, data })
+        }) as BoxFuture<'static, std::io::Result<SeekOutput>>
+    });
+
+    // at least 4kb per s3 request. Adjust as you like.
+    let mut reader = RangedStreamer::new(length, 4 * 1024, range_get);
+
+    let metadata = read_metadata_async(&mut reader).await?;
+
+    // metadata
+    println!("{}", metadata.num_rows);
+
+    // pages of the first row group and first column, without a page filter
+    // This is IO bounded and SHOULD be done in a shared thread pool (e.g. Tokio)
+    let pages = get_page_stream(&metadata, 0, 0, &mut reader, None, vec![]).await?;
+
+    // decompress the pages. This is CPU bounded and SHOULD be done in a dedicated thread pool (e.g. Rayon)
+    let pages = pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));
+
+    // deserialize the pages. This is CPU bounded and SHOULD be done in a dedicated thread pool (e.g. Rayon)
+    let array =
+        page_stream_to_array(pages, &metadata.row_groups[0].columns()[0], DataType::Int64).await?;
+
+    let array = array.as_any().downcast_ref::<Int64Array>().unwrap();
+    // ... and have fun with it.
+    println!("len: {}", array.len());
+    println!("null_count: {}", array.null_count());
+    Ok(())
+}
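Note on the "CPU bounded" comments in main.rs above: the example keeps `decompress` inline for brevity. Below is a minimal sketch, not part of this diff, of what offloading that step onto Tokio's blocking pool could look like; `decompress_off_thread` is a hypothetical helper name, and a dedicated Rayon pool would follow the same shape. It only uses items that this change re-exports from `arrow2::io::parquet::read`.

use arrow2::io::parquet::read::{decompress, CompressedDataPage, DataPage, ParquetError};
use futures::{Stream, StreamExt};

/// Decompresses each page on Tokio's blocking thread pool instead of on the async executor.
fn decompress_off_thread(
    pages: impl Stream<Item = std::result::Result<CompressedDataPage, ParquetError>>,
) -> impl Stream<Item = std::result::Result<DataPage, ParquetError>> {
    pages.then(|compressed_page| async move {
        // `spawn_blocking` keeps the CPU-bound decompression away from the I/O driver threads
        tokio::task::spawn_blocking(move || decompress(compressed_page?, &mut vec![]))
            .await
            .expect("decompression task panicked")
    })
}

The resulting stream has the same item type as the inline `pages.map(...)` in main.rs, so it can be fed to `page_stream_to_array` unchanged.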
diff --git a/examples/s3/src/stream.rs b/examples/s3/src/stream.rs
new file mode 100644
index 00000000000..d09d26e7dd1
--- /dev/null
+++ b/examples/s3/src/stream.rs
@@ -0,0 +1,113 @@
+// Special thanks to Alice for the help: https://users.rust-lang.org/t/63019/6
+use std::io::{Result, SeekFrom};
+use std::pin::Pin;
+
+use futures::{
+    future::BoxFuture,
+    io::{AsyncRead, AsyncSeek},
+    Future,
+};
+
+pub struct RangedStreamer {
+    pos: u64,
+    length: u64, // total size
+    state: State,
+    range_get: F,
+    min_request_size: usize, // requests have at least this size
+}
+
+enum State {
+    HasChunk(SeekOutput),
+    Seeking(BoxFuture<'static, std::io::Result<SeekOutput>>),
+}
+
+pub struct SeekOutput {
+    pub start: u64,
+    pub data: Vec<u8>,
+}
+
+pub type F = Box<
+    dyn Fn(u64, usize) -> BoxFuture<'static, std::io::Result<SeekOutput>> + Send + Sync,
+>;
+
+impl RangedStreamer {
+    pub fn new(length: usize, min_request_size: usize, range_get: F) -> Self {
+        let length = length as u64;
+        Self {
+            pos: 0,
+            length,
+            state: State::HasChunk(SeekOutput {
+                start: 0,
+                data: vec![],
+            }),
+            range_get,
+            min_request_size,
+        }
+    }
+}
+
+// whether `test_interval` is inside `a` (start, length).
+fn range_includes(a: (usize, usize), test_interval: (usize, usize)) -> bool {
+    if test_interval.0 < a.0 {
+        return false;
+    }
+    let test_end = test_interval.0 + test_interval.1;
+    let a_end = a.0 + a.1;
+    if test_end > a_end {
+        return false;
+    }
+    true
+}
+
+impl AsyncRead for RangedStreamer {
+    fn poll_read(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+        buf: &mut [u8],
+    ) -> std::task::Poll<Result<usize>> {
+        let requested_range = (self.pos as usize, buf.len());
+        let min_request_size = self.min_request_size;
+        match &mut self.state {
+            State::HasChunk(output) => {
+                let existing_range = (output.start as usize, output.data.len());
+                if range_includes(existing_range, requested_range) {
+                    let offset = requested_range.0 - existing_range.0;
+                    buf.copy_from_slice(&output.data[offset..offset + buf.len()]);
+                    self.pos += buf.len() as u64;
+                    std::task::Poll::Ready(Ok(buf.len()))
+                } else {
+                    let start = requested_range.0 as u64;
+                    let length = std::cmp::max(min_request_size, requested_range.1);
+                    let future = (self.range_get)(start, length);
+                    self.state = State::Seeking(Box::pin(future));
+                    self.poll_read(cx, buf)
+                }
+            }
+            State::Seeking(ref mut future) => match Pin::new(future).poll(cx) {
+                std::task::Poll::Ready(v) => {
+                    match v {
+                        Ok(output) => self.state = State::HasChunk(output),
+                        Err(e) => return std::task::Poll::Ready(Err(e)),
+                    };
+                    self.poll_read(cx, buf)
+                }
+                std::task::Poll::Pending => std::task::Poll::Pending,
+            },
+        }
+    }
+}
+
+impl AsyncSeek for RangedStreamer {
+    fn poll_seek(
+        mut self: std::pin::Pin<&mut Self>,
+        _: &mut std::task::Context<'_>,
+        pos: std::io::SeekFrom,
+    ) -> std::task::Poll<Result<u64>> {
+        match pos {
+            SeekFrom::Start(pos) => self.pos = pos,
+            SeekFrom::End(pos) => self.pos = (self.length as i64 + pos) as u64,
+            SeekFrom::Current(pos) => self.pos = (self.pos as i64 + pos) as u64,
+        };
+        std::task::Poll::Ready(Ok(self.pos))
+    }
+}
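Side note, not part of this diff: `RangedStreamer` is not tied to S3; any closure matching the `F` alias can drive it. A minimal sketch of an in-memory `range_get`, handy for exercising the seek and request-coalescing logic in tests without network access; `in_memory_reader` is a hypothetical helper name.

use std::sync::Arc;

use futures::future::BoxFuture;

use crate::stream::{RangedStreamer, SeekOutput};

/// Wraps an in-memory buffer in a `RangedStreamer`, serving every range request from RAM.
fn in_memory_reader(file: Vec<u8>) -> RangedStreamer {
    let total_length = file.len();
    let file = Arc::new(file);
    let range_get = Box::new(move |start: u64, length: usize| {
        let file = file.clone();
        Box::pin(async move {
            let begin = start as usize;
            // serve `length` bytes starting at `start`, clamped at the end of the buffer
            let end = std::cmp::min(begin + length, file.len());
            Ok::<_, std::io::Error>(SeekOutput {
                start,
                data: file[begin..end].to_vec(),
            })
        }) as BoxFuture<'static, std::io::Result<SeekOutput>>
    });
    // 1 KiB minimum request, analogous to the 4 KiB used against S3 above
    RangedStreamer::new(total_length, 1024, range_get)
}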
diff --git a/src/io/parquet/read/binary/basic.rs b/src/io/parquet/read/binary/basic.rs
index 8ceca3ebbb3..a418c00ac59 100644
--- a/src/io/parquet/read/binary/basic.rs
+++ b/src/io/parquet/read/binary/basic.rs
@@ -1,3 +1,4 @@
+use futures::{pin_mut, Stream, StreamExt};
 use parquet2::{
     encoding::{bitpacking, delta_length_byte_array, hybrid_rle, uleb128, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
@@ -353,3 +354,47 @@ where
         _ => unreachable!(),
     })
 }
+
+pub async fn stream_to_array<O, E, I>(
+    pages: I,
+    metadata: &ColumnChunkMetaData,
+    data_type: &DataType,
+) -> Result<Box<dyn Array>>
+where
+    ArrowError: From<E>,
+    O: Offset,
+    E: Clone,
+    I: Stream<Item = std::result::Result<DataPage, E>>,
+{
+    let capacity = metadata.num_values() as usize;
+    let mut values = MutableBuffer::<u8>::with_capacity(0);
+    let mut offsets = MutableBuffer::<O>::with_capacity(1 + capacity);
+    offsets.push(O::default());
+    let mut validity = MutableBitmap::with_capacity(capacity);
+
+    pin_mut!(pages); // needed for iteration
+
+    while let Some(page) = pages.next().await {
+        extend_from_page(
+            page.as_ref().map_err(|x| x.clone())?,
+            metadata.descriptor(),
+            &mut offsets,
+            &mut values,
+            &mut validity,
+        )?
+    }
+
+    Ok(match data_type {
+        DataType::LargeBinary | DataType::Binary => Box::new(BinaryArray::from_data(
+            offsets.into(),
+            values.into(),
+            validity.into(),
+        )),
+        DataType::LargeUtf8 | DataType::Utf8 => Box::new(Utf8Array::from_data(
+            offsets.into(),
+            values.into(),
+            validity.into(),
+        )),
+        _ => unreachable!(),
+    })
+}
diff --git a/src/io/parquet/read/binary/mod.rs b/src/io/parquet/read/binary/mod.rs
index 13b47154377..912f1bb026b 100644
--- a/src/io/parquet/read/binary/mod.rs
+++ b/src/io/parquet/read/binary/mod.rs
@@ -2,4 +2,5 @@ mod basic;
 mod nested;
 
 pub use basic::iter_to_array;
+pub use basic::stream_to_array;
 pub use nested::iter_to_array as iter_to_array_nested;
diff --git a/src/io/parquet/read/boolean/basic.rs b/src/io/parquet/read/boolean/basic.rs
index 8e3e155d797..221da32be06 100644
--- a/src/io/parquet/read/boolean/basic.rs
+++ b/src/io/parquet/read/boolean/basic.rs
@@ -5,6 +5,8 @@ use crate::{
 };
 
 use super::super::utils;
+
+use futures::{pin_mut, Stream, StreamExt};
 use parquet2::{
     encoding::{hybrid_rle, Encoding},
     metadata::{ColumnChunkMetaData, ColumnDescriptor},
@@ -86,6 +88,30 @@ where
     Ok(BooleanArray::from_data(values.into(), validity.into()))
 }
 
+pub async fn stream_to_array<E, I>(pages: I, metadata: &ColumnChunkMetaData) -> Result<BooleanArray>
+where
+    ArrowError: From<E>,
+    E: Clone,
+    I: Stream<Item = std::result::Result<DataPage, E>>,
+{
+    let capacity = metadata.num_values() as usize;
+    let mut values = MutableBitmap::with_capacity(capacity);
+    let mut validity = MutableBitmap::with_capacity(capacity);
+
+    pin_mut!(pages); // needed for iteration
+
+    while let Some(page) = pages.next().await {
+        extend_from_page(
+            page.as_ref().map_err(|x| x.clone())?,
+            metadata.descriptor(),
+            &mut values,
+            &mut validity,
+        )?
+    }
+
+    Ok(BooleanArray::from_data(values.into(), validity.into()))
+}
+
 fn extend_from_page(
     page: &DataPage,
     descriptor: &ColumnDescriptor,
diff --git a/src/io/parquet/read/boolean/mod.rs b/src/io/parquet/read/boolean/mod.rs
index 13b47154377..912f1bb026b 100644
--- a/src/io/parquet/read/boolean/mod.rs
+++ b/src/io/parquet/read/boolean/mod.rs
@@ -2,4 +2,5 @@ mod basic;
 mod nested;
 
 pub use basic::iter_to_array;
+pub use basic::stream_to_array;
 pub use nested::iter_to_array as iter_to_array_nested;
diff --git a/src/io/parquet/read/fixed_size_binary.rs b/src/io/parquet/read/fixed_size_binary.rs
index e92dd0619be..6ea5e78250e 100644
--- a/src/io/parquet/read/fixed_size_binary.rs
+++ b/src/io/parquet/read/fixed_size_binary.rs
@@ -1,3 +1,4 @@
+use futures::{pin_mut, Stream, StreamExt};
 use parquet2::{
     encoding::{bitpacking, hybrid_rle, uleb128, Encoding},
     page::{DataPage, DataPageHeader, DataPageHeaderExt, FixedLenByteArrayPageDict},
@@ -159,6 +160,39 @@ where
     ))
 }
 
+pub async fn stream_to_array<E, I>(
+    pages: I,
+    size: i32,
+    metadata: &ColumnChunkMetaData,
+) -> Result<FixedSizeBinaryArray>
+where
+    ArrowError: From<E>,
+    E: Clone,
+    I: Stream<Item = std::result::Result<DataPage, E>>,
+{
+    let capacity = metadata.num_values() as usize;
+    let mut values = MutableBuffer::<u8>::with_capacity(capacity * size as usize);
+    let mut validity = MutableBitmap::with_capacity(capacity);
+
+    pin_mut!(pages); // needed for iteration
+
+    while let Some(page) = pages.next().await {
+        extend_from_page(
+            page.as_ref().map_err(|x| x.clone())?,
+            size,
+            metadata.descriptor(),
+            &mut values,
+            &mut validity,
+        )?
+    }
+
+    Ok(FixedSizeBinaryArray::from_data(
+        DataType::FixedSizeBinary(size),
+        values.into(),
+        validity.into(),
+    ))
+}
+
 pub(crate) fn extend_from_page(
     page: &DataPage,
     size: i32,
diff --git a/src/io/parquet/read/mod.rs b/src/io/parquet/read/mod.rs
index 6c53a92211b..8d86ab05d03 100644
--- a/src/io/parquet/read/mod.rs
+++ b/src/io/parquet/read/mod.rs
@@ -1,30 +1,16 @@
-use std::io::{Read, Seek};
-
-mod binary;
-mod boolean;
-mod fixed_size_binary;
-mod nested_utils;
-mod primitive;
-mod record_batch;
-pub mod schema;
-pub mod statistics;
-mod utils;
-
-use crate::{
-    array::Array,
-    datatypes::{DataType, IntervalUnit, TimeUnit},
-    error::{ArrowError, Result},
+use std::{
+    io::{Read, Seek},
+    sync::Arc,
 };
-pub use record_batch::RecordReader;
-pub use schema::{get_schema, is_type_nullable, FileMetaData};
-
+use futures::{AsyncRead, AsyncSeek, Stream};
 pub use parquet2::{
     error::ParquetError,
     metadata::{ColumnChunkMetaData, ColumnDescriptor, RowGroupMetaData},
     page::{CompressedDataPage, DataPage, DataPageHeader},
     read::{
-        decompress, get_page_iterator as _get_page_iterator, read_metadata as _read_metadata,
+        decompress, get_page_iterator as _get_page_iterator, get_page_stream as _get_page_stream,
+        read_metadata as _read_metadata, read_metadata_async as _read_metadata_async,
         streaming_iterator, Decompressor, PageFilter, PageIterator, StreamingIterator,
     },
     schema::types::{
@@ -34,6 +20,25 @@ pub use parquet2::{
     types::int96_to_i64_ns,
 };
 
+use crate::{
+    array::Array,
+    datatypes::{DataType, IntervalUnit, TimeUnit},
+    error::{ArrowError, Result},
+};
+
+mod binary;
+mod boolean;
+mod fixed_size_binary;
+mod nested_utils;
+mod primitive;
+mod record_batch;
+pub mod schema;
+pub mod statistics;
+mod utils;
+
+pub use record_batch::RecordReader;
+pub use schema::{get_schema, is_type_nullable, FileMetaData};
+
 /// Creates a new iterator of compressed pages.
 pub fn get_page_iterator<'b, RR: Read + Seek>(
     metadata: &FileMetaData,
@@ -53,11 +58,31 @@ pub fn get_page_iterator<'b, RR: Read + Seek>(
     )?)
 }
 
-/// Reads parquets' metadata.
+/// Creates a new stream of compressed pages.
+pub async fn get_page_stream<'a, RR: AsyncRead + Unpin + Send + AsyncSeek>(
+    metadata: &'a FileMetaData,
+    row_group: usize,
+    column: usize,
+    reader: &'a mut RR,
+    pages_filter: Option<PageFilter>,
+    buffer: Vec<u8>,
+) -> Result<impl Stream<Item = std::result::Result<CompressedDataPage, ParquetError>> + 'a> {
+    let pages_filter = pages_filter.unwrap_or_else(|| Arc::new(|_, _| true));
+    Ok(_get_page_stream(metadata, row_group, column, reader, buffer, pages_filter).await?)
+}
+
+/// Reads parquets' metadata synchronously.
 pub fn read_metadata<R: Read + Seek>(reader: &mut R) -> Result<FileMetaData> {
     Ok(_read_metadata(reader)?)
 }
 
+/// Reads parquets' metadata asynchronously.
+pub async fn read_metadata_async<R: AsyncRead + AsyncSeek + Send + Unpin>(
+    reader: &mut R,
+) -> Result<FileMetaData> {
+    Ok(_read_metadata_async(reader).await?)
+}
+
 pub fn page_iter_to_array<
     I: StreamingIterator<Item = std::result::Result<DataPage, ParquetError>>,
 >(
@@ -169,6 +194,65 @@
     }
 }
 
+/// Converts an async stream of compressed data pages into an [`Array`].
+pub async fn page_stream_to_array<I: Stream<Item = std::result::Result<DataPage, ParquetError>>>(
+    pages: I,
+    metadata: &ColumnChunkMetaData,
+    data_type: DataType,
+) -> Result<Box<dyn Array>> {
+    use DataType::*;
+    match data_type {
+        // INT32
+        UInt8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u8).await,
+        UInt16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u16).await,
+        UInt32 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as u32).await,
+        Int8 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i8).await,
+        Int16 => primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i16).await,
+        Int32 | Date32 | Time32(_) | Interval(IntervalUnit::YearMonth) => {
+            primitive::stream_to_array(pages, metadata, data_type, |x: i32| x as i32).await
+        }
+
+        Timestamp(TimeUnit::Nanosecond, None) => match metadata.descriptor().type_() {
+            ParquetType::PrimitiveType { physical_type, .. } => match physical_type {
+                PhysicalType::Int96 => {
+                    primitive::stream_to_array(
+                        pages,
+                        metadata,
+                        DataType::Timestamp(TimeUnit::Nanosecond, None),
+                        int96_to_i64_ns,
+                    )
+                    .await
+                }
+                _ => primitive::stream_to_array(pages, metadata, data_type, |x: i64| x).await,
+            },
+            _ => unreachable!(),
+        },
+
+        // INT64
+        Int64 | Date64 | Time64(_) | Duration(_) | Timestamp(_, _) => {
+            primitive::stream_to_array(pages, metadata, data_type, |x: i64| x).await
+        }
+        UInt64 => primitive::stream_to_array(pages, metadata, data_type, |x: i64| x as u64).await,
+
+        Float32 => primitive::stream_to_array(pages, metadata, data_type, |x: f32| x).await,
+        Float64 => primitive::stream_to_array(pages, metadata, data_type, |x: f64| x).await,
+
+        Boolean => Ok(Box::new(boolean::stream_to_array(pages, metadata).await?)),
+
+        Binary | Utf8 => binary::stream_to_array::<i32, _, _>(pages, metadata, &data_type).await,
+        LargeBinary | LargeUtf8 => {
+            binary::stream_to_array::<i64, _, _>(pages, metadata, &data_type).await
+        }
+        FixedSizeBinary(size) => Ok(Box::new(
+            fixed_size_binary::stream_to_array(pages, size, metadata).await?,
+        )),
+        other => Err(ArrowError::NotYetImplemented(format!(
+            "Async conversion of {:?}",
+            other
+        ))),
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::fs::File;
diff --git a/src/io/parquet/read/primitive/mod.rs b/src/io/parquet/read/primitive/mod.rs
index de4332b8074..3ea354e656f 100644
--- a/src/io/parquet/read/primitive/mod.rs
+++ b/src/io/parquet/read/primitive/mod.rs
@@ -5,6 +5,7 @@ mod utils;
 
 use std::sync::Arc;
 
+use futures::{pin_mut, Stream, StreamExt};
 use parquet2::{page::DataPage, read::StreamingIterator, types::NativeType};
 
 use super::nested_utils::*;
@@ -59,6 +60,48 @@ where
     )))
 }
 
+pub async fn stream_to_array<T, A, I, E, F>(
+    pages: I,
+    metadata: &ColumnChunkMetaData,
+    data_type: DataType,
+    op: F,
+) -> Result<Box<dyn Array>>
+where
+    ArrowError: From<E>,
+    T: NativeType,
+    E: Clone,
+    A: ArrowNativeType,
+    F: Copy + Fn(T) -> A,
+    I: Stream<Item = std::result::Result<DataPage, E>>,
+{
+    let capacity = metadata.num_values() as usize;
+    let mut values = MutableBuffer::<A>::with_capacity(capacity);
+    let mut validity = MutableBitmap::with_capacity(capacity);
+
+    pin_mut!(pages); // needed for iteration
+
+    while let Some(page) = pages.next().await {
+        basic::extend_from_page(
+            page.as_ref().map_err(|x| x.clone())?,
+            metadata.descriptor(),
+            &mut values,
+            &mut validity,
+            op,
+        )?
+    }
+
+    let data_type = match data_type {
+        DataType::Dictionary(_, values) => values.as_ref().clone(),
+        _ => data_type,
+    };
+
+    Ok(Box::new(PrimitiveArray::from_data(
+        data_type,
+        values.into(),
+        validity.into(),
+    )))
+}
+
 pub fn iter_to_array_nested<T, A, I, E, F>(
     mut iter: I,
     metadata: &ColumnChunkMetaData,
diff --git a/src/io/parquet/write/utf8/nested.rs b/src/io/parquet/write/utf8/nested.rs
index 65e7d3d13bf..d9b7dafbd0e 100644
--- a/src/io/parquet/write/utf8/nested.rs
+++ b/src/io/parquet/write/utf8/nested.rs
@@ -1,5 +1,6 @@
-use parquet2::encoding::Encoding;
-use parquet2::{metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions};
+use parquet2::{
+    encoding::Encoding, metadata::ColumnDescriptor, page::CompressedDataPage, write::WriteOptions,
+};
 
 use super::super::{levels, utils};
 use super::basic::{build_statistics, encode_plain};
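Usage note, not part of this diff: the new async entry points are not S3-specific; any reader implementing futures' `AsyncRead + AsyncSeek + Send + Unpin` works. Below is a minimal sketch against a local file buffered in memory via `futures::io::Cursor`; the file path and the `Int64` first column are assumptions for illustration.

use arrow2::datatypes::DataType;
use arrow2::error::Result;
use arrow2::io::parquet::read::{
    decompress, get_page_stream, page_stream_to_array, read_metadata_async,
};
use futures::{io::Cursor, StreamExt};

#[tokio::main]
async fn main() -> Result<()> {
    // hypothetical path; any parquet file whose first column is an INT64 works here
    let file = std::fs::read("some_file.parquet").unwrap();
    let mut reader = Cursor::new(file);

    let metadata = read_metadata_async(&mut reader).await?;

    // first column chunk of the first row group, without a page filter
    let pages = get_page_stream(&metadata, 0, 0, &mut reader, None, vec![]).await?;
    let pages = pages.map(|compressed_page| decompress(compressed_page?, &mut vec![]));

    let array =
        page_stream_to_array(pages, &metadata.row_groups[0].columns()[0], DataType::Int64).await?;
    println!("{}", array.len());
    Ok(())
}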