Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Replace flatbuffers dependency by Planus #732

Merged
merged 3 commits into from
Jan 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions .cargo/audit.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,4 @@ ignore = [
# Therefore, this advisory does not affect us.
"RUSTSEC-2020-0071",
"RUSTSEC-2020-0159", # same as previous

# this cannot be addressed, only mitigated.
# See [.github/workflows/security.yml] for details on how we mitigate this.
"RUSTSEC-2021-0122",
]
26 changes: 0 additions & 26 deletions .github/workflows/security.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,29 +8,3 @@ jobs:
- uses: actions-rs/audit-check@v1
with:
token: ${{ secrets.GITHUB_TOKEN }}

# mitigation for RUSTSEC-2021-0122
# flatbuffers' usage of `unsafe` is problematic and a risk.
# This performs a round-trip over IPC (that uses flatbuffers) for some arrow types
# using miri, which hits much of `flatbuffers` usage in this crate.
miri-checks:
name: RUSTSEC-2021-0122 mitigation
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true # needed to test IPC, which are located in a submodule
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-12-10
override: true
- uses: Swatinem/rust-cache@v1
with:
key: key1
- name: Install Miri
run: |
rustup component add miri
cargo miri setup
- name: Run
run: MIRIFLAGS="-Zmiri-disable-stacked-borrows -Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_ipc_compression,io_json_integration io::ipc::write::file::write_100_nested
21 changes: 21 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,27 @@ jobs:
# --skip io: miri can't handle opening of files, so we skip those
run: cargo miri test --features full -- --skip io::parquet --skip io::ipc

miri-checks-io:
name: MIRI on IO IPC
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
with:
submodules: true # needed to test IPC, which are located in a submodule
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-12-10
override: true
- uses: Swatinem/rust-cache@v1
with:
key: key1
- name: Install Miri
run: |
rustup component add miri
cargo miri setup
- name: Run
run: MIRIFLAGS="-Zmiri-disable-isolation" cargo miri test --tests --features io_ipc,io_json_integration io::ipc::write::write_sliced_list

coverage:
name: Coverage
runs-on: ubuntu-latest
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "5.0", optional = true, default-features = false }

arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] }
arrow-format = { version = "0.4", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

Expand Down
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "0.3.0", features = ["full"] }
arrow-format = { version = "0.4", features = ["full"] }
async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
Expand Down
34 changes: 15 additions & 19 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ use arrow2::{
ipc::IpcField,
},
};
use arrow_format::flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use arrow_format::{
flight::data::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
},
ipc::planus::ReadAsRoot,
};
use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
use tonic::{Request, Streaming};

