feat(rust,python): Enable object store in scan_parquet python #6426

Closed
2 changes: 2 additions & 0 deletions examples/read_parquet_cloud/src/main.rs
@@ -22,6 +22,8 @@ fn main() -> PolarsResult<()> {
.select([
// select all columns
all(),
+// and do some aggregations
+cols(["fats_g", "sugars_g"]).sum().suffix("_summed"),
])
.collect()?;
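For orientation, here is a hedged sketch of what the surrounding example might look like with this hunk applied. The bucket URL is invented and `ScanArgsParquet::default()` stands in for whatever cloud configuration the real example performs; this is a sketch, not the repository's actual file:

    use polars::prelude::*;

    fn main() -> PolarsResult<()> {
        // Hypothetical S3 location; credentials are assumed to come from the
        // environment or from explicit CloudOptions.
        let df = LazyFrame::scan_parquet(
            "s3://my-bucket/foods.parquet".into(),
            ScanArgsParquet::default(),
        )?
        .select([
            // select all columns
            all(),
            // and do some aggregations
            cols(["fats_g", "sugars_g"]).sum().suffix("_summed"),
        ])
        .collect()?;
        println!("{df}");
        Ok(())
    }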

2 changes: 1 addition & 1 deletion polars/polars-core/Cargo.toml
@@ -162,7 +162,7 @@ indexmap = { version = "1", features = ["std"] }
itoap = { version = "1", optional = true }
ndarray = { version = "0.15", optional = true, default_features = false }
num-traits.workspace = true
-object_store = { version = "0.5.3", default-features = false, optional = true }
+object_store = { version = "0.5.5", default-features = false, optional = true }
once_cell.workspace = true
polars-arrow = { version = "0.27.2", path = "../polars-arrow", features = ["compute"] }
polars-error = { version = "0.27.2", path = "../polars-error" }
4 changes: 2 additions & 2 deletions polars/polars-io/Cargo.toml
@@ -64,7 +64,7 @@ lexical-core = { version = "0.8", optional = true }
memchr.workspace = true
memmap = { package = "memmap2", version = "0.5.2", optional = true }
num-traits.workspace = true
-object_store = { version = "0.5.3", default-features = false, optional = true }
+object_store = { version = "0.5.5", default-features = false, optional = true }
once_cell = "1"
polars-arrow = { version = "0.27.2", path = "../polars-arrow" }
polars-core = { version = "0.27.2", path = "../polars-core", features = ["private"], default-features = false }
@@ -76,7 +76,7 @@ serde = { version = "1", features = ["derive"], optional = true }
serde_json = { version = "1", optional = true, default-features = false, features = ["alloc", "raw_value"] }
simd-json = { version = "0.7.0", optional = true, features = ["allow-non-simd", "known-key"] }
simdutf8 = "0.1"
-tokio = { version = "1.22.0", features = ["net"], optional = true }
+tokio = { version = "1.24.0", features = ["net", "rt-multi-thread"], optional = true }
url = { version = "2.3.1", optional = true }

[dev-dependencies]
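The tokio bump also turns on the `rt-multi-thread` cargo feature, which is what makes `tokio::runtime::Builder::new_multi_thread()` available; the new AsyncManager later in this PR builds its runtime exactly this way. A minimal sketch of the construction the feature enables:

    // Without the "rt-multi-thread" feature, only
    // Builder::new_current_thread() is available.
    let runtime = tokio::runtime::Builder::new_multi_thread()
        .enable_all() // enable the IO and time drivers
        .build()
        .expect("failed to build Tokio runtime");
    runtime.block_on(async {
        // drive object-store requests here
    });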
11 changes: 10 additions & 1 deletion polars/polars-io/src/cloud/glob.rs
@@ -149,7 +149,6 @@ impl Matcher {
}
}

-#[tokio::main(flavor = "current_thread")]
/// List files with a prefix derived from the pattern.
pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResult<Vec<String>> {
// Find the fixed prefix, up to the first '*'.
@@ -164,6 +163,9 @@ pub async fn glob(url: &str, cloud_options: Option<&CloudOptions>) -> PolarsResu
store,
) = super::build(url, cloud_options)?;
let matcher = Matcher::new(prefix.clone(), expansion.as_deref())?;
+if expansion.is_none() {
+return Ok(vec![url.into()]);
+}

let list_stream = store
.list(Some(&Path::from(prefix)))
@@ -260,6 +262,13 @@ mod test {
assert!(!a.is_matching(&Path::from("folder/1parquet")));
// Intermediary folders are not allowed.
assert!(!a.is_matching(&Path::from("folder/other/1.parquet")));

+// Match full name.
+let cloud_location = CloudLocation::new("s3://bucket/folder/some.parquet").unwrap();
+let a = Matcher::new(cloud_location.prefix, cloud_location.expansion.as_deref()).unwrap();
+// Regular match.
+assert!(a.is_matching(&Path::from("folder/some.parquet")));
+assert!(!a.is_matching(&Path::from("folder/other.parquet")));
}

#[test]
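From the caller's side, the new early return means a URL without any wildcard is handed back unchanged instead of triggering a store listing. A hedged usage sketch, with illustrative URLs, assuming an async context:

    // No expansion: the URL is returned as the single result, no listing done.
    let files = glob("s3://bucket/folder/some.parquet", None).await?;
    assert_eq!(files, vec!["s3://bucket/folder/some.parquet".to_string()]);

    // With a wildcard: objects under the fixed prefix are listed and filtered
    // through `Matcher`.
    let files = glob("s3://bucket/folder/*.parquet", None).await?;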
7 changes: 4 additions & 3 deletions polars/polars-io/src/lib.rs
@@ -41,12 +41,14 @@ pub mod partition;

use std::io::{Read, Seek, Write};
use std::path::{Path, PathBuf};
+use std::str::FromStr;

#[allow(unused)] // remove when updating to rust nightly >= 1.61
use arrow::array::new_empty_array;
use arrow::error::Result as ArrowResult;
pub use options::*;
use polars_core::config::verbose;
+use polars_core::cloud::CloudType;
use polars_core::frame::ArrowChunk;
use polars_core::prelude::*;

@@ -173,7 +175,6 @@ pub(crate) fn finish_reader<R: ArrowReader>(

/// Check if the path is a cloud url.
pub fn is_cloud_url<P: AsRef<Path>>(p: P) -> bool {
-p.as_ref().starts_with("s3://")
-|| p.as_ref().starts_with("file://")
-|| p.as_ref().starts_with("gcs://")
+let path = p.as_ref();
+return CloudType::from_str(&path.to_string_lossy()).is_ok();
}
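With this change, `is_cloud_url` accepts any scheme that parses as a `polars_core::cloud::CloudType` instead of the three hard-coded prefixes. Assuming `CloudType` still covers at least the schemes listed before, the expected behavior looks like:

    assert!(is_cloud_url("s3://bucket/key.parquet"));
    assert!(is_cloud_url("file:///tmp/local.parquet"));
    assert!(is_cloud_url("gcs://bucket/key.parquet"));
    // A plain local path has no recognized scheme and is not a cloud URL.
    assert!(!is_cloud_url("/an/ordinary/local/path.parquet"));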
31 changes: 20 additions & 11 deletions polars/polars-io/src/parquet/async_impl.rs
@@ -8,7 +8,7 @@ use arrow::io::parquet::read::{
use arrow::io::parquet::write::FileMetaData;
use futures::future::BoxFuture;
use futures::lock::Mutex;
-use futures::{stream, StreamExt, TryFutureExt, TryStreamExt};
+use futures::{stream, StreamExt, TryFutureExt, TryStreamExt, FutureExt};
use object_store::path::Path as ObjectPath;
use object_store::ObjectStore;
use polars_core::cloud::CloudOptions;
@@ -20,7 +20,7 @@ use polars_core::schema::Schema;

use super::cloud::{build, CloudLocation, CloudReader};
use super::mmap;
-use super::mmap::ColumnStore;
+use super::mmap::CloudMapper;
use super::read_impl::FetchRowGroups;

pub struct ParquetObjectStore {
@@ -104,7 +104,6 @@ type RowGroupChunks<'a> = Vec<Vec<(&'a ColumnChunkMetaData, Vec<u8>)>>;

/// Download rowgroups for the column whose indexes are given in `projection`.
/// We concurrently download the columns for each field.
-#[tokio::main(flavor = "current_thread")]
async fn download_projection<'a: 'b, 'b>(
projection: &[usize],
row_groups: &'a [RowGroupMetaData],
@@ -149,7 +148,7 @@ pub(crate) struct FetchRowGroupsFromObjectStore {
row_groups_metadata: Vec<RowGroupMetaData>,
projection: Vec<usize>,
logging: bool,
-schema: ArrowSchema,
+pub schema: ArrowSchema,
}

impl FetchRowGroupsFromObjectStore {
@@ -173,11 +172,12 @@ impl FetchRowGroupsFromObjectStore {
schema,
})
}
-}
-
-impl FetchRowGroups for FetchRowGroupsFromObjectStore {
-fn fetch_row_groups(&mut self, row_groups: Range<usize>) -> PolarsResult<ColumnStore> {
-// Fetch the required row groups.
+/// Fetch the required row groups asynchronously.
+pub async fn fetch_row_groups_async(
+&mut self,
+row_groups: Range<usize>,
+) -> PolarsResult<CloudMapper> {
let row_groups = &self
.row_groups_metadata
.get(row_groups.clone())
@@ -194,9 +194,9 @@
Ok,
)?;

-// Package in the format required by ColumnStore.
+// Package in the format required by CloudMapper.
let downloaded =
-download_projection(&self.projection, row_groups, &self.schema, &self.reader)?;
+download_projection(&self.projection, row_groups, &self.schema, &self.reader).await?;
if self.logging {
eprintln!(
"BatchedParquetReader: fetched {} row_groups for {} fields, yielding {} column chunks.",
@@ -224,6 +224,15 @@
);
}

-Ok(mmap::ColumnStore::Fetched(downloaded_per_filepos))
+Ok(mmap::CloudMapper::Fetched(downloaded_per_filepos))
}
}

+impl FetchRowGroups for FetchRowGroupsFromObjectStore {
+fn fetch_row_groups<'a>(
+&'a mut self,
+row_groups: Range<usize>,
+) -> BoxFuture<'a, PolarsResult<CloudMapper>> {
+self.fetch_row_groups_async(row_groups).boxed()
+}
+}
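Stable Rust (as of this PR) has no async fn in traits, hence the split above: the inherent async method does the work, and the trait method type-erases it into a `BoxFuture` via `FutureExt::boxed`. A self-contained sketch of the same pattern, with made-up names:

    use futures::future::BoxFuture;
    use futures::FutureExt;

    trait Fetch {
        // Object-safe trait method returning a boxed, type-erased future.
        fn fetch<'a>(&'a mut self, n: usize) -> BoxFuture<'a, usize>;
    }

    struct Doubler;

    impl Doubler {
        // The actual async work lives in an inherent method.
        async fn fetch_async(&mut self, n: usize) -> usize {
            n * 2
        }
    }

    impl Fetch for Doubler {
        fn fetch<'a>(&'a mut self, n: usize) -> BoxFuture<'a, usize> {
            self.fetch_async(n).boxed()
        }
    }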
124 changes: 124 additions & 0 deletions polars/polars-io/src/parquet/async_manager.rs
@@ -0,0 +1,124 @@
//! Polars is a heavily multi-threaded library. Some IO operations, especially cloud-based ones,
//! are best served by an async module. The AsyncManager owns a multi-threaded Tokio runtime
//! and is responsible for managing the async calls to the object store and the associated state.

use std::ops::Range;

use arrow::io::parquet::read::RowGroupMetaData;
use arrow::io::parquet::write::FileMetaData;
use futures::channel::mpsc::Sender;
use futures::channel::oneshot;
use once_cell::sync::Lazy;
use polars_core::prelude::*;
use tokio::runtime::Runtime;

use super::async_impl::ParquetObjectStore;
use super::mmap::CloudMapper;

static GLOBAL_ASYNC_MANAGER: Lazy<AsyncManager> = Lazy::new(AsyncManager::default);

enum AsyncParquetReaderMessage {
/// Fetch the metadata of the parquet file without memoizing it.
FetchMetadata {
/// The channel to send the result to.
tx: oneshot::Sender<PolarsResult<FileMetaData>>,
},
/// Fetch and memoize the metadata of the parquet file.
GetMetadata {
/// The channel to send the result to.
tx: oneshot::Sender<PolarsResult<FileMetaData>>,
},
/// Fetch the number of rows of the parquet file.
NumRows {
/// The channel to send the result to.
tx: oneshot::Sender<PolarsResult<usize>>,
},
/// Fetch the schema of the parquet file.
Schema {
/// The channel to send the result to.
tx: oneshot::Sender<PolarsResult<ArrowSchema>>,
},
/// Fetch the row groups of the parquet file.
RowGroups {
/// The channel to send the result to.
tx: oneshot::Sender<PolarsResult<Vec<RowGroupMetaData>>>,
},
/// Fetch the data of the given row groups of the parquet file.
FetchRowGroups {
/// The row groups to fetch.
row_groups: Range<usize>,
/// The channel to send the result to.
tx: oneshot::Sender<PolarsResult<CloudMapper>>,
},
}

/// Separates the async calls into their own manager; the rest of the code interacts with it over a channel.
pub(crate) struct AsyncManager {
/// The channel to communicate with the manager.
tx: Sender<AsyncParquetReaderMessage>,
/// A handle to the Tokio runtime running the manager.
runtime: Runtime,
/// Opened readers.
readers: PlHashMap<String, Arc<ParquetObjectStore>>,
}

impl AsyncManager {
/// Create a new async manager.
pub fn new() -> AsyncManager {
use futures::stream::StreamExt;

let (tx, mut rx) = futures::channel::mpsc::channel(1); // `rx` must be mutable for `StreamExt::next`
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap();
runtime.spawn(async move {
let mut reader: Option<ParquetObjectStore> = None; // explicit type: `None` alone cannot be inferred
while let Some(message) = rx.next().await {
match message {
AsyncParquetReaderMessage::FetchMetadata { tx } => {
let reader = reader.as_mut().unwrap();
let result = reader.fetch_metadata().await;
tx.send(result).unwrap();
}
AsyncParquetReaderMessage::GetMetadata { tx } => {
let reader = reader.as_mut().unwrap();
let result = reader.get_metadata().await;
tx.send(result).unwrap();
}
AsyncParquetReaderMessage::NumRows { tx } => {
let reader = reader.as_mut().unwrap();
let result = reader.num_rows().await;
tx.send(result).unwrap();
}
AsyncParquetReaderMessage::Schema { tx } => {
let reader = reader.as_mut().unwrap();
let result = reader.schema().await;
tx.send(result).unwrap();
}
AsyncParquetReaderMessage::RowGroups { tx } => {
let reader = reader.as_mut().unwrap();
let result = reader.row_groups().await;
tx.send(result).unwrap();
}
AsyncParquetReaderMessage::FetchRowGroups { row_groups, tx } => {
let reader = reader.as_mut().unwrap();
let result = reader.fetch_row_groups(row_groups).await;
tx.send(result).unwrap();
}
}
}
});
AsyncManager {
tx,
runtime,
readers: PlHashMap::new(),
}
}
}

impl Default for AsyncManager {
fn default() -> Self {
AsyncManager::new()
}
}
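Only the manager's receive loop is part of this diff; the sending side is not shown. Presumably each caller pairs a fresh `oneshot` channel with its message and blocks on the reply, along these hypothetical lines (`num_rows_blocking` is invented for illustration and assumes module-internal access to the `tx` and `runtime` fields):

    use futures::channel::oneshot;
    use futures::SinkExt; // for `Sender::send`

    // Hypothetical synchronous wrapper, not part of this PR.
    fn num_rows_blocking(manager: &AsyncManager) -> PolarsResult<usize> {
        let (tx, rx) = oneshot::channel();
        let mut sender = manager.tx.clone();
        manager.runtime.block_on(async move {
            sender
                .send(AsyncParquetReaderMessage::NumRows { tx })
                .await
                .expect("async manager terminated");
            rx.await.expect("async manager dropped the request")
        })
    }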
18 changes: 9 additions & 9 deletions polars/polars-io/src/parquet/mmap.rs
@@ -8,8 +8,8 @@ use polars_core::datatypes::PlHashMap;

use super::*;

-/// Store columns data in two scenarios:
-/// 1. a local memory mapped file
+/// Map column data in two scenarios:
+/// 1. a local memory-mapped file; in this case there is nothing to do.
/// 2. data fetched from cloud storage on demand, in this case
/// a. the key in the hashmap is the start in the file
/// b. the value in the hashmap is the actual data.
@@ -19,16 +19,16 @@
/// b. asynchronously fetch them in parallel, for example using object_store
/// c. store the data in this data structure
/// d. when all the data is available deserialize on multiple threads, for example using rayon
-pub enum ColumnStore<'a> {
-Local(&'a [u8]),
+pub enum CloudMapper<'a> {
+PassThrough(&'a [u8]),
#[cfg(feature = "async")]
Fetched(PlHashMap<u64, Vec<u8>>),
}

/// For local files memory maps all columns that are part of the parquet field `field_name`.
/// For cloud files the relevant memory regions should have been prefetched.
pub(super) fn mmap_columns<'a>(
-store: &'a ColumnStore,
+store: &'a CloudMapper,
columns: &'a [ColumnChunkMetaData],
field_name: &str,
) -> Vec<(&'a ColumnChunkMetaData, &'a [u8])> {
@@ -39,17 +39,17 @@
}

fn _mmap_single_column<'a>(
-store: &'a ColumnStore,
+store: &'a CloudMapper,
meta: &'a ColumnChunkMetaData,
) -> (&'a ColumnChunkMetaData, &'a [u8]) {
let (start, len) = meta.byte_range();
let chunk = match store {
-ColumnStore::Local(file) => &file[start as usize..(start + len) as usize],
+CloudMapper::PassThrough(file) => &file[start as usize..(start + len) as usize],
#[cfg(all(feature = "async", feature = "parquet"))]
-ColumnStore::Fetched(fetched) => {
+CloudMapper::Fetched(fetched) => {
let entry = fetched.get(&start).unwrap_or_else(|| {
panic!(
-"mmap_columns: column with start {start} must be prefetched in ColumnStore.\n"
+"mmap_columns: column with start {start} must be prefetched in CloudMapper.\n"
)
});
entry.as_slice()
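To make the two variants concrete, a small hedged sketch of constructing each `CloudMapper` case (buffer contents are placeholders; the `Fetched` variant assumes the `async` feature):

    use polars_core::datatypes::PlHashMap;

    // Local case: the parquet file is memory-mapped and columns are sliced
    // directly out of the mapping.
    let mmapped: &[u8] = &[0u8; 1024]; // stand-in for a real memory map
    let local = CloudMapper::PassThrough(mmapped);

    // Cloud case: every prefetched column chunk is keyed by its byte offset in
    // the remote file, matching `ColumnChunkMetaData::byte_range().0`.
    let mut fetched: PlHashMap<u64, Vec<u8>> = PlHashMap::default();
    fetched.insert(0, vec![0u8; 128]);
    let remote = CloudMapper::Fetched(fetched);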
3 changes: 3 additions & 0 deletions polars/polars-io/src/parquet/mod.rs
@@ -16,6 +16,9 @@
//!
#[cfg(feature = "async")]
pub(super) mod async_impl;
+#[cfg(feature = "async")]
+pub(super) mod async_manager;

pub(super) mod mmap;
pub mod predicates;
mod read;