From 47a632c3a2ff7217df21ae53c3c7a7f1f5013aa9 Mon Sep 17 00:00:00 2001 From: carlos Date: Fri, 15 Oct 2021 23:06:02 +0800 Subject: [PATCH 01/23] Add support to read compressed avro (#512) --- Cargo.toml | 4 +++- src/io/avro/read/mod.rs | 6 +++++- tests/it/io/avro/read/mod.rs | 33 ++++++++++++++++++++++++++++----- 3 files changed, 36 insertions(+), 7 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 96c9d35a8c5..d5982d72553 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,8 @@ parquet2 = { version = "0.5.2", optional = true, default_features = false, featu avro-rs = { version = "0.13", optional = true, default_features = false } +libflate = { version = "1.1.1", optional = true } + # for division/remainder optimization at runtime strength_reduce = { version = "0.2", optional = true } @@ -114,7 +116,7 @@ io_parquet_compression = [ "parquet2/lz4", "parquet2/brotli", ] -io_avro = ["avro-rs", "streaming-iterator", "serde_json"] +io_avro = ["avro-rs", "streaming-iterator", "serde_json", "libflate"] # io_json: its dependencies + error handling # serde_derive: there is some derive around io_json_integration = ["io_json", "serde_derive", "hex"] diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index 19440ee1559..069c40393c2 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -4,6 +4,7 @@ use std::io::Read; use std::sync::Arc; use avro_rs::{Codec, Schema as AvroSchema}; +use libflate::deflate::Decoder; use streaming_iterator::StreamingIterator; mod deserialize; @@ -74,7 +75,10 @@ fn decompress_block(buf: &mut Vec, decompress: &mut Vec, codec: Codec) - Ok(false) } Codec::Deflate => { - todo!() + decompress.clear(); + let mut decoder = Decoder::new(&buf[..]); + decoder.read_to_end(decompress)?; + Ok(true) } } } diff --git a/tests/it/io/avro/read/mod.rs b/tests/it/io/avro/read/mod.rs index c3b33ebd391..8435820d17a 100644 --- a/tests/it/io/avro/read/mod.rs +++ b/tests/it/io/avro/read/mod.rs @@ -2,7 +2,7 @@ use std::sync::Arc; use arrow2::types::months_days_ns; use avro_rs::types::{Record, Value}; -use avro_rs::Writer; +use avro_rs::{Codec, Writer}; use avro_rs::{Days, Duration, Millis, Months, Schema as AvroSchema}; use arrow2::array::*; @@ -82,10 +82,15 @@ fn schema() -> (AvroSchema, Schema) { (AvroSchema::parse_str(raw_schema).unwrap(), schema) } -fn write() -> Result<(Vec, RecordBatch)> { +fn write(has_codec: bool) -> Result<(Vec, RecordBatch)> { let (avro, schema) = schema(); // a writer needs a schema and something to write to - let mut writer = Writer::new(&avro, Vec::new()); + let mut writer: Writer>; + if has_codec { + writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate); + } else { + writer = Writer::new(&avro, Vec::new()); + } // the Record type models our Record schema let mut record = Record::new(writer.schema()).unwrap(); @@ -170,8 +175,26 @@ fn write() -> Result<(Vec, RecordBatch)> { } #[test] -fn read() -> Result<()> { - let (data, expected) = write().unwrap(); +fn read_without_codec() -> Result<()> { + let (data, expected) = write(false).unwrap(); + + let file = &mut &data[..]; + + let (avro_schema, schema, codec, file_marker) = read::read_metadata(file)?; + + let mut reader = read::Reader::new( + read::Decompressor::new(read::BlockStreamIterator::new(file, file_marker), codec), + avro_schema, + Arc::new(schema), + ); + + assert_eq!(reader.next().unwrap().unwrap(), expected); + Ok(()) +} + +#[test] +fn read_with_codec() -> Result<()> { + let (data, expected) = write(true).unwrap(); let file = &mut &data[..]; From 
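A minimal sketch, assuming only the `libflate` crate that this patch adds to Cargo.toml, of the deflate round trip behind the new `Codec::Deflate` arm: compress a buffer, then decompress it with `Decoder::new(..).read_to_end(..)` the same way `decompress_block` now does. The buffer contents and `main` wrapper are illustrative, not part of the patch.

use std::io::{Read, Write};

use libflate::deflate::{Decoder, Encoder};

fn main() -> std::io::Result<()> {
    let original = b"some avro block bytes";

    // Compress with raw deflate (a stand-in for an Avro writer using Codec::Deflate).
    let mut encoder = Encoder::new(Vec::new());
    encoder.write_all(original)?;
    let compressed = encoder.finish().into_result()?;

    // Decompress, mirroring the Codec::Deflate arm of decompress_block above.
    let mut decompressed = Vec::new();
    Decoder::new(&compressed[..]).read_to_end(&mut decompressed)?;

    assert_eq!(&decompressed[..], &original[..]);
    Ok(())
}
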
fb8b6f3aad8d339a51730e15c97286d6a5aaf0f9 Mon Sep 17 00:00:00 2001 From: Afrizal Fikri Date: Fri, 15 Oct 2021 23:02:46 +0700 Subject: [PATCH 02/23] Improved performance of writing decimal to `parquet` (#528) --- src/io/parquet/write/mod.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/io/parquet/write/mod.rs b/src/io/parquet/write/mod.rs index a9537592b11..b40fccac59e 100644 --- a/src/io/parquet/write/mod.rs +++ b/src/io/parquet/write/mod.rs @@ -298,9 +298,7 @@ pub fn array_to_page( primitive::array_to_page::(&array, options, descriptor) } else { let size = decimal_length_from_precision(precision); - - let mut values = MutableBuffer::::new(); // todo: this can be estimated - + let mut values = MutableBuffer::::with_capacity(size * array.len()); array.values().iter().for_each(|x| { let bytes = &x.to_be_bytes()[16 - size..]; values.extend_from_slice(bytes) From ee68d18df120b2df389137f3d27f99a97f610fc4 Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sat, 16 Oct 2021 13:45:15 +0200 Subject: [PATCH 03/23] Improved parquet read benches (#533) --- benches/read_parquet.rs | 53 ++++++++++++++++------- benchmarks/.gitignore | 1 + benchmarks/bench_read.py | 49 +++++++++++++++++++++ benchmarks/run.py | 20 +++++++++ benchmarks/summarize.py | 51 ++++++++++++++++++++++ parquet_integration/bench_read.py | 26 ----------- parquet_integration/write_parquet.py | 65 +++++++++++++++++++++------- 7 files changed, 207 insertions(+), 58 deletions(-) create mode 100644 benchmarks/.gitignore create mode 100644 benchmarks/bench_read.py create mode 100644 benchmarks/run.py create mode 100644 benchmarks/summarize.py delete mode 100644 parquet_integration/bench_read.py diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index 55ce25fc96d..8f536ed6842 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -6,9 +6,18 @@ use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::error::Result; use arrow2::io::parquet::read; -fn to_buffer(size: usize) -> Vec { +fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec { let dir = env!("CARGO_MANIFEST_DIR"); - let path = PathBuf::from(dir).join(format!("fixtures/pyarrow3/v1/benches_{}.parquet", size)); + + let dict = if dict { "dict/" } else { "" }; + let multi_page = if multi_page { "multi/" } else { "" }; + let compressed = if compressed { "snappy/" } else { "" }; + + let path = PathBuf::from(dir).join(format!( + "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet", + dict, multi_page, compressed, size + )); + let metadata = fs::metadata(&path).expect("unable to read metadata"); let mut file = fs::File::open(path).unwrap(); let mut buffer = vec![0; metadata.len() as usize]; @@ -16,7 +25,7 @@ fn to_buffer(size: usize) -> Vec { buffer } -fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> { +fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> { let file = Cursor::new(buffer); let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?; @@ -31,26 +40,38 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result< fn add_benchmark(c: &mut Criterion) { (10..=20).step_by(2).for_each(|i| { let size = 2usize.pow(i); - let buffer = to_buffer(size); + let buffer = to_buffer(size, false, false, false); let a = format!("read i64 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 0).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 
0).unwrap())); let a = format!("read utf8 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 2).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); let a = format!("read utf8 large 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 6).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap())); let a = format!("read bool 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 3).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap())); + + let buffer = to_buffer(size, true, false, false); + let a = format!("read utf8 dict 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + + let buffer = to_buffer(size, false, false, true); + let a = format!("read i64 snappy 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); + + let buffer = to_buffer(size, false, true, false); + let a = format!("read utf8 multi 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + + let buffer = to_buffer(size, false, true, true); + let a = format!("read utf8 multi snappy 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + + let buffer = to_buffer(size, false, true, true); + let a = format!("read i64 multi snappy 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); }); } diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore new file mode 100644 index 00000000000..344f079e498 --- /dev/null +++ b/benchmarks/.gitignore @@ -0,0 +1 @@ +runs diff --git a/benchmarks/bench_read.py b/benchmarks/bench_read.py new file mode 100644 index 00000000000..cab190fe2cd --- /dev/null +++ b/benchmarks/bench_read.py @@ -0,0 +1,49 @@ +import timeit +import io +import os +import json + +import pyarrow.parquet + + +def _bench_single(log2_size: int, column: str, use_dict: bool) -> float: + if use_dict: + path = f"fixtures/pyarrow3/v1/dict/benches_{2**log2_size}.parquet" + else: + path = f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet" + with open(path, "rb") as f: + data = f.read() + data = io.BytesIO(data) + + def f(): + pyarrow.parquet.read_table(data, columns=[column]) + + seconds = timeit.Timer(f).timeit(number=512) / 512 + ns = seconds * 1000 * 1000 * 1000 + return ns + + +def _report(name: str, result: float): + path = f"benchmarks/runs/{name}/new" + os.makedirs(path, exist_ok=True) + with open(f"{path}/estimates.json", "w") as f: + json.dump({"mean": {"point_estimate": result}}, f) + + +def _bench(size, ty): + column, use_dict = { + "i64": ("int64", False), + "bool": ("bool", False), + "utf8": ("string", False), + "utf8 dict": ("string", True), + }[ty] + + result = _bench_single(size, column, use_dict) + print(result) + _report(f"read {ty} 2_{size}", result) + + +for size in range(10, 22, 2): + for ty in ["i64", "bool", "utf8", "utf8 dict"]: + print(size, ty) + _bench(size, ty) diff --git a/benchmarks/run.py b/benchmarks/run.py new file mode 100644 index 00000000000..a707f23f1bd --- /dev/null +++ b/benchmarks/run.py @@ -0,0 +1,20 @@ +import subprocess + + +# run pyarrow +subprocess.call(["python", "benchmarks/bench_read.py"]) + + +for ty in ["i64", "bool", "utf8", "utf8 dict"]: + args = [ + "cargo", + "bench", + "--features", + "io_parquet,io_parquet_compression", + "--bench", + "read_parquet", + "--", + f"{ty} 
2", + ] + + subprocess.call(args) diff --git a/benchmarks/summarize.py b/benchmarks/summarize.py new file mode 100644 index 00000000000..a44c0ac182f --- /dev/null +++ b/benchmarks/summarize.py @@ -0,0 +1,51 @@ +import json +import os + + +def _read_reports(engine: str): + root = { + "arrow2": "target/criterion", + "pyarrow": "benchmarks/runs", + }[engine] + + result = [] + for item in os.listdir(root): + if item == "report": + continue + + with open(os.path.join(root, item, "new", "estimates.json")) as f: + data = json.load(f) + + ms = data["mean"]["point_estimate"] / 1000 + task = item.split()[0] + type = " ".join(item.split()[1:-1]) + size = int(item.split()[-1].split("_")[1]) + result.append( + { + "engine": engine, + "task": task, + "type": type, + "size": size, + "time": ms, + } + ) + return result + + +def _print_report(result): + for ty in ["i64", "bool", "utf8", "utf8 dict"]: + print(ty) + r = filter(lambda x: x["type"] == ty, result) + r = sorted(r, key=lambda x: x["size"]) + for row in r: + print(row["time"]) + + +def print_report(): + for engine in ["arrow2", "pyarrow"]: + print(engine) + result = _read_reports(engine) + _print_report(result) + + +print_report() diff --git a/parquet_integration/bench_read.py b/parquet_integration/bench_read.py deleted file mode 100644 index f1db81addee..00000000000 --- a/parquet_integration/bench_read.py +++ /dev/null @@ -1,26 +0,0 @@ -import timeit -import io - -import pyarrow.parquet - - -def bench(log2_size: int, datatype: str): - with open(f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet", "rb") as f: - data = f.read() - data = io.BytesIO(data) - - def f(): - pyarrow.parquet.read_table(data, columns=[datatype]) - - seconds = timeit.Timer(f).timeit(number=512) / 512 - microseconds = seconds * 1000 * 1000 - print(f"read {datatype} 2^{log2_size} time: {microseconds:.2f} us") - -#for i in range(10, 22, 2): -# bench(i, "int64") - -for i in range(10, 22, 2): - bench(i, "string") - -for i in range(10, 22, 2): - bench(i, "bool") diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 0d9e556216d..d97ff1edc9d 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -11,7 +11,9 @@ def case_basic_nullable(size=1): float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0] string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"] boolean = [True, None, False, False, None, True, None, None, True, True] - string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDšŸ˜ƒšŸŒššŸ•³šŸ‘Š"] * 10 + string_large = [ + "ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDšŸ˜ƒšŸŒššŸ•³šŸ‘Š" + ] * 10 decimal = [Decimal(e) if e is not None else None for e in int64] fields = [ @@ -23,9 +25,9 @@ def case_basic_nullable(size=1): pa.field("uint32", pa.uint32()), pa.field("string_large", pa.utf8()), # decimal testing - pa.field("decimal_9", pa.decimal128(9,0)), - pa.field("decimal_18", pa.decimal128(18,0)), - pa.field("decimal_26", pa.decimal128(26,0)), + pa.field("decimal_9", pa.decimal128(9, 0)), + pa.field("decimal_18", pa.decimal128(18, 0)), + pa.field("decimal_26", pa.decimal128(26, 0)), ] schema = pa.schema(fields) @@ -67,9 +69,9 @@ def case_basic_required(size=1): nullable=False, ), pa.field("uint32", pa.uint32(), nullable=False), - pa.field("decimal_9", pa.decimal128(9,0), nullable=False), - pa.field("decimal_18", pa.decimal128(18,0), nullable=False), - pa.field("decimal_26", pa.decimal128(26,0), nullable=False), + 
pa.field("decimal_9", pa.decimal128(9, 0), nullable=False), + pa.field("decimal_18", pa.decimal128(18, 0), nullable=False), + pa.field("decimal_26", pa.decimal128(26, 0), nullable=False), ] schema = pa.schema(fields) @@ -156,13 +158,36 @@ def case_nested(size): ) -def write_pyarrow(case, size=1, page_version=1, use_dictionary=False): +def write_pyarrow( + case, + size: int, + page_version: int, + use_dictionary: bool, + multiple_pages: bool, + compression: bool, +): data, schema, path = case(size) base_path = f"{PYARROW_PATH}/v{page_version}" if use_dictionary: base_path = f"{base_path}/dict" + if multiple_pages: + base_path = f"{base_path}/multi" + + if compression: + base_path = f"{base_path}/snappy" + + if compression: + compression = "snappy" + else: + compression = None + + if multiple_pages: + data_page_size = 2 ** 10 # i.e. a small number to ensure multiple pages + else: + data_page_size = 2 ** 40 # i.e. a large number to ensure a single page + t = pa.table(data, schema=schema) os.makedirs(base_path, exist_ok=True) pa.parquet.write_table( @@ -170,9 +195,9 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False): f"{base_path}/{path}", row_group_size=2 ** 40, use_dictionary=use_dictionary, - compression=None, + compression=compression, write_statistics=True, - data_page_size=2 ** 40, # i.e. a large number to ensure a single page + data_page_size=data_page_size, data_page_version=f"{page_version}.0", ) @@ -180,18 +205,26 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False): for case in [case_basic_nullable, case_basic_required, case_nested]: for version in [1, 2]: for use_dict in [True, False]: - write_pyarrow(case, 1, version, use_dict) + write_pyarrow(case, 1, version, use_dict, False, False) def case_benches(size): assert size % 8 == 0 - size //= 8 - data, schema, path = case_basic_nullable(1) + data, schema, _ = case_basic_nullable(1) for k in data: - data[k] = data[k][:8] * size + data[k] = data[k][:8] * (size // 8) return data, schema, f"benches_{size}.parquet" # for read benchmarks -for i in range(3 + 10, 3 + 22, 2): - write_pyarrow(case_benches, 2 ** i, 1) # V1 +for i in range(10, 22, 2): + # two pages (dict) + write_pyarrow(case_benches, 2 ** i, 1, True, False, False) + # single page + write_pyarrow(case_benches, 2 ** i, 1, False, False, False) + # multiple pages + write_pyarrow(case_benches, 2 ** i, 1, False, True, False) + # multiple compressed pages + write_pyarrow(case_benches, 2 ** i, 1, False, True, True) + # single compressed page + write_pyarrow(case_benches, 2 ** i, 1, False, False, True) From 60ce40c50f068ad4be5e3fa7d12093a7a7d7d169 Mon Sep 17 00:00:00 2001 From: Hagai Har-Gil Date: Sun, 17 Oct 2021 09:09:21 +0300 Subject: [PATCH 04/23] Added more IPC documentation (#534) --- src/io/ipc/mod.rs | 77 ++++++++++++++++++++++++++++++++++++++ src/io/ipc/read/mod.rs | 6 +++ src/io/ipc/read/stream.rs | 26 ++++++++++++- src/io/ipc/write/stream.rs | 6 +++ src/io/ipc/write/writer.rs | 1 + 5 files changed, 114 insertions(+), 2 deletions(-) diff --git a/src/io/ipc/mod.rs b/src/io/ipc/mod.rs index f01ae010340..1e6e4d603fc 100644 --- a/src/io/ipc/mod.rs +++ b/src/io/ipc/mod.rs @@ -1,4 +1,81 @@ //! APIs to read from and write to Arrow's IPC format. +//! +//! Inter-process communication is a method through which different processes +//! share and pass data between them. Its use-cases include parallel +//! processing of chunks of data across different CPU cores, transferring +//! 
data between different Apache Arrow implementations in other languages and
+//! more. Under the hood Apache Arrow uses [FlatBuffers](https://google.github.io/flatbuffers/)
+//! as its binary protocol, so every Arrow-centered streaming or serialization
+//! problem that could be solved using FlatBuffers could probably be solved
+//! using the more integrated approach that is exposed in this module.
+//!
+//! [Arrow's IPC protocol](https://arrow.apache.org/docs/format/Columnar.html#serialization-and-interprocess-communication-ipc)
+//! allows only [`RecordBatch`](crate::record_batch::RecordBatch)es or
+//! [`DictionaryBatch`](gen::Message::DictionaryBatch) to be passed
+//! around due to its reliance on a pre-defined data scheme. This limitation
+//! provides a large performance gain because serialized data will always have a
+//! known structure, i.e. the same fields and datatypes, with the only variance
+//! being the number of rows and the actual data inside the Batch. This dramatically
+//! increases the deserialization rate, as the bytes in the file or stream are already
+//! structured "correctly".
+//!
+//! Reading and writing IPC messages is done using one of two variants - either
+//! [`FileReader`](read::FileReader) <-> [`FileWriter`](struct@write::FileWriter) or
+//! [`StreamReader`](read::StreamReader) <-> [`StreamWriter`](struct@write::StreamWriter).
+//! These two variants wrap a type `T` that implements [`Read`](std::io::Read), and in
+//! the case of the `File` variant it also implements [`Seek`](std::io::Seek). In
+//! practice it means that `File`s can be arbitrarily accessed while `Stream`s are only
+//! read in a certain order - the one they were written in (first in, first out).
+//!
+//! # Examples
+//! Read and write to a file:
+//! ```
+//! use arrow2::io::ipc::{{read::{FileReader, read_file_metadata}}, {write::FileWriter}};
+//! # use std::fs::File;
+//! # use std::sync::Arc;
+//! # use arrow2::datatypes::{Field, Schema, DataType};
+//! # use arrow2::array::Int32Array;
+//! # use arrow2::record_batch::RecordBatch;
+//! # use arrow2::error::ArrowError;
+//! // Setup the writer
+//! let path = "example.arrow".to_string();
+//! let mut file = File::create(&path)?;
+//! let x_coord = Field::new("x", DataType::Int32, false);
+//! let y_coord = Field::new("y", DataType::Int32, false);
+//! let schema = Schema::new(vec![x_coord, y_coord]);
+//! let mut writer = FileWriter::try_new(file, &schema)?;
+//!
+//! // Setup the data
+//! let x_data = Int32Array::from_slice([-1i32, 1]);
+//! let y_data = Int32Array::from_slice([1i32, -1]);
+//! let batch = RecordBatch::try_new(
+//! Arc::new(schema),
+//! vec![Arc::new(x_data), Arc::new(y_data)]
+//! )?;
+//!
+//! // Write the messages and finalize the stream
+//! for _ in 0..5 {
+//! writer.write(&batch);
+//! }
+//! writer.finish();
+//!
+//! // Fetch some of the data and get the reader back
+//! let mut reader = File::open(&path)?;
+//! let metadata = read_file_metadata(&mut reader)?;
+//! let mut filereader = FileReader::new(reader, metadata, None);
+//! let row1 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
+//! let row2 = filereader.next().unwrap(); // [[-1, 1], [1, -1]]
+//! let mut reader = filereader.into_inner();
+//! // Do more stuff with the reader, like seeking ahead.
+//! # Ok::<(), ArrowError>(())
+//! ```
+//!
+//! For further information and examples please consult the
+//! [user guide](https://jorgecarleitao.github.io/arrow2/io/index.html).
+//! 
For even more examples check the `examples` folder in the main repository +//! ([1](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_read.rs), +//! [2](https://github.com/jorgecarleitao/arrow2/blob/main/examples/ipc_file_write.rs), +//! [3](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow)). #![allow(missing_debug_implementations)] #![allow(non_camel_case_types)] diff --git a/src/io/ipc/read/mod.rs b/src/io/ipc/read/mod.rs index aec51e1a8fd..adc2b41e477 100644 --- a/src/io/ipc/read/mod.rs +++ b/src/io/ipc/read/mod.rs @@ -1,4 +1,10 @@ //! APIs to read Arrow's IPC format. +//! +//! The two important structs here are the [`FileReader`](reader::FileReader), +//! which provides arbitrary access to any of its messages, and the +//! [`StreamReader`](stream::StreamReader), which only supports reading +//! data in the order it was written in. + mod array; mod common; mod deserialize; diff --git a/src/io/ipc/read/stream.rs b/src/io/ipc/read/stream.rs index ed7f5955dfe..e93d79d313a 100644 --- a/src/io/ipc/read/stream.rs +++ b/src/io/ipc/read/stream.rs @@ -76,12 +76,28 @@ pub fn read_stream_metadata(reader: &mut R) -> Result { }) } +/// Encodes the stream's status after each read. +/// +/// A stream is an iterator, and an iterator returns `Option`. The `Item` +/// type in the [`StreamReader`] case is `StreamState`, which means that an Arrow +/// stream may yield one of three values: (1) `None`, which signals that the stream +/// is done; (2) `Some(StreamState::Some(RecordBatch))`, which signals that there was +/// data waiting in the stream and we read it; and finally (3) +/// `Some(StreamState::Waiting)`, which means that the stream is still "live", it +/// just doesn't hold any data right now. pub enum StreamState { + /// A live stream without data Waiting, + /// Next item in the stream Some(RecordBatch), } impl StreamState { + /// Return the data inside this wrapper. + /// + /// # Panics + /// + /// If the `StreamState` was `Waiting`. pub fn unwrap(self) -> RecordBatch { if let StreamState::Some(batch) = self { batch @@ -91,7 +107,8 @@ impl StreamState { } } -/// Reads the next item +/// Reads the next item, yielding `None` if the stream is done, +/// and a [`StreamState`] otherwise. pub fn read_next( reader: &mut R, metadata: &StreamMetadata, @@ -191,7 +208,12 @@ pub fn read_next( } } -/// Arrow Stream reader +/// Arrow Stream reader. +/// +/// An [`Iterator`] over an Arrow stream that yields a result of [`StreamState`]s. +/// This is the recommended way to read an arrow stream (by iterating over its data). +/// +/// For a more thorough walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow). pub struct StreamReader { reader: R, metadata: StreamMetadata, diff --git a/src/io/ipc/write/stream.rs b/src/io/ipc/write/stream.rs index 5a52341b092..7c86d7e3979 100644 --- a/src/io/ipc/write/stream.rs +++ b/src/io/ipc/write/stream.rs @@ -32,6 +32,12 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +/// Arrow stream writer +/// +/// The data written by this writer must be read in order. To signal that no more +/// data is arriving through the stream call [`self.finish()`](StreamWriter::finish); +/// +/// For a usage walkthrough consult [this example](https://github.com/jorgecarleitao/arrow2/tree/main/examples/ipc_pyarrow). 
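A hedged sketch of the reading loop described by the `StreamState` and `StreamReader` documentation added above, in the spirit of the linked `ipc_pyarrow` example rather than a prescribed API. It assumes `read_stream_metadata`, `StreamReader` and `StreamState` are re-exported from `arrow2::io::ipc::read`, and uses a `TcpStream` merely as an arbitrary byte source; the `consume` function and the 100 ms back-off are illustrative choices.

use std::net::TcpStream;
use std::{thread, time};

use arrow2::error::Result;
use arrow2::io::ipc::read::{read_stream_metadata, StreamReader, StreamState};

fn consume(mut connection: TcpStream) -> Result<()> {
    // Read the stream's schema/metadata once, then iterate over its messages.
    let metadata = read_stream_metadata(&mut connection)?;
    let mut reader = StreamReader::new(connection, metadata);

    loop {
        match reader.next() {
            // (2) a batch was waiting in the stream and was read
            Some(Ok(StreamState::Some(batch))) => println!("read {} rows", batch.num_rows()),
            // (3) the stream is still live but holds no data right now; back off briefly
            Some(Ok(StreamState::Waiting)) => thread::sleep(time::Duration::from_millis(100)),
            Some(Err(error)) => return Err(error),
            // (1) the stream is done
            None => break,
        }
    }
    Ok(())
}
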
pub struct StreamWriter { /// The object to write to writer: BufWriter, diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 76811dc12a1..81a62491f2d 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -37,6 +37,7 @@ use crate::datatypes::*; use crate::error::{ArrowError, Result}; use crate::record_batch::RecordBatch; +/// Arrow file writer pub struct FileWriter { /// The object to write to writer: W, From a256efd58d27070fb9122ad765b07edea35b4aec Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Sun, 17 Oct 2021 22:49:56 +0200 Subject: [PATCH 05/23] Removed benches for pyarrow. (#535) --- benchmarks/.gitignore | 1 - benchmarks/bench_read.py | 49 -------------------- benchmarks/run.py | 20 -------- benchmarks/summarize.py | 51 -------------------- parquet_integration/bench_write.py | 74 ------------------------------ 5 files changed, 195 deletions(-) delete mode 100644 benchmarks/.gitignore delete mode 100644 benchmarks/bench_read.py delete mode 100644 benchmarks/run.py delete mode 100644 benchmarks/summarize.py delete mode 100644 parquet_integration/bench_write.py diff --git a/benchmarks/.gitignore b/benchmarks/.gitignore deleted file mode 100644 index 344f079e498..00000000000 --- a/benchmarks/.gitignore +++ /dev/null @@ -1 +0,0 @@ -runs diff --git a/benchmarks/bench_read.py b/benchmarks/bench_read.py deleted file mode 100644 index cab190fe2cd..00000000000 --- a/benchmarks/bench_read.py +++ /dev/null @@ -1,49 +0,0 @@ -import timeit -import io -import os -import json - -import pyarrow.parquet - - -def _bench_single(log2_size: int, column: str, use_dict: bool) -> float: - if use_dict: - path = f"fixtures/pyarrow3/v1/dict/benches_{2**log2_size}.parquet" - else: - path = f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet" - with open(path, "rb") as f: - data = f.read() - data = io.BytesIO(data) - - def f(): - pyarrow.parquet.read_table(data, columns=[column]) - - seconds = timeit.Timer(f).timeit(number=512) / 512 - ns = seconds * 1000 * 1000 * 1000 - return ns - - -def _report(name: str, result: float): - path = f"benchmarks/runs/{name}/new" - os.makedirs(path, exist_ok=True) - with open(f"{path}/estimates.json", "w") as f: - json.dump({"mean": {"point_estimate": result}}, f) - - -def _bench(size, ty): - column, use_dict = { - "i64": ("int64", False), - "bool": ("bool", False), - "utf8": ("string", False), - "utf8 dict": ("string", True), - }[ty] - - result = _bench_single(size, column, use_dict) - print(result) - _report(f"read {ty} 2_{size}", result) - - -for size in range(10, 22, 2): - for ty in ["i64", "bool", "utf8", "utf8 dict"]: - print(size, ty) - _bench(size, ty) diff --git a/benchmarks/run.py b/benchmarks/run.py deleted file mode 100644 index a707f23f1bd..00000000000 --- a/benchmarks/run.py +++ /dev/null @@ -1,20 +0,0 @@ -import subprocess - - -# run pyarrow -subprocess.call(["python", "benchmarks/bench_read.py"]) - - -for ty in ["i64", "bool", "utf8", "utf8 dict"]: - args = [ - "cargo", - "bench", - "--features", - "io_parquet,io_parquet_compression", - "--bench", - "read_parquet", - "--", - f"{ty} 2", - ] - - subprocess.call(args) diff --git a/benchmarks/summarize.py b/benchmarks/summarize.py deleted file mode 100644 index a44c0ac182f..00000000000 --- a/benchmarks/summarize.py +++ /dev/null @@ -1,51 +0,0 @@ -import json -import os - - -def _read_reports(engine: str): - root = { - "arrow2": "target/criterion", - "pyarrow": "benchmarks/runs", - }[engine] - - result = [] - for item in os.listdir(root): - if item == "report": - continue 
- - with open(os.path.join(root, item, "new", "estimates.json")) as f: - data = json.load(f) - - ms = data["mean"]["point_estimate"] / 1000 - task = item.split()[0] - type = " ".join(item.split()[1:-1]) - size = int(item.split()[-1].split("_")[1]) - result.append( - { - "engine": engine, - "task": task, - "type": type, - "size": size, - "time": ms, - } - ) - return result - - -def _print_report(result): - for ty in ["i64", "bool", "utf8", "utf8 dict"]: - print(ty) - r = filter(lambda x: x["type"] == ty, result) - r = sorted(r, key=lambda x: x["size"]) - for row in r: - print(row["time"]) - - -def print_report(): - for engine in ["arrow2", "pyarrow"]: - print(engine) - result = _read_reports(engine) - _print_report(result) - - -print_report() diff --git a/parquet_integration/bench_write.py b/parquet_integration/bench_write.py deleted file mode 100644 index 2c47912205c..00000000000 --- a/parquet_integration/bench_write.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -Benchmark of writing a pyarrow table of size N to parquet. -""" -import io -import os -import timeit - -import numpy -import pyarrow.parquet - - -def case_basic_nullable(size = 1): - int64 = [0, 1, None, 3, None, 5, 6, 7, None, 9] - float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0] - string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"] - boolean = [True, None, False, False, None, True, None, None, True, True] - - fields = [ - pa.field('int64', pa.int64()), - pa.field('float64', pa.float64()), - pa.field('string', pa.utf8()), - pa.field('bool', pa.bool_()), - pa.field('date', pa.timestamp('ms')), - pa.field('uint32', pa.uint32()), - ] - schema = pa.schema(fields) - - return { - "int64": int64 * size, - "float64": float64 * size, - "string": string * size, - "bool": boolean * size, - "date": int64 * size, - "uint32": int64 * size, - }, schema, f"basic_nullable_{size*10}.parquet" - -def bench(log2_size: int, datatype: str): - - if datatype == 'int64': - data = [0, 1, None, 3, 4, 5, 6, 7] * 128 # 1024 entries - field = pyarrow.field('int64', pyarrow.int64()) - elif datatype == 'utf8': - # 4 each because our own benches also use 4 - data = ["aaaa", "aaab", None, "aaac", "aaad", "aaae", "aaaf", "aaag"] * 128 # 1024 entries - field = pyarrow.field('utf8', pyarrow.utf8()) - elif datatype == 'bool': - data = [True, False, None, True, False, True, True, True] * 128 # 1024 entries - field = pyarrow.field('bool', pyarrow.bool_()) - - data = data * 2**log2_size - - t = pyarrow.table([data], schema=pyarrow.schema([field])) - - def f(): - pyarrow.parquet.write_table(t, - io.BytesIO(), - use_dictionary=False, - compression=None, - write_statistics=False, - data_page_size=2**40, # i.e. 
a large number to ensure a single page - data_page_version="1.0") - - seconds = timeit.Timer(f).timeit(number=512) / 512 - microseconds = seconds * 1000 * 1000 - print(f"write {datatype} 2^{10 + log2_size} time: {microseconds:.2f} us") - -for i in range(0, 12, 2): - bench(i, "int64") - -for i in range(0, 12, 2): - bench(i, "utf8") - -for i in range(0, 12, 2): - bench(i, "bool") From 13f8d094db0991af53d0f8eb74ff5b7ce3294aff Mon Sep 17 00:00:00 2001 From: Jorge Leitao Date: Mon, 18 Oct 2021 07:25:23 +0200 Subject: [PATCH 06/23] Migrate to `arrow_format` crate (#517) --- Cargo.toml | 9 +- arrow-flight/Cargo.toml | 39 - arrow-flight/src/arrow.flight.protocol.rs | 1071 ---- arrow-flight/src/lib.rs | 20 - integration-testing/Cargo.toml | 10 +- .../auth_basic_proto.rs | 7 +- .../integration_test.rs | 32 +- .../src/flight_client_scenarios/middleware.rs | 6 +- .../src/flight_server_scenarios.rs | 2 +- .../auth_basic_proto.rs | 12 +- .../integration_test.rs | 30 +- .../src/flight_server_scenarios/middleware.rs | 11 +- src/doc/lib.md | 1 + .../src/utils.rs => src/io/flight/mod.rs | 89 +- src/io/ipc/convert.rs | 33 +- src/io/ipc/gen/File.rs | 471 -- src/io/ipc/gen/Message.rs | 1344 ----- src/io/ipc/gen/Schema.rs | 4663 ----------------- src/io/ipc/gen/SparseTensor.rs | 1961 ------- src/io/ipc/gen/Tensor.rs | 977 ---- src/io/ipc/gen/mod.rs | 31 - src/io/ipc/mod.rs | 16 +- src/io/ipc/read/array/binary.rs | 10 +- src/io/ipc/read/array/boolean.rs | 7 +- src/io/ipc/read/array/dictionary.rs | 7 +- src/io/ipc/read/array/fixed_size_binary.rs | 10 +- src/io/ipc/read/array/fixed_size_list.rs | 12 +- src/io/ipc/read/array/list.rs | 12 +- src/io/ipc/read/array/map.rs | 12 +- src/io/ipc/read/array/primitive.rs | 10 +- src/io/ipc/read/array/struct_.rs | 12 +- src/io/ipc/read/array/union.rs | 14 +- src/io/ipc/read/array/utf8.rs | 10 +- src/io/ipc/read/common.rs | 10 +- src/io/ipc/read/deserialize.rs | 11 +- src/io/ipc/read/read_basic.rs | 15 +- src/io/ipc/read/reader.rs | 26 +- src/io/ipc/read/stream.rs | 19 +- src/io/ipc/write/common.rs | 73 +- src/io/ipc/write/mod.rs | 2 +- src/io/ipc/write/schema.rs | 9 +- src/io/ipc/write/serialize.rs | 6 +- src/io/ipc/write/writer.rs | 16 +- src/io/mod.rs | 4 + src/io/parquet/read/schema/metadata.rs | 8 +- 45 files changed, 270 insertions(+), 10880 deletions(-) delete mode 100644 arrow-flight/Cargo.toml delete mode 100644 arrow-flight/src/arrow.flight.protocol.rs delete mode 100644 arrow-flight/src/lib.rs rename arrow-flight/src/utils.rs => src/io/flight/mod.rs (51%) delete mode 100644 src/io/ipc/gen/File.rs delete mode 100644 src/io/ipc/gen/Message.rs delete mode 100644 src/io/ipc/gen/Schema.rs delete mode 100644 src/io/ipc/gen/SparseTensor.rs delete mode 100644 src/io/ipc/gen/Tensor.rs delete mode 100644 src/io/ipc/gen/mod.rs diff --git a/Cargo.toml b/Cargo.toml index d5982d72553..20a11dc9061 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,8 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -flatbuffers = { version = "=2.0.0", optional = true } +arrow-format = { version = "*", optional = true, features = ["ipc"] } + hex = { version = "^0.4", optional = true } # for IPC compression @@ -90,6 +91,7 @@ full = [ "io_csv", "io_json", "io_ipc", + "io_flight", "io_ipc_compression", "io_json_integration", "io_print", @@ -107,8 +109,9 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_read = ["csv", "lexical-core"] io_csv_write = ["csv", 
"streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "indexmap"] -io_ipc = ["flatbuffers"] +io_ipc = ["arrow-format"] io_ipc_compression = ["lz4", "zstd"] +io_flight = ["io_ipc", "arrow-format/flight-data"] io_parquet_compression = [ "parquet2/zstd", "parquet2/snappy", @@ -145,6 +148,8 @@ skip_feature_sets = [ ["io_csv_write"], ["io_avro"], ["io_json"], + ["io_flight"], + ["io_ipc"], ["io_parquet"], ["io_json_integration"], # this does not change the public API diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml deleted file mode 100644 index ad423b6d0d4..00000000000 --- a/arrow-flight/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "arrow-flight" -description = "Apache Arrow Flight" -version = "0.1.0" -edition = "2018" -authors = ["Apache Arrow "] -homepage = "https://github.com/apache/arrow" -repository = "https://github.com/apache/arrow" -license = "Apache-2.0" - -[dependencies] -arrow2 = { path = "../", features = ["io_ipc"], default-features = false } -tonic = "0.5.2" -bytes = "1" -prost = "0.8.0" -prost-derive = "0.8.0" -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -futures = { version = "0.3", default-features = false, features = ["alloc"]} - -#[lib] -#name = "flight" -#path = "src/lib.rs" diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs deleted file mode 100644 index 5db746f6fda..00000000000 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ /dev/null @@ -1,1071 +0,0 @@ -// This file was automatically generated through the build.rs script, and should not be edited. - -/// -/// The request that a client provides to a server on handshake. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HandshakeRequest { - /// - /// A defined protocol version - #[prost(uint64, tag = "1")] - pub protocol_version: u64, - /// - /// Arbitrary auth/handshake info. - #[prost(bytes = "vec", tag = "2")] - pub payload: ::prost::alloc::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HandshakeResponse { - /// - /// A defined protocol version - #[prost(uint64, tag = "1")] - pub protocol_version: u64, - /// - /// Arbitrary auth/handshake info. - #[prost(bytes = "vec", tag = "2")] - pub payload: ::prost::alloc::vec::Vec, -} -/// -/// A message for doing simple auth. 
-#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BasicAuth { - #[prost(string, tag = "2")] - pub username: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub password: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Empty {} -/// -/// Describes an available action, including both the name used for execution -/// along with a short description of the purpose of the action. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ActionType { - #[prost(string, tag = "1")] - pub r#type: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub description: ::prost::alloc::string::String, -} -/// -/// A service specific expression that can be used to return a limited set -/// of available Arrow Flight streams. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Criteria { - #[prost(bytes = "vec", tag = "1")] - pub expression: ::prost::alloc::vec::Vec, -} -/// -/// An opaque action specific for the service. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Action { - #[prost(string, tag = "1")] - pub r#type: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "2")] - pub body: ::prost::alloc::vec::Vec, -} -/// -/// An opaque result returned after executing an action. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Result { - #[prost(bytes = "vec", tag = "1")] - pub body: ::prost::alloc::vec::Vec, -} -/// -/// Wrap the result of a getSchema call -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct SchemaResult { - /// schema of the dataset as described in Schema.fbs::Schema. - #[prost(bytes = "vec", tag = "1")] - pub schema: ::prost::alloc::vec::Vec, -} -/// -/// The name or tag for a Flight. May be used as a way to retrieve or generate -/// a flight or be used to expose a set of previously defined flights. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightDescriptor { - #[prost(enumeration = "flight_descriptor::DescriptorType", tag = "1")] - pub r#type: i32, - /// - /// Opaque value used to express a command. Should only be defined when - /// type = CMD. - #[prost(bytes = "vec", tag = "2")] - pub cmd: ::prost::alloc::vec::Vec, - /// - /// List of strings identifying a particular dataset. Should only be defined - /// when type = PATH. - #[prost(string, repeated, tag = "3")] - pub path: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, -} -/// Nested message and enum types in `FlightDescriptor`. -pub mod flight_descriptor { - /// - /// Describes what type of descriptor is defined. - #[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, - )] - #[repr(i32)] - pub enum DescriptorType { - /// Protobuf pattern, not used. - Unknown = 0, - /// - /// A named path that identifies a dataset. A path is composed of a string - /// or list of strings describing a particular dataset. This is conceptually - /// similar to a path inside a filesystem. - Path = 1, - /// - /// An opaque command to generate a dataset. - Cmd = 2, - } -} -/// -/// The access coordinates for retrieval of a dataset. With a FlightInfo, a -/// consumer is able to determine how to retrieve a dataset. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightInfo { - /// schema of the dataset as described in Schema.fbs::Schema. - #[prost(bytes = "vec", tag = "1")] - pub schema: ::prost::alloc::vec::Vec, - /// - /// The descriptor associated with this info. 
- #[prost(message, optional, tag = "2")] - pub flight_descriptor: ::core::option::Option, - /// - /// A list of endpoints associated with the flight. To consume the whole - /// flight, all endpoints must be consumed. - #[prost(message, repeated, tag = "3")] - pub endpoint: ::prost::alloc::vec::Vec, - /// Set these to -1 if unknown. - #[prost(int64, tag = "4")] - pub total_records: i64, - #[prost(int64, tag = "5")] - pub total_bytes: i64, -} -/// -/// A particular stream or split associated with a flight. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightEndpoint { - /// - /// Token used to retrieve this stream. - #[prost(message, optional, tag = "1")] - pub ticket: ::core::option::Option, - /// - /// A list of URIs where this ticket can be redeemed. If the list is - /// empty, the expectation is that the ticket can only be redeemed on the - /// current service where the ticket was generated. - #[prost(message, repeated, tag = "2")] - pub location: ::prost::alloc::vec::Vec, -} -/// -/// A location where a Flight service will accept retrieval of a particular -/// stream given a ticket. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Location { - #[prost(string, tag = "1")] - pub uri: ::prost::alloc::string::String, -} -/// -/// An opaque identifier that the service can use to retrieve a particular -/// portion of a stream. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Ticket { - #[prost(bytes = "vec", tag = "1")] - pub ticket: ::prost::alloc::vec::Vec, -} -/// -/// A batch of Arrow data as part of a stream of batches. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightData { - /// - /// The descriptor of the data. This is only relevant when a client is - /// starting a new DoPut stream. - #[prost(message, optional, tag = "1")] - pub flight_descriptor: ::core::option::Option, - /// - /// Header for message data as described in Message.fbs::Message. - #[prost(bytes = "vec", tag = "2")] - pub data_header: ::prost::alloc::vec::Vec, - /// - /// Application-defined metadata. - #[prost(bytes = "vec", tag = "3")] - pub app_metadata: ::prost::alloc::vec::Vec, - /// - /// The actual batch of Arrow data. Preferably handled with minimal-copies - /// coming last in the definition to help with sidecar patterns (it is - /// expected that some implementations will fetch this field off the wire - /// with specialized code to avoid extra memory copies). - #[prost(bytes = "vec", tag = "1000")] - pub data_body: ::prost::alloc::vec::Vec, -} -///* -/// The response message associated with the submission of a DoPut. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PutResult { - #[prost(bytes = "vec", tag = "1")] - pub app_metadata: ::prost::alloc::vec::Vec, -} -#[doc = r" Generated client implementations."] -pub mod flight_service_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - #[doc = ""] - #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"] - #[doc = " flight service can expose one or more predefined endpoints that can be"] - #[doc = " accessed using the Arrow Flight Protocol. 
Additionally, a flight service"] - #[doc = " can expose a set of actions that are available."] - #[derive(Debug, Clone)] - pub struct FlightServiceClient { - inner: tonic::client::Grpc, - } - impl FlightServiceClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] - pub async fn connect(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl FlightServiceClient - where - T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, - T::Error: Into, - ::Error: Into + Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> FlightServiceClient> - where - F: tonic::service::Interceptor, - T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, - >, - >, - >>::Error: - Into + Send + Sync, - { - FlightServiceClient::new(InterceptedService::new(inner, interceptor)) - } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); - self - } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); - self - } - #[doc = ""] - #[doc = " Handshake between client and server. Depending on the server, the"] - #[doc = " handshake may be required to determine the token that should be used for"] - #[doc = " future operations. Both request and response are streams to allow multiple"] - #[doc = " round-trips depending on auth mechanism."] - pub async fn handshake( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/Handshake", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Get a list of available streams given a particular criteria. Most flight"] - #[doc = " services will expose one or more streams that are readily available for"] - #[doc = " retrieval. This api allows listing the streams available for"] - #[doc = " consumption. A user can also provide a criteria. The criteria can limit"] - #[doc = " the subset of streams that can be listed via this interface. Each flight"] - #[doc = " service allows its own definition of how to consume criteria."] - pub async fn list_flights( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/ListFlights", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " For a given FlightDescriptor, get information about how the flight can be"] - #[doc = " consumed. 
This is a useful interface if the consumer of the interface"] - #[doc = " already can identify the specific flight to consume. This interface can"] - #[doc = " also allow a consumer to generate a flight stream through a specified"] - #[doc = " descriptor. For example, a flight descriptor might be something that"] - #[doc = " includes a SQL statement or a Pickled Python operation that will be"] - #[doc = " executed. In those cases, the descriptor will not be previously available"] - #[doc = " within the list of available streams provided by ListFlights but will be"] - #[doc = " available for consumption for the duration defined by the specific flight"] - #[doc = " service."] - pub async fn get_flight_info( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/GetFlightInfo", - ); - self.inner.unary(request.into_request(), path, codec).await - } - #[doc = ""] - #[doc = " For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema"] - #[doc = " This is used when a consumer needs the Schema of flight stream. Similar to"] - #[doc = " GetFlightInfo this interface may generate a new flight that was not previously"] - #[doc = " available in ListFlights."] - pub async fn get_schema( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/GetSchema", - ); - self.inner.unary(request.into_request(), path, codec).await - } - #[doc = ""] - #[doc = " Retrieve a single stream associated with a particular descriptor"] - #[doc = " associated with the referenced ticket. A Flight can be composed of one or"] - #[doc = " more streams where each stream can be retrieved using a separate opaque"] - #[doc = " ticket that the flight service uses for managing a collection of streams."] - pub async fn do_get( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoGet", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Push a stream to the flight service associated with a particular"] - #[doc = " flight stream. This allows a client of a flight service to upload a stream"] - #[doc = " of data. Depending on the particular flight service, a client consumer"] - #[doc = " could be allowed to upload a single stream per descriptor or an unlimited"] - #[doc = " number. 
In the latter, the service might implement a 'seal' action that"] - #[doc = " can be applied to a descriptor once all streams are uploaded."] - pub async fn do_put( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoPut", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Open a bidirectional data channel for a given descriptor. This"] - #[doc = " allows clients to send and receive arbitrary Arrow data and"] - #[doc = " application-specific metadata in a single logical stream. In"] - #[doc = " contrast to DoGet/DoPut, this is more suited for clients"] - #[doc = " offloading computation (rather than storage) to a Flight service."] - pub async fn do_exchange( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoExchange", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Flight services can support an arbitrary number of simple actions in"] - #[doc = " addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut"] - #[doc = " operations that are potentially available. DoAction allows a flight client"] - #[doc = " to do a specific action against a flight service. An action includes"] - #[doc = " opaque request and response objects that are specific to the type action"] - #[doc = " being undertaken."] - pub async fn do_action( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result>, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoAction", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " A flight service exposes all of the available action types that it has"] - #[doc = " along with descriptions. 
This allows different flight consumers to"] - #[doc = " understand the capabilities of the flight service."] - pub async fn list_actions( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/ListActions", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - } -} -#[doc = r" Generated server implementations."] -pub mod flight_service_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer."] - #[async_trait] - pub trait FlightService: Send + Sync + 'static { - #[doc = "Server streaming response type for the Handshake method."] - type HandshakeStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Handshake between client and server. Depending on the server, the"] - #[doc = " handshake may be required to determine the token that should be used for"] - #[doc = " future operations. Both request and response are streams to allow multiple"] - #[doc = " round-trips depending on auth mechanism."] - async fn handshake( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListFlights method."] - type ListFlightsStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Get a list of available streams given a particular criteria. Most flight"] - #[doc = " services will expose one or more streams that are readily available for"] - #[doc = " retrieval. This api allows listing the streams available for"] - #[doc = " consumption. A user can also provide a criteria. The criteria can limit"] - #[doc = " the subset of streams that can be listed via this interface. Each flight"] - #[doc = " service allows its own definition of how to consume criteria."] - async fn list_flights( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = ""] - #[doc = " For a given FlightDescriptor, get information about how the flight can be"] - #[doc = " consumed. This is a useful interface if the consumer of the interface"] - #[doc = " already can identify the specific flight to consume. This interface can"] - #[doc = " also allow a consumer to generate a flight stream through a specified"] - #[doc = " descriptor. For example, a flight descriptor might be something that"] - #[doc = " includes a SQL statement or a Pickled Python operation that will be"] - #[doc = " executed. In those cases, the descriptor will not be previously available"] - #[doc = " within the list of available streams provided by ListFlights but will be"] - #[doc = " available for consumption for the duration defined by the specific flight"] - #[doc = " service."] - async fn get_flight_info( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = ""] - #[doc = " For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema"] - #[doc = " This is used when a consumer needs the Schema of flight stream. 
Similar to"] - #[doc = " GetFlightInfo this interface may generate a new flight that was not previously"] - #[doc = " available in ListFlights."] - async fn get_schema( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoGet method."] - type DoGetStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Retrieve a single stream associated with a particular descriptor"] - #[doc = " associated with the referenced ticket. A Flight can be composed of one or"] - #[doc = " more streams where each stream can be retrieved using a separate opaque"] - #[doc = " ticket that the flight service uses for managing a collection of streams."] - async fn do_get( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoPut method."] - type DoPutStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Push a stream to the flight service associated with a particular"] - #[doc = " flight stream. This allows a client of a flight service to upload a stream"] - #[doc = " of data. Depending on the particular flight service, a client consumer"] - #[doc = " could be allowed to upload a single stream per descriptor or an unlimited"] - #[doc = " number. In the latter, the service might implement a 'seal' action that"] - #[doc = " can be applied to a descriptor once all streams are uploaded."] - async fn do_put( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoExchange method."] - type DoExchangeStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Open a bidirectional data channel for a given descriptor. This"] - #[doc = " allows clients to send and receive arbitrary Arrow data and"] - #[doc = " application-specific metadata in a single logical stream. In"] - #[doc = " contrast to DoGet/DoPut, this is more suited for clients"] - #[doc = " offloading computation (rather than storage) to a Flight service."] - async fn do_exchange( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoAction method."] - type DoActionStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Flight services can support an arbitrary number of simple actions in"] - #[doc = " addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut"] - #[doc = " operations that are potentially available. DoAction allows a flight client"] - #[doc = " to do a specific action against a flight service. An action includes"] - #[doc = " opaque request and response objects that are specific to the type action"] - #[doc = " being undertaken."] - async fn do_action( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListActions method."] - type ListActionsStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " A flight service exposes all of the available action types that it has"] - #[doc = " along with descriptions. This allows different flight consumers to"] - #[doc = " understand the capabilities of the flight service."] - async fn list_actions( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - } - #[doc = ""] - #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. 
A"] - #[doc = " flight service can expose one or more predefined endpoints that can be"] - #[doc = " accessed using the Arrow Flight Protocol. Additionally, a flight service"] - #[doc = " can expose a set of actions that are available."] - #[derive(Debug)] - pub struct FlightServiceServer { - inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), - } - struct _Inner(Arc); - impl FlightServiceServer { - pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner); - Self { - inner, - accept_compression_encodings: Default::default(), - send_compression_encodings: Default::default(), - } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { - InterceptedService::new(Self::new(inner), interceptor) - } - } - impl tonic::codegen::Service> for FlightServiceServer - where - T: FlightService, - B: Body + Send + Sync + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = Never; - type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/arrow.flight.protocol.FlightService/Handshake" => { - #[allow(non_camel_case_types)] - struct HandshakeSvc(pub Arc); - impl - tonic::server::StreamingService - for HandshakeSvc - { - type Response = super::HandshakeResponse; - type ResponseStream = T::HandshakeStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request< - tonic::Streaming, - >, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).handshake(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = HandshakeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/ListFlights" => { - #[allow(non_camel_case_types)] - struct ListFlightsSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for ListFlightsSvc - { - type Response = super::FlightInfo; - type ResponseStream = T::ListFlightsStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).list_flights(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ListFlightsSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/GetFlightInfo" => { - 
#[allow(non_camel_case_types)] - struct GetFlightInfoSvc(pub Arc); - impl - tonic::server::UnaryService - for GetFlightInfoSvc - { - type Response = super::FlightInfo; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = - async move { (*inner).get_flight_info(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetFlightInfoSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/GetSchema" => { - #[allow(non_camel_case_types)] - struct GetSchemaSvc(pub Arc); - impl - tonic::server::UnaryService - for GetSchemaSvc - { - type Response = super::SchemaResult; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).get_schema(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetSchemaSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoGet" => { - #[allow(non_camel_case_types)] - struct DoGetSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for DoGetSvc - { - type Response = super::FlightData; - type ResponseStream = T::DoGetStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_get(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoGetSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoPut" => { - #[allow(non_camel_case_types)] - struct DoPutSvc(pub Arc); - impl - tonic::server::StreamingService - for DoPutSvc - { - type Response = super::PutResult; - type ResponseStream = T::DoPutStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request>, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_put(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = 
self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoPutSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoExchange" => { - #[allow(non_camel_case_types)] - struct DoExchangeSvc(pub Arc); - impl - tonic::server::StreamingService - for DoExchangeSvc - { - type Response = super::FlightData; - type ResponseStream = T::DoExchangeStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request>, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_exchange(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoExchangeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoAction" => { - #[allow(non_camel_case_types)] - struct DoActionSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for DoActionSvc - { - type Response = super::Result; - type ResponseStream = T::DoActionStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_action(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoActionSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/ListActions" => { - #[allow(non_camel_case_types)] - struct ListActionsSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for ListActionsSvc - { - type Response = super::ActionType; - type ResponseStream = T::ListActionsStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).list_actions(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ListActionsSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( 
- accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), - } - } - } - impl Clone for FlightServiceServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { - inner, - accept_compression_encodings: self.accept_compression_encodings, - send_compression_encodings: self.send_compression_encodings, - } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(self.0.clone()) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::transport::NamedService for FlightServiceServer { - const NAME: &'static str = "arrow.flight.protocol.FlightService"; - } -} \ No newline at end of file diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs deleted file mode 100644 index 6af2e748678..00000000000 --- a/arrow-flight/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -include!("arrow.flight.protocol.rs"); - -pub mod utils; diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index e07ae2c141f..9c2782e760c 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -18,10 +18,8 @@ [package] name = "arrow-integration-testing" description = "Binaries used in the Arrow integration tests" -version = "4.0.0-SNAPSHOT" -homepage = "https://github.com/apache/arrow" -repository = "https://github.com/apache/arrow" -authors = ["Apache Arrow "] +version = "0.1.0" +authors = ["Jorge C Leitao", "Apache Arrow "] license = "Apache-2.0" edition = "2018" publish = false @@ -30,8 +28,8 @@ publish = false logging = ["tracing-subscriber"] [dependencies] -arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_json_integration"], default-features = false } -arrow-flight = { path = "../arrow-flight" } +arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } +arrow-format = { version = "*", features = ["ipc", "flight-service"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs index 5e8cd467198..1b07e6358d5 100644 --- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs @@ -17,8 +17,9 @@ use crate::{AUTH_PASSWORD, AUTH_USERNAME}; -use arrow_flight::{ - flight_service_client::FlightServiceClient, BasicAuth, HandshakeRequest, +use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth}; +use arrow_format::flight::service::{ + flight_service_client::FlightServiceClient, }; use futures::{stream, StreamExt}; use prost::Message; @@ -33,7 +34,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result { let url = format!("http://{}:{}", host, port); let mut client = FlightServiceClient::connect(url).await?; - let action = arrow_flight::Action::default(); + let action = Action::default(); let resp = client.do_action(Request::new(action.clone())).await; // This client is unauthenticated and should fail. 
diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 820c82114b1..98fe0c2192b 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -17,17 +17,14 @@ use crate::{read_json_file, ArrowFile}; -use arrow2::{ - array::*, - datatypes::*, - io::ipc, - io::ipc::{gen::Message::MessageHeader, read, write}, - record_batch::RecordBatch, -}; -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location, Ticket, +use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch}; +use arrow_format::ipc; +use arrow_format::ipc::Message::MessageHeader; +use arrow_format::flight::data::{ + flight_descriptor::DescriptorType, + FlightData, FlightDescriptor, Location, Ticket, }; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -77,10 +74,9 @@ async fn upload_data( let (mut upload_tx, upload_rx) = mpsc::channel(10); let options = write::IpcWriteOptions::default(); - let mut schema_flight_data = - arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options); - schema_flight_data.flight_descriptor = Some(descriptor.clone()); - upload_tx.send(schema_flight_data).await?; + let mut schema = flight::serialize_schema(&schema, &options); + schema.flight_descriptor = Some(descriptor.clone()); + upload_tx.send(schema).await?; let mut original_data_iter = original_data.iter().enumerate(); @@ -131,7 +127,7 @@ async fn send_batch( options: &write::IpcWriteOptions, ) -> Result { let (dictionary_flight_data, mut batch_flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(batch, options); + serialize_batch(batch, options); upload_tx .send_all(&mut stream::iter(dictionary_flight_data).map(Ok)) @@ -215,7 +211,7 @@ async fn consume_flight_location( assert_eq!(metadata, data.app_metadata); let actual_batch = - flight_data_to_arrow_batch(&data, schema.clone(), true, &dictionaries_by_field) + deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) .expect("Unable to convert flight data to Arrow batch"); assert_eq!(expected_batch.schema(), actual_batch.schema()); @@ -249,7 +245,7 @@ async fn receive_batch_flight_data( ) -> Option { let mut data = resp.next().await?.ok()?; let mut message = - ipc::root_as_message(&data.data_header[..]).expect("Error parsing first message"); + ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing first message"); while message.header_type() == MessageHeader::DictionaryBatch { let mut reader = std::io::Cursor::new(&data.data_body); @@ -266,7 +262,7 @@ async fn receive_batch_flight_data( .expect("Error reading dictionary"); data = resp.next().await?.ok()?; - message = ipc::root_as_message(&data.data_header[..]).expect("Error parsing message"); + message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs index cbca879dca5..0694fef55c7 100644 --- a/integration-testing/src/flight_client_scenarios/middleware.rs +++ 
b/integration-testing/src/flight_client_scenarios/middleware.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - FlightDescriptor, +use arrow_format::flight::service::{ + flight_service_client::FlightServiceClient, }; +use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor}; use tonic::{Request, Status}; type Error = Box; diff --git a/integration-testing/src/flight_server_scenarios.rs b/integration-testing/src/flight_server_scenarios.rs index 9163b692086..a8aab14712e 100644 --- a/integration-testing/src/flight_server_scenarios.rs +++ b/integration-testing/src/flight_server_scenarios.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; -use arrow_flight::{FlightEndpoint, Location, Ticket}; +use arrow_format::flight::data::{FlightEndpoint, Location, Ticket}; use tokio::net::TcpListener; pub mod auth_basic_proto; diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index ea7ad3c3385..5223aaa297b 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -18,16 +18,14 @@ use std::pin::Pin; use std::sync::Arc; -use arrow_flight::{ - flight_service_server::FlightService, flight_service_server::FlightServiceServer, - Action, ActionType, BasicAuth, Criteria, Empty, FlightData, FlightDescriptor, - FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, -}; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::data::*; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; use tonic::{ metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming, }; + type TonicStream = Pin + Send + Sync + 'static>>; type Error = Box; @@ -102,7 +100,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -202,7 +200,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { let flight_context = self.check_auth(request.metadata()).await?; // Respond with the authenticated username. let buf = flight_context.peer_identity().as_bytes().to_vec(); - let result = arrow_flight::Result { body: buf }; + let result = arrow_format::flight::data::Result { body: buf }; let output = futures::stream::once(async { Ok(result) }); Ok(Response::new(Box::pin(output) as Self::DoActionStream)) } diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 16954647090..28ded196fc0 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -16,21 +16,25 @@ // under the License. 
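The integration client above now goes through the new `arrow2::io::flight` module for (de)serialization. A rough sketch of the round trip it relies on, assuming `arrow2` is built with the `io_flight` and `io_ipc` features (the schema and batch construction here are illustrative only):

    use std::sync::Arc;

    use arrow2::array::{Array, Int32Array};
    use arrow2::datatypes::{DataType, Field, Schema};
    use arrow2::error::Result;
    use arrow2::io::flight::{deserialize_batch, serialize_batch, serialize_schema};
    use arrow2::io::ipc::write::IpcWriteOptions;
    use arrow2::record_batch::RecordBatch;

    fn main() -> Result<()> {
        let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>],
        )?;

        let options = IpcWriteOptions::default();
        // The schema and each batch become separate FlightData messages on the wire.
        let _schema_message = serialize_schema(&schema, &options);
        let (_dictionary_messages, batch_message) = serialize_batch(&batch, &options);

        // The receiving side rebuilds the batch from the FlightData payload.
        let dictionaries_by_field = vec![None; schema.fields().len()];
        let roundtrip =
            deserialize_batch(&batch_message, schema.clone(), true, &dictionaries_by_field)?;
        assert_eq!(roundtrip.num_rows(), batch.num_rows());
        Ok(())
    }
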
use std::collections::HashMap; -use std::convert::TryFrom; use std::pin::Pin; use std::sync::Arc; +use std::convert::TryFrom; + +use arrow2::io::flight::{serialize_batch, serialize_schema}; +use arrow_format::flight::data::flight_descriptor::*; +use arrow_format::flight::service::flight_service_server::*; +use arrow_format::flight::data::*; +use arrow_format::ipc::Schema as ArrowSchema; +use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message}; use arrow2::{ array::Array, datatypes::*, - io::ipc, - io::ipc::gen::Message::{Message, MessageHeader}, - io::ipc::gen::Schema::MetadataVersion, record_batch::RecordBatch, + io::ipc, + io::flight::serialize_schema_to_info }; -use arrow_flight::flight_descriptor::*; -use arrow_flight::flight_service_server::*; -use arrow_flight::*; + use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -81,7 +85,7 @@ impl FlightService for FlightServiceImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -110,7 +114,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); let schema = std::iter::once({ - Ok(arrow_flight::utils::flight_data_from_arrow_schema( + Ok(serialize_schema( &flight.schema, &options, )) @@ -122,7 +126,7 @@ impl FlightService for FlightServiceImpl { .enumerate() .flat_map(|(counter, batch)| { let (dictionary_flight_data, mut batch_flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(batch, &options); + serialize_batch(batch, &options); // Only the record batch's FlightData gets app_metadata let metadata = counter.to_string().into_bytes(); @@ -177,7 +181,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); let schema = - arrow_flight::utils::ipc_message_from_arrow_schema(&flight.schema, &options) + serialize_schema_to_info(&flight.schema, &options) .expect( "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", @@ -296,7 +300,7 @@ async fn record_batch_from_message( None, true, dictionaries_by_field, - MetadataVersion::V5, + ArrowSchema::MetadataVersion::V5, &mut reader, 0, ); @@ -343,7 +347,7 @@ async fn save_uploaded_chunks( let mut dictionaries_by_field = vec![None; schema_ref.fields().len()]; while let Some(Ok(data)) = input_stream.next().await { - let message = ipc::root_as_message(&data.data_header[..]) + let message = root_as_message(&data.data_header[..]) .map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?; match message.header_type() { diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs index 1416acc4088..ed686cfc7a4 100644 --- a/integration-testing/src/flight_server_scenarios/middleware.rs +++ b/integration-testing/src/flight_server_scenarios/middleware.rs @@ -17,12 +17,9 @@ use std::pin::Pin; -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_server::FlightService, - flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, - FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, - PutResult, SchemaResult, Ticket, -}; +use 
arrow_format::flight::data::flight_descriptor::DescriptorType; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::data::*; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -53,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; diff --git a/src/doc/lib.md b/src/doc/lib.md index d37adce76ca..270fe26810e 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -77,6 +77,7 @@ functionality, such as: * `io_ipc_compression`: to read and write compressed Arrow IPC (v2) * `io_csv` to read and write CSV * `io_json` to read and write JSON +* `io_flight` to read and write to Arrow's Flight protocol * `io_parquet` to read and write parquet * `io_parquet_compression` to read and write compressed parquet * `io_print` to write batches to formatted ASCII tables diff --git a/arrow-flight/src/utils.rs b/src/io/flight/mod.rs similarity index 51% rename from arrow-flight/src/utils.rs rename to src/io/flight/mod.rs index 25f01837ee3..ab4b0eb9283 100644 --- a/arrow-flight/src/utils.rs +++ b/src/io/flight/mod.rs @@ -1,48 +1,30 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utilities to assist with reading and writing Arrow data as Flight messages - -use std::{convert::TryFrom, sync::Arc}; - -use crate::{FlightData, SchemaResult}; - -use arrow2::{ +use std::convert::TryFrom; +use std::sync::Arc; + +use arrow_format::flight::data::{FlightData, SchemaResult}; +use arrow_format::ipc; + +use crate::{ array::*, datatypes::*, error::{ArrowError, Result}, - io::ipc, - io::ipc::gen::Schema::MetadataVersion, + io::ipc::fb_to_schema, io::ipc::read::read_record_batch, io::ipc::write, io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, IpcWriteOptions}, record_batch::RecordBatch, }; -/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries -/// and a `FlightData` representing the bytes of the batch's values -pub fn flight_data_from_arrow_batch( +/// Serializes a [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries +/// and a [`FlightData`] representing the batch. 
+pub fn serialize_batch( batch: &RecordBatch, options: &IpcWriteOptions, ) -> (Vec, FlightData) { let mut dictionary_tracker = DictionaryTracker::new(false); let (encoded_dictionaries, encoded_batch) = - encoded_batch(batch, &mut dictionary_tracker, &options) + encoded_batch(batch, &mut dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); @@ -61,40 +43,37 @@ impl From for FlightData { } } -/// Convert a `Schema` to `SchemaResult` by converting to an IPC message -pub fn flight_schema_from_arrow_schema(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult { +/// Serializes a [`Schema`] to [`SchemaResult`]. +pub fn serialize_schema_to_result(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult { SchemaResult { - schema: flight_schema_as_flatbuffer(schema, options), + schema: schema_as_flatbuffer(schema, options), } } -/// Convert a `Schema` to `FlightData` by converting to an IPC message -pub fn flight_data_from_arrow_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData { - let data_header = flight_schema_as_flatbuffer(schema, options); +/// Serializes a [`Schema`] to [`FlightData`]. +pub fn serialize_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData { + let data_header = schema_as_flatbuffer(schema, options); FlightData { data_header, ..Default::default() } } -/// Convert a `Schema` to bytes in the format expected in `FlightInfo.schema` -pub fn ipc_message_from_arrow_schema( - arrow_schema: &Schema, - options: &IpcWriteOptions, -) -> Result> { - let encoded_data = flight_schema_as_encoded_data(arrow_schema, options); +/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::FlightInfo`]. 
+pub fn serialize_schema_to_info(schema: &Schema, options: &IpcWriteOptions) -> Result> { + let encoded_data = schema_as_encoded_data(schema, options); let mut schema = vec![]; write::common::write_message(&mut schema, encoded_data, options)?; Ok(schema) } -fn flight_schema_as_flatbuffer(arrow_schema: &Schema, options: &IpcWriteOptions) -> Vec { - let encoded_data = flight_schema_as_encoded_data(arrow_schema, options); +fn schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> Vec { + let encoded_data = schema_as_encoded_data(schema, options); encoded_data.ipc_message } -fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { +fn schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { EncodedData { ipc_message: write::schema_to_bytes(arrow_schema, *options.metadata_version()), arrow_data: vec![], @@ -103,8 +82,8 @@ fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOption /// Deserialize an IPC message into a schema fn schema_from_bytes(bytes: &[u8]) -> Result { - if let Ok(ipc) = ipc::root_as_message(bytes) { - if let Some((schema, _)) = ipc.header_as_schema().map(ipc::fb_to_schema) { + if let Ok(ipc) = ipc::Message::root_as_message(bytes) { + if let Some((schema, _)) = ipc.header_as_schema().map(fb_to_schema) { Ok(schema) } else { Err(ArrowError::Ipc("Unable to get head as schema".to_string())) @@ -114,9 +93,6 @@ fn schema_from_bytes(bytes: &[u8]) -> Result { } } -/// Try convert `FlightData` into an Arrow Schema -/// -/// Returns an error if the `FlightData` header is not a valid IPC schema impl TryFrom<&FlightData> for Schema { type Error = ArrowError; fn try_from(data: &FlightData) -> Result { @@ -129,9 +105,6 @@ impl TryFrom<&FlightData> for Schema { } } -/// Try convert `SchemaResult` into an Arrow Schema -/// -/// Returns an error if the `FlightData` header is not a valid IPC schema impl TryFrom<&SchemaResult> for Schema { type Error = ArrowError; fn try_from(data: &SchemaResult) -> Result { @@ -144,15 +117,15 @@ impl TryFrom<&SchemaResult> for Schema { } } -/// Convert a FlightData message to a RecordBatch -pub fn flight_data_to_arrow_batch( +/// Deserializes [`FlightData`] to a [`RecordBatch`]. +pub fn deserialize_batch( data: &FlightData, schema: Arc, is_little_endian: bool, dictionaries_by_field: &[Option>], ) -> Result { // check that the data_header is a record batch message - let message = ipc::root_as_message(&data.data_header[..]) + let message = ipc::Message::root_as_message(&data.data_header[..]) .map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?; let mut reader = std::io::Cursor::new(&data.data_body); @@ -168,8 +141,8 @@ pub fn flight_data_to_arrow_batch( schema.clone(), None, is_little_endian, - &dictionaries_by_field, - MetadataVersion::V5, + dictionaries_by_field, + ipc::Schema::MetadataVersion::V5, &mut reader, 0, ) diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs index d6f3edfc1b9..b69cc83bad6 100644 --- a/src/io/ipc/convert.rs +++ b/src/io/ipc/convert.rs @@ -17,22 +17,20 @@ //! 
Utilities for converting between IPC types and native Arrow types -use crate::datatypes::{ - get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit, +use arrow_format::ipc::flatbuffers::{ + FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, }; -use crate::io::ipc::convert::ipc::UnionMode; -use crate::io::ipc::endianess::is_native_little_endian; - +use std::collections::{BTreeMap, HashMap}; mod ipc { - pub use super::super::gen::File::*; - pub use super::super::gen::Message::*; - pub use super::super::gen::Schema::*; + pub use arrow_format::ipc::File::*; + pub use arrow_format::ipc::Message::*; + pub use arrow_format::ipc::Schema::*; } -use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset}; -use std::collections::{BTreeMap, HashMap}; - -use DataType::*; +use crate::datatypes::{ + get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit, +}; +use crate::io::ipc::endianess::is_native_little_endian; pub fn schema_to_fb_offset<'a>( fbb: &mut FlatBufferBuilder<'a>, @@ -294,7 +292,7 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo ipc::Type::Union => { let type_ = field.type_as_union().unwrap(); - let is_sparse = type_.mode() == UnionMode::Sparse; + let is_sparse = type_.mode() == ipc::UnionMode::Sparse; let ids = type_.typeIds().map(|x| x.iter().collect()); @@ -378,7 +376,7 @@ pub(crate) fn build_field<'a>( let fb_field_name = fbb.create_string(field.name().as_str()); let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); - let fb_dictionary = if let Dictionary(index_type, inner) = field.data_type() { + let fb_dictionary = if let DataType::Dictionary(index_type, inner) = field.data_type() { if let DataType::Extension(name, _, metadata) = inner.as_ref() { write_extension(fbb, name, metadata, &mut kv_vec); } @@ -428,6 +426,7 @@ pub(crate) fn build_field<'a>( } fn type_to_field_type(data_type: &DataType) -> ipc::Type { + use DataType::*; match data_type { Null => ipc::Type::Null, Boolean => ipc::Type::Bool, @@ -461,6 +460,7 @@ pub(crate) fn get_fb_field_type<'a>( is_nullable: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> FbFieldType<'a> { + use DataType::*; let type_type = type_to_field_type(data_type); // some IPC implementations expect an empty list for child data, instead of a null value. @@ -711,9 +711,9 @@ pub(crate) fn get_fb_field_type<'a>( let mut builder = ipc::UnionBuilder::new(fbb); builder.add_mode(if *is_sparse { - UnionMode::Sparse + ipc::UnionMode::Sparse } else { - UnionMode::Dense + ipc::UnionMode::Dense }); if let Some(ids) = ids { @@ -745,6 +745,7 @@ pub(crate) fn get_fb_dictionary<'a>( dict_is_ordered: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> WIPOffset> { + use DataType::*; // We assume that the dictionary index type (as an integer) has already been // validated elsewhere, and can safely assume we are dealing with integers let mut index_builder = ipc::IntBuilder::new(fbb); diff --git a/src/io/ipc/gen/File.rs b/src/io/ipc/gen/File.rs deleted file mode 100644 index a5a1512ce95..00000000000 --- a/src/io/ipc/gen/File.rs +++ /dev/null @@ -1,471 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. 
The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#![allow(dead_code)] -#![allow(unused_imports)] - -use super::Schema::*; -use flatbuffers::EndianScalar; -use std::{cmp::Ordering, mem}; -// automatically generated by the FlatBuffers compiler, do not modify - -// struct Block, aligned to 8 -#[repr(transparent)] -#[derive(Clone, Copy, PartialEq)] -pub struct Block(pub [u8; 24]); -impl std::fmt::Debug for Block { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("Block") - .field("offset", &self.offset()) - .field("metaDataLength", &self.metaDataLength()) - .field("bodyLength", &self.bodyLength()) - .finish() - } -} - -impl flatbuffers::SimpleToVerifyInSlice for Block {} -impl flatbuffers::SafeSliceAccess for Block {} -impl<'a> flatbuffers::Follow<'a> for Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - <&'a Block>::follow(buf, loc) - } -} -impl<'a> flatbuffers::Follow<'a> for &'a Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - flatbuffers::follow_cast_ref::(buf, loc) - } -} -impl<'b> flatbuffers::Push for Block { - type Output = Block; - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} -impl<'b> flatbuffers::Push for &'b Block { - type Output = Block; - - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(*self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} - -impl<'a> flatbuffers::Verifiable for Block { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.in_buffer::(pos) - } -} -impl Block { - #[allow(clippy::too_many_arguments)] - pub fn new(offset: i64, metaDataLength: i32, bodyLength: i64) -> Self { - let mut s = Self([0; 24]); - s.set_offset(offset); - s.set_metaDataLength(metaDataLength); - s.set_bodyLength(bodyLength); - s - } - - /// Index to the start of the RecordBlock (note this is past the Message header) - pub fn offset(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[0..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_offset(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[0..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the metadata - pub fn metaDataLength(&self) -> i32 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[8..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - 
mem.assume_init() - } - .from_little_endian() - } - - pub fn set_metaDataLength(&mut self, x: i32) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i32 as *const u8, - self.0[8..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the data (this is aligned so there can be a gap between this and - /// the metadata). - pub fn bodyLength(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[16..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_bodyLength(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[16..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } -} - -pub enum FooterOffset {} -#[derive(Copy, Clone, PartialEq)] - -/// ---------------------------------------------------------------------- -/// Arrow File metadata -/// -pub struct Footer<'a> { - pub _tab: flatbuffers::Table<'a>, -} - -impl<'a> flatbuffers::Follow<'a> for Footer<'a> { - type Inner = Footer<'a>; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - Self { - _tab: flatbuffers::Table { buf, loc }, - } - } -} - -impl<'a> Footer<'a> { - #[inline] - pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - Footer { _tab: table } - } - #[allow(unused_mut)] - pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( - _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, - args: &'args FooterArgs<'args>, - ) -> flatbuffers::WIPOffset> { - let mut builder = FooterBuilder::new(_fbb); - if let Some(x) = args.custom_metadata { - builder.add_custom_metadata(x); - } - if let Some(x) = args.recordBatches { - builder.add_recordBatches(x); - } - if let Some(x) = args.dictionaries { - builder.add_dictionaries(x); - } - if let Some(x) = args.schema { - builder.add_schema(x); - } - builder.add_version(args.version); - builder.finish() - } - - pub const VT_VERSION: flatbuffers::VOffsetT = 4; - pub const VT_SCHEMA: flatbuffers::VOffsetT = 6; - pub const VT_DICTIONARIES: flatbuffers::VOffsetT = 8; - pub const VT_RECORDBATCHES: flatbuffers::VOffsetT = 10; - pub const VT_CUSTOM_METADATA: flatbuffers::VOffsetT = 12; - - #[inline] - pub fn version(&self) -> MetadataVersion { - self._tab - .get::(Footer::VT_VERSION, Some(MetadataVersion::V1)) - .unwrap() - } - #[inline] - pub fn schema(&self) -> Option> { - self._tab - .get::>(Footer::VT_SCHEMA, None) - } - #[inline] - pub fn dictionaries(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_DICTIONARIES, - None, - ) - .map(|v| v.safe_slice()) - } - #[inline] - pub fn recordBatches(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_RECORDBATCHES, - None, - ) - .map(|v| v.safe_slice()) - } - /// User-defined metadata - #[inline] - pub fn custom_metadata( - &self, - ) -> Option>>> { - self._tab.get::>, - >>(Footer::VT_CUSTOM_METADATA, None) - } -} - -impl flatbuffers::Verifiable for Footer<'_> { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.visit_table(pos)? - .visit_field::(&"version", Self::VT_VERSION, false)? - .visit_field::>(&"schema", Self::VT_SCHEMA, false)? - .visit_field::>>( - &"dictionaries", - Self::VT_DICTIONARIES, - false, - )? 
- .visit_field::>>( - &"recordBatches", - Self::VT_RECORDBATCHES, - false, - )? - .visit_field::>, - >>(&"custom_metadata", Self::VT_CUSTOM_METADATA, false)? - .finish(); - Ok(()) - } -} -pub struct FooterArgs<'a> { - pub version: MetadataVersion, - pub schema: Option>>, - pub dictionaries: Option>>, - pub recordBatches: Option>>, - pub custom_metadata: Option< - flatbuffers::WIPOffset>>>, - >, -} -impl<'a> Default for FooterArgs<'a> { - #[inline] - fn default() -> Self { - FooterArgs { - version: MetadataVersion::V1, - schema: None, - dictionaries: None, - recordBatches: None, - custom_metadata: None, - } - } -} -pub struct FooterBuilder<'a: 'b, 'b> { - fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, - start_: flatbuffers::WIPOffset, -} -impl<'a: 'b, 'b> FooterBuilder<'a, 'b> { - #[inline] - pub fn add_version(&mut self, version: MetadataVersion) { - self.fbb_ - .push_slot::(Footer::VT_VERSION, version, MetadataVersion::V1); - } - #[inline] - pub fn add_schema(&mut self, schema: flatbuffers::WIPOffset>) { - self.fbb_ - .push_slot_always::>(Footer::VT_SCHEMA, schema); - } - #[inline] - pub fn add_dictionaries( - &mut self, - dictionaries: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_DICTIONARIES, dictionaries); - } - #[inline] - pub fn add_recordBatches( - &mut self, - recordBatches: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_RECORDBATCHES, recordBatches); - } - #[inline] - pub fn add_custom_metadata( - &mut self, - custom_metadata: flatbuffers::WIPOffset< - flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset>>, - >, - ) { - self.fbb_.push_slot_always::>( - Footer::VT_CUSTOM_METADATA, - custom_metadata, - ); - } - #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> FooterBuilder<'a, 'b> { - let start = _fbb.start_table(); - FooterBuilder { - fbb_: _fbb, - start_: start, - } - } - #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { - let o = self.fbb_.end_table(self.start_); - flatbuffers::WIPOffset::new(o.value()) - } -} - -impl std::fmt::Debug for Footer<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Footer"); - ds.field("version", &self.version()); - ds.field("schema", &self.schema()); - ds.field("dictionaries", &self.dictionaries()); - ds.field("recordBatches", &self.recordBatches()); - ds.field("custom_metadata", &self.custom_metadata()); - ds.finish() - } -} -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::root_unchecked::>(buf) } -} - -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_size_prefixed_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::size_prefixed_root_unchecked::>(buf) } -} - -#[inline] -/// Verifies that a buffer of bytes contains a `Footer` -/// and returns it. -/// Note that verification is still experimental and may not -/// catch every error, or be maximally performant. For the -/// previous, unchecked, behavior use -/// `root_as_footer_unchecked`. -pub fn root_as_footer(buf: &[u8]) -> Result { - flatbuffers::root::