Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Added support to read ORC #1189

Merged
merged 8 commits into from
Jul 31, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ jobs:
python3 -m venv venv
source venv/bin/activate
pip install pip --upgrade
pip install pyarrow==6
pip install pyarrow==6 pyorc
python parquet_integration/write_parquet.py
python tests/it/io/orc/write.py
deactivate
- uses: Swatinem/rust-cache@v1
- name: Generate code coverage
Expand Down
9 changes: 6 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,17 @@ jobs:
submodules: true # needed to test IPC, which are located in a submodule
- name: Install Rust
run: rustup update stable
- uses: Swatinem/rust-cache@v1
- name: Setup parquet files
run: |
apt update && apt install python3-pip python3-venv -y -q
python3 -m venv venv
source venv/bin/activate
pip install pip --upgrade
pip install pyarrow==6
pip install pyarrow==6 pyorc
python parquet_integration/write_parquet.py
python tests/it/io/orc/write.py
deactivate
- uses: Swatinem/rust-cache@v1
- name: Run
run: cargo test --features full

Expand All @@ -41,7 +42,9 @@ jobs:
- uses: Swatinem/rust-cache@v1
- name: Run
shell: bash
run: ARROW2_IGNORE_PARQUET= cargo test --features full
run: |
cargo check --features full
cargo test --tests

clippy:
name: Clippy
Expand Down
17 changes: 14 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -72,16 +72,21 @@ parquet2 = { version = "0.14.0", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "2", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

# ORC support
orc-format = { version = "0.3.0", optional = true }

# Arrow integration tests support
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }

Expand Down Expand Up @@ -126,6 +131,7 @@ full = [
"io_parquet",
"io_parquet_compression",
"io_avro",
"io_orc",
"io_avro_compression",
"io_avro_async",
"regex",
Expand All @@ -145,6 +151,7 @@ io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet_compression = [
Expand All @@ -154,13 +161,17 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]

io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]

io_orc = [ "orc-format" ]

# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
Expand Down
4 changes: 3 additions & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ source venv/bin/activate
pip install pip --upgrade

# Install pyarrow (version 6) and pyorc
pip install pyarrow==6
pip install pyarrow==6 pyorc

# Generate the parquet files (this might take some time, depending on your computer setup)
python parquet_integration/write_parquet.py
# Generate the ORC files
python tests/it/io/orc/write.py

# Get out of venv, back to normal terminal
deactivate
Expand Down
55 changes: 55 additions & 0 deletions examples/orc_read.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use arrow2::array::*;
use arrow2::error::Error;
use arrow2::io::orc::{format, read};

fn deserialize_column(path: &str, column_name: &str) -> Result<Box<dyn Array>, Error> {
// open the file
let mut reader = std::fs::File::open(path).unwrap();

// read its metadata (IO-bounded)
let metadata = format::read::read_metadata(&mut reader)?;

// infer its (Arrow) [`Schema`]
let schema = read::infer_schema(&metadata.footer)?;

// find the position of the column in the schema
let (pos, field) = schema
.fields
.iter()
.enumerate()
.find(|f| f.1.name == column_name)
.unwrap();

// pick a stripe (basically a set of rows)
let stripe = 0;

// read the stripe's footer (IO-bounded)
let footer = format::read::read_stripe_footer(&mut reader, &metadata, stripe, &mut vec![])?;

// read the column's data from the stripe (IO-bounded)
let data_type = field.data_type.clone();
let column = format::read::read_stripe_column(
&mut reader,
&metadata,
0,
footer,
// 1 because ORC schemas always start with a struct, which we ignore
1 + pos as u32,
vec![],
)?;

// finally, deserialize to Arrow (CPU-bounded)
read::deserialize(data_type, &column)
}

fn main() -> Result<(), Error> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];
let column = &args[2];

let array = deserialize_column(file_path, column)?;
println!("{array:?}");
Ok(())
}
4 changes: 4 additions & 0 deletions src/io/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,10 @@
#[cfg(feature = "io_odbc")]
pub mod odbc;

#[cfg(feature = "io_orc")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_orc")))]
pub mod orc;

#[cfg(any(
feature = "io_csv_read",
feature = "io_csv_read_async",
Expand Down
12 changes: 12 additions & 0 deletions src/io/orc/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
//! APIs to read from [ORC format](https://orc.apache.org).
pub mod read;

pub use orc_format as format;

use crate::error::Error;

/// Converts an `orc_format` error into this crate's [`Error`], so that `?` can be used
/// on results returned by the ORC format crate.
impl From<format::error::Error> for Error {
    /// Wraps the Debug representation of `error` in [`Error::ExternalFormat`].
    fn from(error: format::error::Error) -> Self {
        let message = format!("{:?}", error);
        Self::ExternalFormat(message)
    }
}
Loading