Expand Down Expand Up @@ -267,25 +269,19 @@ async fn receive_batch_flight_data(
) -> Option<FlightData> {
let mut data = resp.next().await?.ok()?;
let mut message =
ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing first message");
ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing first message");

while message.header_type() == MessageHeader::DictionaryBatch {
while let ipc::MessageHeaderRef::DictionaryBatch(batch) = message
.header()
.expect("Header to be valid flatbuffers")
.expect("Header to be present")
{
let mut reader = std::io::Cursor::new(&data.data_body);
read::read_dictionary(
message
.header_as_dictionary_batch()
.expect("Error parsing dictionary"),
fields,
ipc_schema,
dictionaries,
&mut reader,
0,
)
.expect("Error reading dictionary");
read::read_dictionary(batch, fields, ipc_schema, dictionaries, &mut reader, 0)
.expect("Error reading dictionary");

data = resp.next().await?.ok()?;
message =
ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
message = ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing message");
}

Some(data)
Expand Down
44 changes: 22 additions & 22 deletions integration-testing/src/flight_server_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ use arrow2::io::ipc::IpcSchema;
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
use arrow_format::ipc::Schema as ArrowSchema;
use arrow_format::ipc::planus::ReadAsRoot;
use arrow_format::ipc::MessageHeaderRef;

use arrow2::{datatypes::*, io::flight::serialize_schema_to_info, io::ipc};

Expand Down Expand Up @@ -276,25 +276,21 @@ async fn send_app_metadata(
}

async fn record_batch_from_message(
message: Message<'_>,
batch: arrow_format::ipc::RecordBatchRef<'_>,
data_body: &[u8],
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<Chunk<Arc<dyn Array>>, Status> {
let ipc_batch = message
.header_as_record_batch()
.ok_or_else(|| Status::internal("Could not parse message header as record batch"))?;

let mut reader = std::io::Cursor::new(data_body);

let arrow_batch_result = ipc::read::read_record_batch(
ipc_batch,
batch,
fields,
ipc_schema,
None,
dictionaries,
ArrowSchema::MetadataVersion::V5,
arrow_format::ipc::MetadataVersion::V5,
&mut reader,
0,
);
Expand All @@ -303,20 +299,16 @@ async fn record_batch_from_message(
}

async fn dictionary_from_message(
message: Message<'_>,
dict_batch: arrow_format::ipc::DictionaryBatchRef<'_>,
data_body: &[u8],
fields: &[Field],
ipc_schema: &IpcSchema,
dictionaries: &mut Dictionaries,
) -> Result<(), Status> {
let ipc_batch = message
.header_as_dictionary_batch()
.ok_or_else(|| Status::internal("Could not parse message header as dictionary batch"))?;

let mut reader = std::io::Cursor::new(data_body);

let dictionary_batch_result =
ipc::read::read_dictionary(ipc_batch, fields, ipc_schema, dictionaries, &mut reader, 0);
ipc::read::read_dictionary(dict_batch, fields, ipc_schema, dictionaries, &mut reader, 0);
dictionary_batch_result
.map_err(|e| Status::internal(format!("Could not convert to Dictionary: {:?}", e)))
}
Expand All @@ -335,20 +327,28 @@ async fn save_uploaded_chunks(
let mut dictionaries = Default::default();

while let Some(Ok(data)) = input_stream.next().await {
let message = root_as_message(&data.data_header[..])
let message = arrow_format::ipc::MessageRef::read_as_root(&data.data_header)
.map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?;
let header = message
.header()
.map_err(|x| Status::internal(x.to_string()))?
.ok_or_else(|| {
Status::internal(
"Unable to convert flight data header to a record batch".to_string(),
)
})?;

match message.header_type() {
MessageHeader::Schema => {
match header {
MessageHeaderRef::Schema(_) => {
return Err(Status::internal(
"Not expecting a schema when messages are read",
))
}
MessageHeader::RecordBatch => {
MessageHeaderRef::RecordBatch(batch) => {
send_app_metadata(&mut response_tx, &data.app_metadata).await?;

let batch = record_batch_from_message(
message,
batch,
&data.data_body,
&schema.fields,
&ipc_schema,
Expand All @@ -358,9 +358,9 @@ async fn save_uploaded_chunks(

chunks.push(batch);
}
MessageHeader::DictionaryBatch => {
MessageHeaderRef::DictionaryBatch(dict_batch) => {
dictionary_from_message(
message,
dict_batch,
&data.data_body,
&schema.fields,
&ipc_schema,
Expand Down
7 changes: 7 additions & 0 deletions src/ffi/ffi.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,13 @@ pub struct Ffi_ArrowArray {
private_data: *mut ::std::os::raw::c_void,
}

// SAFETY: the Arrow specification does not allow multiple implementations
// to change this struct, which is the property these impls rely on.
// This cannot be proven from within Rust itself: it holds only because all
// implementations agree on it as part of the Arrow specification.
unsafe impl Send for Ffi_ArrowArray {}
unsafe impl Sync for Ffi_ArrowArray {}

impl Drop for Ffi_ArrowArray {
fn drop(&mut self) {
match self.release {
Expand Down
60 changes: 25 additions & 35 deletions src/io/flight/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::sync::Arc;

use arrow_format::flight::data::{FlightData, SchemaResult};
use arrow_format::ipc;
use arrow_format::ipc::planus::ReadAsRoot;

use crate::{
array::Array,
Expand Down Expand Up @@ -84,19 +85,7 @@ fn schema_as_encoded_data(schema: &Schema, ipc_fields: &[IpcField]) -> EncodedDa
/// Deserialize an IPC message into [`Schema`], [`IpcSchema`].
/// Use to deserialize [`FlightData::data_header`] and [`SchemaResult::schema`].
pub fn deserialize_schemas(bytes: &[u8]) -> Result<(Schema, IpcSchema)> {
if let Ok(ipc) = ipc::Message::root_as_message(bytes) {
if let Some(schemas) = ipc.header_as_schema().map(read::fb_to_schema) {
schemas
} else {
Err(ArrowError::OutOfSpec(
"Unable to get head as schema".to_string(),
))
}
} else {
Err(ArrowError::OutOfSpec(
"Unable to get root as message".to_string(),
))
}
read::deserialize_schema(bytes)
}

/// Deserializes [`FlightData`] to [`Chunk`].
Expand All @@ -107,29 +96,30 @@ pub fn deserialize_batch(
dictionaries: &read::Dictionaries,
) -> Result<Chunk<Arc<dyn Array>>> {
// check that the data_header is a record batch message
let message = ipc::Message::root_as_message(&data.data_header[..]).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;
let message =
arrow_format::ipc::MessageRef::read_as_root(&data.data_header).map_err(|err| {
ArrowError::OutOfSpec(format!("Unable to get root as message: {:?}", err))
})?;

let mut reader = std::io::Cursor::new(&data.data_body);

message
.header_as_record_batch()
.ok_or_else(|| {
ArrowError::OutOfSpec(
"Unable to convert flight data header to a record batch".to_string(),
)
})
.map(|batch| {
read::read_record_batch(
batch,
fields,
ipc_schema,
None,
dictionaries,
ipc::Schema::MetadataVersion::V5,
&mut reader,
0,
)
})?
let version = message.version()?;

match message.header()?.ok_or_else(|| {
ArrowError::oos("Unable to convert flight data header to a record batch".to_string())
})? {
ipc::MessageHeaderRef::RecordBatch(batch) => read::read_record_batch(
batch,
fields,
ipc_schema,
None,
dictionaries,
version,
&mut reader,
0,
),
_ => Err(ArrowError::nyi(
"flight currently only supports reading RecordBatch messages",
)),
}
}
2 changes: 2 additions & 0 deletions src/io/ipc/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ mod tests {

#[cfg(feature = "io_ipc_compression")]
#[test]
#[cfg_attr(miri, ignore)] // ZSTD uses foreign calls that miri does not support
fn round_trip_zstd() {
let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
let mut buffer = vec![];
Expand All @@ -79,6 +80,7 @@ mod tests {

#[cfg(feature = "io_ipc_compression")]
#[test]
#[cfg_attr(miri, ignore)] // LZ4 uses foreign calls that miri does not support
fn round_trip_lz4() {
let data: Vec<u8> = (0..200u8).map(|x| x % 10).collect();
let mut buffer = vec![];
Expand Down
8 changes: 8 additions & 0 deletions src/io/ipc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs),
//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)).
use crate::error::ArrowError;

mod compression;
mod endianess;

Expand All @@ -100,3 +102,9 @@ pub struct IpcSchema {
pub fields: Vec<IpcField>,
pub is_little_endian: bool,
}

impl From<arrow_format::ipc::planus::Error> for ArrowError {
fn from(error: arrow_format::ipc::planus::Error) -> Self {
ArrowError::OutOfSpec(error.to_string())
}
}
Loading