Added support to read ORC (#1189)
jorgecarleitao authored Jul 31, 2022
1 parent 4da1690 commit 644a1de
Showing 13 changed files with 704 additions and 9 deletions.
3 changes: 2 additions & 1 deletion .github/workflows/coverage.yml
@@ -19,8 +19,9 @@ jobs:
python3 -m venv venv
source venv/bin/activate
pip install pip --upgrade
pip install pyarrow==6
pip install pyarrow==6 pyorc
python parquet_integration/write_parquet.py
python tests/it/io/orc/write.py
deactivate
- uses: Swatinem/rust-cache@v1
- name: Generate code coverage
9 changes: 6 additions & 3 deletions .github/workflows/test.yml
@@ -13,16 +13,17 @@ jobs:
submodules: true # needed to test IPC, which are located in a submodule
- name: Install Rust
run: rustup update stable
- uses: Swatinem/rust-cache@v1
- name: Setup parquet files
run: |
apt update && apt install python3-pip python3-venv -y -q
python3 -m venv venv
source venv/bin/activate
pip install pip --upgrade
pip install pyarrow==6
pip install pyarrow==6 pyorc
python parquet_integration/write_parquet.py
python tests/it/io/orc/write.py
deactivate
- uses: Swatinem/rust-cache@v1
- name: Run
run: cargo test --features full

@@ -41,7 +42,9 @@ jobs:
- uses: Swatinem/rust-cache@v1
- name: Run
shell: bash
run: ARROW2_IGNORE_PARQUET= cargo test --features full
run: |
cargo check --features full
cargo test --tests
clippy:
name: Clippy
17 changes: 14 additions & 3 deletions Cargo.toml
@@ -72,16 +72,21 @@ parquet2 = { version = "0.14.0", optional = true, default_features = false }

# avro support
avro-schema = { version = "0.2", optional = true }
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }
# compression of avro
libflate = { version = "1.1.1", optional = true }
snap = { version = "1", optional = true }
crc = { version = "2", optional = true }
# async avro
async-stream = { version = "0.3.2", optional = true }

# ORC support
orc-format = { version = "0.3.0", optional = true }

# Arrow integration tests support
serde = { version = "^1.0", features = ["rc"], optional = true }
serde_derive = { version = "^1.0", optional = true }
serde_json = { version = "^1.0", features = ["preserve_order"], optional = true }

# for division/remainder optimization at runtime
strength_reduce = { version = "0.2", optional = true }

@@ -126,6 +131,7 @@ full = [
"io_parquet",
"io_parquet_compression",
"io_avro",
"io_orc",
"io_avro_compression",
"io_avro_async",
"regex",
@@ -145,6 +151,7 @@ io_ipc_write_async = ["io_ipc", "futures"]
io_ipc_read_async = ["io_ipc", "futures", "async-stream"]
io_ipc_compression = ["lz4", "zstd"]
io_flight = ["io_ipc", "arrow-format/flight-data"]

# base64 + io_ipc because arrow schemas are stored as base64-encoded ipc format.
io_parquet = ["parquet2", "io_ipc", "base64", "futures", "streaming-iterator", "fallible-streaming-iterator"]
io_parquet_compression = [
@@ -154,13 +161,17 @@ io_parquet_compression = [
"parquet2/lz4",
"parquet2/brotli",
]

io_avro = ["avro-schema", "streaming-iterator", "fallible-streaming-iterator", "serde_json"]
io_avro_compression = [
"libflate",
"snap",
"crc",
]
io_avro_async = ["io_avro", "futures", "async-stream"]

io_orc = [ "orc-format" ]

# serde+serde_json: its dependencies + error handling
# serde_derive: there is some derive around
io_json_integration = ["hex", "serde", "serde_derive", "serde_json", "io_ipc"]
4 changes: 3 additions & 1 deletion DEVELOPMENT.md
@@ -42,10 +42,12 @@ source venv/bin/activate
pip install pip --upgrade

# Install pyarrow, version 6
pip install pyarrow==6
pip install pyarrow==6 pyorc

# Generate the parquet files (this might take some time, depending on your computer setup)
python parquet_integration/write_parquet.py
# generate ORC files
python tests/it/io/orc/write.py

# Get out of venv, back to normal terminal
deactivate
55 changes: 55 additions & 0 deletions examples/orc_read.rs
@@ -0,0 +1,55 @@
use arrow2::array::*;
use arrow2::error::Error;
use arrow2::io::orc::{format, read};

fn deserialize_column(path: &str, column_name: &str) -> Result<Box<dyn Array>, Error> {
// open the file
let mut reader = std::fs::File::open(path).unwrap();

// read its metadata (IO-bounded)
let metadata = format::read::read_metadata(&mut reader)?;

// infer its (Arrow) [`Schema`]
let schema = read::infer_schema(&metadata.footer)?;

// find the position of the column in the schema
let (pos, field) = schema
.fields
.iter()
.enumerate()
.find(|f| f.1.name == column_name)
.unwrap();

// pick a stripe (basically a set of rows)
let stripe = 0;

// read the stripe's footer (IO-bounded)
let footer = format::read::read_stripe_footer(&mut reader, &metadata, stripe, &mut vec![])?;

// read the column's data from the stripe (IO-bounded)
let data_type = field.data_type.clone();
let column = format::read::read_stripe_column(
&mut reader,
&metadata,
stripe,
footer,
// 1 because ORC schemas always start with a struct, which we ignore
1 + pos as u32,
vec![],
)?;

// finally, deserialize to Arrow (CPU-bounded)
read::deserialize(data_type, &column)
}

fn main() -> Result<(), Error> {
use std::env;
let args: Vec<String> = env::args().collect();

let file_path = &args[1];
let column = &args[2];

let array = deserialize_column(file_path, column)?;
println!("{array:?}");
Ok(())
}
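
The example above reads a single column; a natural extension is to read every column of a stripe into an arrow2 Chunk. The following sketch is not part of this commit: it only reuses the functions shown above, and it assumes that orc-format's StripeFooter implements Clone (it is cloned once per column), which this diff does not show.

use arrow2::array::Array;
use arrow2::chunk::Chunk;
use arrow2::error::Error;
use arrow2::io::orc::{format, read};

fn deserialize_stripe(path: &str) -> Result<Chunk<Box<dyn Array>>, Error> {
    let mut reader = std::fs::File::open(path).unwrap();

    // file metadata and Arrow schema, as in the example above (IO-bounded)
    let metadata = format::read::read_metadata(&mut reader)?;
    let schema = read::infer_schema(&metadata.footer)?;

    // read the first stripe's footer once and reuse it for every column
    let stripe = 0;
    let footer = format::read::read_stripe_footer(&mut reader, &metadata, stripe, &mut vec![])?;

    let arrays = schema
        .fields
        .iter()
        .enumerate()
        .map(|(pos, field)| {
            let column = format::read::read_stripe_column(
                &mut reader,
                &metadata,
                stripe,
                // assumption: StripeFooter is Clone; otherwise re-read it per column
                footer.clone(),
                // 1 because ORC schemas always start with a struct, which we ignore
                1 + pos as u32,
                vec![],
            )?;
            // deserialize to Arrow (CPU-bounded)
            read::deserialize(field.data_type.clone(), &column)
        })
        .collect::<Result<Vec<Box<dyn Array>>, Error>>()?;

    // a Chunk requires all arrays to have the same length, which holds for one stripe
    Chunk::try_new(arrays)
}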
4 changes: 4 additions & 0 deletions src/io/mod.rs
@@ -5,6 +5,10 @@
#[cfg(feature = "io_odbc")]
pub mod odbc;

#[cfg(feature = "io_orc")]
#[cfg_attr(docsrs, doc(cfg(feature = "io_orc")))]
pub mod orc;

#[cfg(any(
feature = "io_csv_read",
feature = "io_csv_read_async",
12 changes: 12 additions & 0 deletions src/io/orc/mod.rs
@@ -0,0 +1,12 @@
//! APIs to read from [ORC format](https://orc.apache.org).
pub mod read;

pub use orc_format as format;

use crate::error::Error;

impl From<format::error::Error> for Error {
fn from(error: format::error::Error) -> Self {
Error::ExternalFormat(format!("{:?}", error))
}
}
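
This From conversion is what lets the example above (and user code) mix calls into orc-format with arrow2's single error type via the ? operator. A minimal sketch, not part of this commit, reusing only the read_metadata call already shown in examples/orc_read.rs:

use arrow2::error::Error;
use arrow2::io::orc::format;

fn check_orc(path: &str) -> Result<(), Error> {
    let mut reader = std::fs::File::open(path).unwrap();
    // read_metadata returns orc-format's error type; `?` converts it into
    // arrow2's Error through the From impl above
    let _metadata = format::read::read_metadata(&mut reader)?;
    Ok(())
}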