diff --git a/.cargo/audit.toml b/.cargo/audit.toml new file mode 100644 index 00000000000..aa5492c1beb --- /dev/null +++ b/.cargo/audit.toml @@ -0,0 +1,13 @@ +[advisories] +ignore = [ + # title: Potential segfault in the time crate + # This can be ignored because it only affects users that use the feature flag "clock" of "chrono", + # which we do not. Specifically: + # * the call of "localtime_r" [is unsound](https://github.com/chronotope/chrono/issues/602#issuecomment-940445390) + # * that call [is part of the module "sys"](https://docs.rs/chrono/0.4.19/src/chrono/sys/unix.rs.html#84) + # * "sys" is only available on feature "clock": https://docs.rs/chrono/0.4.19/src/chrono/lib.rs.html#456 + # + # Therefore, this advisory does not affect us. + "RUSTSEC-2020-0071", + "RUSTSEC-2020-0159", # same as previous +] diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c588c236422..1e147fcc9f5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -74,7 +74,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-07-09 + toolchain: nightly-2021-10-24 override: true - uses: Swatinem/rust-cache@v1 - name: Install Miri @@ -93,7 +93,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-07-09 + toolchain: nightly-2021-10-24 override: true - uses: Swatinem/rust-cache@v1 - name: Install Miri @@ -112,6 +112,8 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - name: Install Rust + run: rustup update stable - name: Setup parquet files run: | apt update && apt install python3-pip python3-venv -y -q diff --git a/.github_changelog_generator b/.github_changelog_generator index 1f4f3769095..4a2b170145c 100644 --- a/.github_changelog_generator +++ b/.github_changelog_generator @@ -1,5 +1,5 @@ -since-tag=v0.6.1 -future-release=v0.6.2 +since-tag=v0.6.2 +future-release=v0.7.0 pr-wo-labels=false exclude-labels=no-changelog,question add-sections={"features":{"prefix":"**Enhancements:**","labels":["enhancement"]}, "documentation":{"prefix":"**Documentation updates:**","labels":["documentation"]}, "testing":{"prefix":"**Testing updates:**","labels":["testing"]}} diff --git a/CHANGELOG.md b/CHANGELOG.md index 941d356228b..ab91fb7ed94 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,50 @@ # Changelog +## [v0.7.0](https://github.com/jorgecarleitao/arrow2/tree/v0.7.0) (2021-10-29) + +[Full Changelog](https://github.com/jorgecarleitao/arrow2/compare/v0.6.2...v0.7.0) + +**Breaking changes:** + +- Simplified reading parquet [\#532](https://github.com/jorgecarleitao/arrow2/pull/532) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Change IPC `FileReader` to own the underlying reader [\#518](https://github.com/jorgecarleitao/arrow2/pull/518) ([blakesmith](https://github.com/blakesmith)) +- Migrate to `arrow_format` crate [\#517](https://github.com/jorgecarleitao/arrow2/pull/517) ([jorgecarleitao](https://github.com/jorgecarleitao)) + +**New features:** + +- Added read of 2-level nested lists from parquet [\#548](https://github.com/jorgecarleitao/arrow2/pull/548) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- add dictionary serialization for csv-writer [\#515](https://github.com/jorgecarleitao/arrow2/pull/515) ([ritchie46](https://github.com/ritchie46)) +- Added `checked_negate` and `wrapping_negate` for `PrimitiveArray` [\#506](https://github.com/jorgecarleitao/arrow2/pull/506) ([yjhmelody](https://github.com/yjhmelody)) + +**Fixed bugs:** + +- Fixed error 
in reading fixed len binary from parquet [\#549](https://github.com/jorgecarleitao/arrow2/pull/549) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Fixed ffi of sliced arrays [\#540](https://github.com/jorgecarleitao/arrow2/pull/540) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Fixed s3 example [\#536](https://github.com/jorgecarleitao/arrow2/pull/536) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Fixed error in writing compressed parquet dict pages [\#523](https://github.com/jorgecarleitao/arrow2/pull/523) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Validity taken into account when writing `StructArray` to json [\#511](https://github.com/jorgecarleitao/arrow2/pull/511) ([VasanthakumarV](https://github.com/VasanthakumarV)) + +**Enhancements:** + +- Bumped Prost and Tonic [\#550](https://github.com/jorgecarleitao/arrow2/pull/550) ([PsiACE](https://github.com/PsiACE)) +- Speedup scalar boolean operations [\#546](https://github.com/jorgecarleitao/arrow2/pull/546) ([Dandandan](https://github.com/Dandandan)) +- Added fast path for validating ASCII text \(~1.12-1.89x improvement on reading ASCII parquet data\) [\#542](https://github.com/jorgecarleitao/arrow2/pull/542) ([Dandandan](https://github.com/Dandandan)) +- Exposed missing APIs to write parquet in parallel [\#539](https://github.com/jorgecarleitao/arrow2/pull/539) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- improve utf8 init validity [\#530](https://github.com/jorgecarleitao/arrow2/pull/530) ([ritchie46](https://github.com/ritchie46)) +- export missing `BinaryValueIter` [\#526](https://github.com/jorgecarleitao/arrow2/pull/526) ([yjhmelody](https://github.com/yjhmelody)) + +**Documentation updates:** + +- Added more IPC documentation [\#534](https://github.com/jorgecarleitao/arrow2/pull/534) ([HagaiHargil](https://github.com/HagaiHargil)) +- Fixed clippy and fmt [\#521](https://github.com/jorgecarleitao/arrow2/pull/521) ([ritchie46](https://github.com/ritchie46)) + +**Testing updates:** + +- Added more tests for `utf8` [\#543](https://github.com/jorgecarleitao/arrow2/pull/543) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Ignored RUSTSEC-2020-0071 and RUSTSEC-2020-0159 [\#537](https://github.com/jorgecarleitao/arrow2/pull/537) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Improved parquet read benches [\#533](https://github.com/jorgecarleitao/arrow2/pull/533) ([jorgecarleitao](https://github.com/jorgecarleitao)) +- Added fmt and clippy checks to CI. [\#522](https://github.com/jorgecarleitao/arrow2/pull/522) ([xudong963](https://github.com/xudong963)) + ## [v0.6.2](https://github.com/jorgecarleitao/arrow2/tree/v0.6.2) (2021-10-09) [Full Changelog](https://github.com/jorgecarleitao/arrow2/compare/v0.6.1...v0.6.2) diff --git a/Cargo.toml b/Cargo.toml index 35a1aa7cc15..38ff035869b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "arrow2" -version = "0.6.2" +version = "0.7.0" license = "Apache-2.0" description = "Unofficial implementation of Apache Arrow spec in safe Rust" homepage = "https://github.com/jorgecarleitao/arrow2" @@ -17,7 +17,7 @@ bench = false [dependencies] num-traits = "0.2" chrono = { version = "0.4", default_features = false, features = ["std"] } -chrono-tz = { version = "0.5", optional = true } +chrono-tz = { version = "0.6", optional = true } # To efficiently cast numbers to strings lexical-core = { version = "0.8", optional = true } # We need to Hash values before sending them to an hasher. 
This @@ -30,6 +30,7 @@ csv = { version = "^1.1", optional = true } regex = { version = "^1.3", optional = true } lazy_static = { version = "^1.4", optional = true } streaming-iterator = { version = "0.1", optional = true } +fallible-streaming-iterator = { version = "0.1", optional = true } serde = { version = "^1.0", features = ["rc"], optional = true } serde_derive = { version = "^1.0", optional = true } @@ -39,7 +40,8 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -flatbuffers = { version = "=2.0.0", optional = true } +arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] } + hex = { version = "^0.4", optional = true } # for IPC compression @@ -60,10 +62,12 @@ futures = { version = "0.3", optional = true } # for faster hashing ahash = { version = "0.7", optional = true } -parquet2 = { version = "0.5.2", optional = true, default_features = false, features = ["stream"] } +parquet2 = { version = "0.6", optional = true, default_features = false, features = ["stream"] } avro-rs = { version = "0.13", optional = true, default_features = false } +libflate = { version = "1.1.1", optional = true } + # for division/remainder optimization at runtime strength_reduce = { version = "0.2", optional = true } @@ -88,6 +92,7 @@ full = [ "io_csv", "io_json", "io_ipc", + "io_flight", "io_ipc_compression", "io_json_integration", "io_print", @@ -105,8 +110,9 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_read = ["csv", "lexical-core"] io_csv_write = ["csv", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "indexmap"] -io_ipc = ["flatbuffers"] +io_ipc = ["arrow-format"] io_ipc_compression = ["lz4", "zstd"] +io_flight = ["io_ipc", "arrow-format/flight-data"] io_parquet_compression = [ "parquet2/zstd", "parquet2/snappy", @@ -114,7 +120,7 @@ io_parquet_compression = [ "parquet2/lz4", "parquet2/brotli", ] -io_avro = ["avro-rs", "streaming-iterator", "serde_json"] +io_avro = ["avro-rs", "fallible-streaming-iterator", "serde_json", "libflate"] # io_json: its dependencies + error handling # serde_derive: there is some derive around io_json_integration = ["io_json", "serde_derive", "hex"] @@ -143,6 +149,8 @@ skip_feature_sets = [ ["io_csv_write"], ["io_avro"], ["io_json"], + ["io_flight"], + ["io_ipc"], ["io_parquet"], ["io_json_integration"], # this does not change the public API @@ -238,6 +246,10 @@ harness = false name = "iter_list" harness = false +[[bench]] +name = "avro_read" +harness = false + [[bench]] name = "bitwise" harness = false \ No newline at end of file diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml deleted file mode 100644 index ad423b6d0d4..00000000000 --- a/arrow-flight/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. 
See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "arrow-flight" -description = "Apache Arrow Flight" -version = "0.1.0" -edition = "2018" -authors = ["Apache Arrow "] -homepage = "https://github.com/apache/arrow" -repository = "https://github.com/apache/arrow" -license = "Apache-2.0" - -[dependencies] -arrow2 = { path = "../", features = ["io_ipc"], default-features = false } -tonic = "0.5.2" -bytes = "1" -prost = "0.8.0" -prost-derive = "0.8.0" -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -futures = { version = "0.3", default-features = false, features = ["alloc"]} - -#[lib] -#name = "flight" -#path = "src/lib.rs" diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs deleted file mode 100644 index 5db746f6fda..00000000000 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ /dev/null @@ -1,1071 +0,0 @@ -// This file was automatically generated through the build.rs script, and should not be edited. - -/// -/// The request that a client provides to a server on handshake. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HandshakeRequest { - /// - /// A defined protocol version - #[prost(uint64, tag = "1")] - pub protocol_version: u64, - /// - /// Arbitrary auth/handshake info. - #[prost(bytes = "vec", tag = "2")] - pub payload: ::prost::alloc::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HandshakeResponse { - /// - /// A defined protocol version - #[prost(uint64, tag = "1")] - pub protocol_version: u64, - /// - /// Arbitrary auth/handshake info. - #[prost(bytes = "vec", tag = "2")] - pub payload: ::prost::alloc::vec::Vec, -} -/// -/// A message for doing simple auth. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BasicAuth { - #[prost(string, tag = "2")] - pub username: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub password: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Empty {} -/// -/// Describes an available action, including both the name used for execution -/// along with a short description of the purpose of the action. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ActionType { - #[prost(string, tag = "1")] - pub r#type: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub description: ::prost::alloc::string::String, -} -/// -/// A service specific expression that can be used to return a limited set -/// of available Arrow Flight streams. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Criteria { - #[prost(bytes = "vec", tag = "1")] - pub expression: ::prost::alloc::vec::Vec, -} -/// -/// An opaque action specific for the service. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Action { - #[prost(string, tag = "1")] - pub r#type: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "2")] - pub body: ::prost::alloc::vec::Vec, -} -/// -/// An opaque result returned after executing an action. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Result { - #[prost(bytes = "vec", tag = "1")] - pub body: ::prost::alloc::vec::Vec, -} -/// -/// Wrap the result of a getSchema call -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct SchemaResult { - /// schema of the dataset as described in Schema.fbs::Schema. - #[prost(bytes = "vec", tag = "1")] - pub schema: ::prost::alloc::vec::Vec, -} -/// -/// The name or tag for a Flight. 
May be used as a way to retrieve or generate -/// a flight or be used to expose a set of previously defined flights. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightDescriptor { - #[prost(enumeration = "flight_descriptor::DescriptorType", tag = "1")] - pub r#type: i32, - /// - /// Opaque value used to express a command. Should only be defined when - /// type = CMD. - #[prost(bytes = "vec", tag = "2")] - pub cmd: ::prost::alloc::vec::Vec, - /// - /// List of strings identifying a particular dataset. Should only be defined - /// when type = PATH. - #[prost(string, repeated, tag = "3")] - pub path: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, -} -/// Nested message and enum types in `FlightDescriptor`. -pub mod flight_descriptor { - /// - /// Describes what type of descriptor is defined. - #[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, - )] - #[repr(i32)] - pub enum DescriptorType { - /// Protobuf pattern, not used. - Unknown = 0, - /// - /// A named path that identifies a dataset. A path is composed of a string - /// or list of strings describing a particular dataset. This is conceptually - /// similar to a path inside a filesystem. - Path = 1, - /// - /// An opaque command to generate a dataset. - Cmd = 2, - } -} -/// -/// The access coordinates for retrieval of a dataset. With a FlightInfo, a -/// consumer is able to determine how to retrieve a dataset. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightInfo { - /// schema of the dataset as described in Schema.fbs::Schema. - #[prost(bytes = "vec", tag = "1")] - pub schema: ::prost::alloc::vec::Vec, - /// - /// The descriptor associated with this info. - #[prost(message, optional, tag = "2")] - pub flight_descriptor: ::core::option::Option, - /// - /// A list of endpoints associated with the flight. To consume the whole - /// flight, all endpoints must be consumed. - #[prost(message, repeated, tag = "3")] - pub endpoint: ::prost::alloc::vec::Vec, - /// Set these to -1 if unknown. - #[prost(int64, tag = "4")] - pub total_records: i64, - #[prost(int64, tag = "5")] - pub total_bytes: i64, -} -/// -/// A particular stream or split associated with a flight. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightEndpoint { - /// - /// Token used to retrieve this stream. - #[prost(message, optional, tag = "1")] - pub ticket: ::core::option::Option, - /// - /// A list of URIs where this ticket can be redeemed. If the list is - /// empty, the expectation is that the ticket can only be redeemed on the - /// current service where the ticket was generated. - #[prost(message, repeated, tag = "2")] - pub location: ::prost::alloc::vec::Vec, -} -/// -/// A location where a Flight service will accept retrieval of a particular -/// stream given a ticket. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Location { - #[prost(string, tag = "1")] - pub uri: ::prost::alloc::string::String, -} -/// -/// An opaque identifier that the service can use to retrieve a particular -/// portion of a stream. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Ticket { - #[prost(bytes = "vec", tag = "1")] - pub ticket: ::prost::alloc::vec::Vec, -} -/// -/// A batch of Arrow data as part of a stream of batches. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightData { - /// - /// The descriptor of the data. This is only relevant when a client is - /// starting a new DoPut stream. 
- #[prost(message, optional, tag = "1")] - pub flight_descriptor: ::core::option::Option, - /// - /// Header for message data as described in Message.fbs::Message. - #[prost(bytes = "vec", tag = "2")] - pub data_header: ::prost::alloc::vec::Vec, - /// - /// Application-defined metadata. - #[prost(bytes = "vec", tag = "3")] - pub app_metadata: ::prost::alloc::vec::Vec, - /// - /// The actual batch of Arrow data. Preferably handled with minimal-copies - /// coming last in the definition to help with sidecar patterns (it is - /// expected that some implementations will fetch this field off the wire - /// with specialized code to avoid extra memory copies). - #[prost(bytes = "vec", tag = "1000")] - pub data_body: ::prost::alloc::vec::Vec, -} -///* -/// The response message associated with the submission of a DoPut. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PutResult { - #[prost(bytes = "vec", tag = "1")] - pub app_metadata: ::prost::alloc::vec::Vec, -} -#[doc = r" Generated client implementations."] -pub mod flight_service_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - #[doc = ""] - #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"] - #[doc = " flight service can expose one or more predefined endpoints that can be"] - #[doc = " accessed using the Arrow Flight Protocol. Additionally, a flight service"] - #[doc = " can expose a set of actions that are available."] - #[derive(Debug, Clone)] - pub struct FlightServiceClient { - inner: tonic::client::Grpc, - } - impl FlightServiceClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] - pub async fn connect(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl FlightServiceClient - where - T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, - T::Error: Into, - ::Error: Into + Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> FlightServiceClient> - where - F: tonic::service::Interceptor, - T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, - >, - >, - >>::Error: - Into + Send + Sync, - { - FlightServiceClient::new(InterceptedService::new(inner, interceptor)) - } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); - self - } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); - self - } - #[doc = ""] - #[doc = " Handshake between client and server. Depending on the server, the"] - #[doc = " handshake may be required to determine the token that should be used for"] - #[doc = " future operations. 
Both request and response are streams to allow multiple"] - #[doc = " round-trips depending on auth mechanism."] - pub async fn handshake( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/Handshake", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Get a list of available streams given a particular criteria. Most flight"] - #[doc = " services will expose one or more streams that are readily available for"] - #[doc = " retrieval. This api allows listing the streams available for"] - #[doc = " consumption. A user can also provide a criteria. The criteria can limit"] - #[doc = " the subset of streams that can be listed via this interface. Each flight"] - #[doc = " service allows its own definition of how to consume criteria."] - pub async fn list_flights( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/ListFlights", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " For a given FlightDescriptor, get information about how the flight can be"] - #[doc = " consumed. This is a useful interface if the consumer of the interface"] - #[doc = " already can identify the specific flight to consume. This interface can"] - #[doc = " also allow a consumer to generate a flight stream through a specified"] - #[doc = " descriptor. For example, a flight descriptor might be something that"] - #[doc = " includes a SQL statement or a Pickled Python operation that will be"] - #[doc = " executed. In those cases, the descriptor will not be previously available"] - #[doc = " within the list of available streams provided by ListFlights but will be"] - #[doc = " available for consumption for the duration defined by the specific flight"] - #[doc = " service."] - pub async fn get_flight_info( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/GetFlightInfo", - ); - self.inner.unary(request.into_request(), path, codec).await - } - #[doc = ""] - #[doc = " For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema"] - #[doc = " This is used when a consumer needs the Schema of flight stream. 
Similar to"] - #[doc = " GetFlightInfo this interface may generate a new flight that was not previously"] - #[doc = " available in ListFlights."] - pub async fn get_schema( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/GetSchema", - ); - self.inner.unary(request.into_request(), path, codec).await - } - #[doc = ""] - #[doc = " Retrieve a single stream associated with a particular descriptor"] - #[doc = " associated with the referenced ticket. A Flight can be composed of one or"] - #[doc = " more streams where each stream can be retrieved using a separate opaque"] - #[doc = " ticket that the flight service uses for managing a collection of streams."] - pub async fn do_get( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoGet", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Push a stream to the flight service associated with a particular"] - #[doc = " flight stream. This allows a client of a flight service to upload a stream"] - #[doc = " of data. Depending on the particular flight service, a client consumer"] - #[doc = " could be allowed to upload a single stream per descriptor or an unlimited"] - #[doc = " number. In the latter, the service might implement a 'seal' action that"] - #[doc = " can be applied to a descriptor once all streams are uploaded."] - pub async fn do_put( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoPut", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Open a bidirectional data channel for a given descriptor. This"] - #[doc = " allows clients to send and receive arbitrary Arrow data and"] - #[doc = " application-specific metadata in a single logical stream. 
In"] - #[doc = " contrast to DoGet/DoPut, this is more suited for clients"] - #[doc = " offloading computation (rather than storage) to a Flight service."] - pub async fn do_exchange( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoExchange", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Flight services can support an arbitrary number of simple actions in"] - #[doc = " addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut"] - #[doc = " operations that are potentially available. DoAction allows a flight client"] - #[doc = " to do a specific action against a flight service. An action includes"] - #[doc = " opaque request and response objects that are specific to the type action"] - #[doc = " being undertaken."] - pub async fn do_action( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result>, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoAction", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " A flight service exposes all of the available action types that it has"] - #[doc = " along with descriptions. This allows different flight consumers to"] - #[doc = " understand the capabilities of the flight service."] - pub async fn list_actions( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/ListActions", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - } -} -#[doc = r" Generated server implementations."] -pub mod flight_service_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer."] - #[async_trait] - pub trait FlightService: Send + Sync + 'static { - #[doc = "Server streaming response type for the Handshake method."] - type HandshakeStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Handshake between client and server. Depending on the server, the"] - #[doc = " handshake may be required to determine the token that should be used for"] - #[doc = " future operations. 
Both request and response are streams to allow multiple"] - #[doc = " round-trips depending on auth mechanism."] - async fn handshake( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListFlights method."] - type ListFlightsStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Get a list of available streams given a particular criteria. Most flight"] - #[doc = " services will expose one or more streams that are readily available for"] - #[doc = " retrieval. This api allows listing the streams available for"] - #[doc = " consumption. A user can also provide a criteria. The criteria can limit"] - #[doc = " the subset of streams that can be listed via this interface. Each flight"] - #[doc = " service allows its own definition of how to consume criteria."] - async fn list_flights( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = ""] - #[doc = " For a given FlightDescriptor, get information about how the flight can be"] - #[doc = " consumed. This is a useful interface if the consumer of the interface"] - #[doc = " already can identify the specific flight to consume. This interface can"] - #[doc = " also allow a consumer to generate a flight stream through a specified"] - #[doc = " descriptor. For example, a flight descriptor might be something that"] - #[doc = " includes a SQL statement or a Pickled Python operation that will be"] - #[doc = " executed. In those cases, the descriptor will not be previously available"] - #[doc = " within the list of available streams provided by ListFlights but will be"] - #[doc = " available for consumption for the duration defined by the specific flight"] - #[doc = " service."] - async fn get_flight_info( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = ""] - #[doc = " For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema"] - #[doc = " This is used when a consumer needs the Schema of flight stream. Similar to"] - #[doc = " GetFlightInfo this interface may generate a new flight that was not previously"] - #[doc = " available in ListFlights."] - async fn get_schema( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoGet method."] - type DoGetStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Retrieve a single stream associated with a particular descriptor"] - #[doc = " associated with the referenced ticket. A Flight can be composed of one or"] - #[doc = " more streams where each stream can be retrieved using a separate opaque"] - #[doc = " ticket that the flight service uses for managing a collection of streams."] - async fn do_get( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoPut method."] - type DoPutStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Push a stream to the flight service associated with a particular"] - #[doc = " flight stream. This allows a client of a flight service to upload a stream"] - #[doc = " of data. Depending on the particular flight service, a client consumer"] - #[doc = " could be allowed to upload a single stream per descriptor or an unlimited"] - #[doc = " number. 
In the latter, the service might implement a 'seal' action that"] - #[doc = " can be applied to a descriptor once all streams are uploaded."] - async fn do_put( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoExchange method."] - type DoExchangeStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Open a bidirectional data channel for a given descriptor. This"] - #[doc = " allows clients to send and receive arbitrary Arrow data and"] - #[doc = " application-specific metadata in a single logical stream. In"] - #[doc = " contrast to DoGet/DoPut, this is more suited for clients"] - #[doc = " offloading computation (rather than storage) to a Flight service."] - async fn do_exchange( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoAction method."] - type DoActionStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Flight services can support an arbitrary number of simple actions in"] - #[doc = " addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut"] - #[doc = " operations that are potentially available. DoAction allows a flight client"] - #[doc = " to do a specific action against a flight service. An action includes"] - #[doc = " opaque request and response objects that are specific to the type action"] - #[doc = " being undertaken."] - async fn do_action( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListActions method."] - type ListActionsStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " A flight service exposes all of the available action types that it has"] - #[doc = " along with descriptions. This allows different flight consumers to"] - #[doc = " understand the capabilities of the flight service."] - async fn list_actions( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - } - #[doc = ""] - #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"] - #[doc = " flight service can expose one or more predefined endpoints that can be"] - #[doc = " accessed using the Arrow Flight Protocol. 
Additionally, a flight service"] - #[doc = " can expose a set of actions that are available."] - #[derive(Debug)] - pub struct FlightServiceServer { - inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), - } - struct _Inner(Arc); - impl FlightServiceServer { - pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner); - Self { - inner, - accept_compression_encodings: Default::default(), - send_compression_encodings: Default::default(), - } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { - InterceptedService::new(Self::new(inner), interceptor) - } - } - impl tonic::codegen::Service> for FlightServiceServer - where - T: FlightService, - B: Body + Send + Sync + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = Never; - type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/arrow.flight.protocol.FlightService/Handshake" => { - #[allow(non_camel_case_types)] - struct HandshakeSvc(pub Arc); - impl - tonic::server::StreamingService - for HandshakeSvc - { - type Response = super::HandshakeResponse; - type ResponseStream = T::HandshakeStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request< - tonic::Streaming, - >, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).handshake(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = HandshakeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/ListFlights" => { - #[allow(non_camel_case_types)] - struct ListFlightsSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for ListFlightsSvc - { - type Response = super::FlightInfo; - type ResponseStream = T::ListFlightsStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).list_flights(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ListFlightsSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/GetFlightInfo" => { - #[allow(non_camel_case_types)] - struct GetFlightInfoSvc(pub Arc); - impl - tonic::server::UnaryService - for GetFlightInfoSvc - { - type Response = 
super::FlightInfo; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = - async move { (*inner).get_flight_info(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetFlightInfoSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/GetSchema" => { - #[allow(non_camel_case_types)] - struct GetSchemaSvc(pub Arc); - impl - tonic::server::UnaryService - for GetSchemaSvc - { - type Response = super::SchemaResult; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).get_schema(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetSchemaSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoGet" => { - #[allow(non_camel_case_types)] - struct DoGetSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for DoGetSvc - { - type Response = super::FlightData; - type ResponseStream = T::DoGetStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_get(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoGetSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoPut" => { - #[allow(non_camel_case_types)] - struct DoPutSvc(pub Arc); - impl - tonic::server::StreamingService - for DoPutSvc - { - type Response = super::PutResult; - type ResponseStream = T::DoPutStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request>, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_put(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = 
inner.0; - let method = DoPutSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoExchange" => { - #[allow(non_camel_case_types)] - struct DoExchangeSvc(pub Arc); - impl - tonic::server::StreamingService - for DoExchangeSvc - { - type Response = super::FlightData; - type ResponseStream = T::DoExchangeStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request>, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_exchange(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoExchangeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoAction" => { - #[allow(non_camel_case_types)] - struct DoActionSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for DoActionSvc - { - type Response = super::Result; - type ResponseStream = T::DoActionStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_action(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoActionSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/ListActions" => { - #[allow(non_camel_case_types)] - struct ListActionsSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for ListActionsSvc - { - type Response = super::ActionType; - type ResponseStream = T::ListActionsStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).list_actions(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ListActionsSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => 
Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), - } - } - } - impl Clone for FlightServiceServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { - inner, - accept_compression_encodings: self.accept_compression_encodings, - send_compression_encodings: self.send_compression_encodings, - } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(self.0.clone()) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::transport::NamedService for FlightServiceServer { - const NAME: &'static str = "arrow.flight.protocol.FlightService"; - } -} \ No newline at end of file diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs deleted file mode 100644 index 6af2e748678..00000000000 --- a/arrow-flight/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
- -include!("arrow.flight.protocol.rs"); - -pub mod utils; diff --git a/benches/avro_read.rs b/benches/avro_read.rs new file mode 100644 index 00000000000..23a9e9e1602 --- /dev/null +++ b/benches/avro_read.rs @@ -0,0 +1,83 @@ +use std::io::Cursor; +use std::sync::Arc; + +use avro_rs::types::Record; +use criterion::*; + +use arrow2::error::Result; +use arrow2::io::avro::read; +use avro_rs::*; +use avro_rs::{Codec, Schema as AvroSchema}; + +fn schema() -> AvroSchema { + let raw_schema = r#" + { + "type": "record", + "name": "test", + "fields": [ + {"name": "a", "type": "string"} + ] + } +"#; + AvroSchema::parse_str(raw_schema).unwrap() +} + +fn write(size: usize, has_codec: bool) -> Result> { + let avro = schema(); + // a writer needs a schema and something to write to + let mut writer: Writer>; + if has_codec { + writer = Writer::with_codec(&avro, Vec::new(), Codec::Deflate); + } else { + writer = Writer::new(&avro, Vec::new()); + } + + (0..size).for_each(|_| { + let mut record = Record::new(writer.schema()).unwrap(); + record.put("a", "foo"); + writer.append(record).unwrap(); + }); + + Ok(writer.into_inner().unwrap()) +} + +fn read_batch(buffer: &[u8], size: usize) -> Result<()> { + let mut file = Cursor::new(buffer); + + let (avro_schema, schema, codec, file_marker) = read::read_metadata(&mut file)?; + + let reader = read::Reader::new( + read::Decompressor::new( + read::BlockStreamIterator::new(&mut file, file_marker), + codec, + ), + avro_schema, + Arc::new(schema), + ); + + let mut rows = 0; + for maybe_batch in reader { + let batch = maybe_batch?; + rows += batch.num_rows(); + } + assert_eq!(rows, size); + Ok(()) +} + +fn add_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("avro_read"); + + for log2_size in (10..=20).step_by(2) { + let size = 2usize.pow(log2_size); + let buffer = write(size, false).unwrap(); + + group.throughput(Throughput::Elements(size as u64)); + + group.bench_with_input(BenchmarkId::new("utf8", log2_size), &buffer, |b, buffer| { + b.iter(|| read_batch(buffer, size).unwrap()) + }); + } +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); diff --git a/benches/comparison_kernels.rs b/benches/comparison_kernels.rs index 843570d3943..95a00326f44 100644 --- a/benches/comparison_kernels.rs +++ b/benches/comparison_kernels.rs @@ -40,7 +40,7 @@ fn add_benchmark(c: &mut Criterion) { b.iter(|| bench_op(&arr_a, &arr_b, Operator::Eq)) }); c.bench_function(&format!("bool scalar 2^{}", log2_size), |b| { - b.iter(|| bench_op_scalar(&arr_a, &BooleanScalar::from(Some(true)), Operator::Eq)) + b.iter(|| bench_op_scalar(&arr_a, &BooleanScalar::from(Some(false)), Operator::Eq)) }); let arr_a = create_string_array::(size, 4, 0.1, 42); diff --git a/benches/read_parquet.rs b/benches/read_parquet.rs index 55ce25fc96d..8f536ed6842 100644 --- a/benches/read_parquet.rs +++ b/benches/read_parquet.rs @@ -6,9 +6,18 @@ use criterion::{criterion_group, criterion_main, Criterion}; use arrow2::error::Result; use arrow2::io::parquet::read; -fn to_buffer(size: usize) -> Vec { +fn to_buffer(size: usize, dict: bool, multi_page: bool, compressed: bool) -> Vec { let dir = env!("CARGO_MANIFEST_DIR"); - let path = PathBuf::from(dir).join(format!("fixtures/pyarrow3/v1/benches_{}.parquet", size)); + + let dict = if dict { "dict/" } else { "" }; + let multi_page = if multi_page { "multi/" } else { "" }; + let compressed = if compressed { "snappy/" } else { "" }; + + let path = PathBuf::from(dir).join(format!( + "fixtures/pyarrow3/v1/{}{}{}benches_{}.parquet", + dict, 
multi_page, compressed, size + )); + let metadata = fs::metadata(&path).expect("unable to read metadata"); let mut file = fs::File::open(path).unwrap(); let mut buffer = vec![0; metadata.len() as usize]; @@ -16,7 +25,7 @@ fn to_buffer(size: usize) -> Vec { buffer } -fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result<()> { +fn read_batch(buffer: &[u8], size: usize, column: usize) -> Result<()> { let file = Cursor::new(buffer); let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?; @@ -31,26 +40,38 @@ fn read_decompressed_pages(buffer: &[u8], size: usize, column: usize) -> Result< fn add_benchmark(c: &mut Criterion) { (10..=20).step_by(2).for_each(|i| { let size = 2usize.pow(i); - let buffer = to_buffer(size); + let buffer = to_buffer(size, false, false, false); let a = format!("read i64 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 0).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); let a = format!("read utf8 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 2).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); let a = format!("read utf8 large 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 6).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 6).unwrap())); let a = format!("read bool 2^{}", i); - c.bench_function(&a, |b| { - b.iter(|| read_decompressed_pages(&buffer, size * 8, 3).unwrap()) - }); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 3).unwrap())); + + let buffer = to_buffer(size, true, false, false); + let a = format!("read utf8 dict 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + + let buffer = to_buffer(size, false, false, true); + let a = format!("read i64 snappy 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); + + let buffer = to_buffer(size, false, true, false); + let a = format!("read utf8 multi 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + + let buffer = to_buffer(size, false, true, true); + let a = format!("read utf8 multi snappy 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 2).unwrap())); + + let buffer = to_buffer(size, false, true, true); + let a = format!("read i64 multi snappy 2^{}", i); + c.bench_function(&a, |b| b.iter(|| read_batch(&buffer, size, 0).unwrap())); }); } diff --git a/examples/parquet_read.rs b/examples/parquet_read.rs index 1e0183ba1e3..8ab3ccdac2c 100644 --- a/examples/parquet_read.rs +++ b/examples/parquet_read.rs @@ -1,11 +1,12 @@ use std::fs::File; +use std::io::BufReader; use arrow2::io::parquet::read; use arrow2::{array::Array, error::Result}; fn read_column_chunk(path: &str, row_group: usize, column: usize) -> Result> { // Open a file, a common operation in Rust - let mut file = File::open(path)?; + let mut file = BufReader::new(File::open(path)?); // Read the files' metadata. This has a small IO cost because it requires seeking to the end // of the file to read its footer. 
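To make the parquet read changes above easier to follow, here is a minimal sketch of the path they exercise: open the file behind a `BufReader` (as the updated `parquet_read` example now does, since the decoder issues many small reads) and drive `RecordReader` over a single projected column (as the renamed `read_batch` helper in `benches/read_parquet.rs` does). This is not part of the diff; the function name is illustrative, and it assumes `RecordReader` yields `Result<RecordBatch>` items, analogous to the Avro `Reader` shown earlier.

```rust
use std::fs::File;
use std::io::BufReader;

use arrow2::error::Result;
use arrow2::io::parquet::read;

// Count the rows of one projected column, mirroring `read_batch` above.
fn read_single_column(path: &str, column: usize) -> Result<usize> {
    // Buffered IO: the parquet decoder performs many small reads, hence the
    // switch from a bare `File` to `BufReader` in the example above.
    let file = BufReader::new(File::open(path)?);

    // Same call shape as the benchmark: project one column and leave the
    // remaining options as `None`.
    let reader = read::RecordReader::try_new(file, Some(vec![column]), None, None, None)?;

    let mut rows = 0;
    for maybe_batch in reader {
        rows += maybe_batch?.num_rows();
    }
    Ok(rows)
}
```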
diff --git a/examples/parquet_read_parallel.rs b/examples/parquet_read_parallel.rs index defaf6bb411..9dead6c43ac 100644 --- a/examples/parquet_read_parallel.rs +++ b/examples/parquet_read_parallel.rs @@ -47,16 +47,14 @@ fn parallel_read(path: &str) -> Result>> { let metadata_consumer = file_metadata.clone(); let arrow_schema_consumer = arrow_schema.clone(); let child = thread::spawn(move || { - let (column, row_group, iter) = rx_consumer.recv().unwrap(); + let (column, row_group, pages) = rx_consumer.recv().unwrap(); let start = SystemTime::now(); println!("consumer start - {} {}", column, row_group); let metadata = metadata_consumer.row_groups[row_group].column(column); let data_type = arrow_schema_consumer.fields()[column].data_type().clone(); - let pages = iter - .into_iter() - .map(|x| x.and_then(|x| read::decompress(x, &mut vec![]))); - let mut pages = read::streaming_iterator::convert(pages); + let mut pages = read::BasicDecompressor::new(pages.into_iter(), vec![]); + let array = read::page_iter_to_array(&mut pages, metadata, data_type); println!( "consumer end - {:?}: {} {}", diff --git a/examples/parquet_write.rs b/examples/parquet_write.rs index d6df8d736c2..888fce79242 100644 --- a/examples/parquet_write.rs +++ b/examples/parquet_write.rs @@ -1,13 +1,15 @@ use std::fs::File; use std::iter::once; +use arrow2::error::ArrowError; use arrow2::io::parquet::write::to_parquet_schema; use arrow2::{ array::{Array, Int32Array}, datatypes::{Field, Schema}, error::Result, io::parquet::write::{ - array_to_page, write_file, Compression, DynIter, Encoding, Version, WriteOptions, + array_to_pages, write_file, Compression, Compressor, DynIter, DynStreamingIterator, + Encoding, FallibleStreamingIterator, Version, WriteOptions, }, }; @@ -24,17 +26,22 @@ fn write_single_array(path: &str, array: &dyn Array, field: Field) -> Result<()> // map arrow fields to parquet fields let parquet_schema = to_parquet_schema(&schema)?; - // Declare the row group iterator. This must be an iterator of iterators of iterators: - // * first iterator of row groups - // * second iterator of column chunks - // * third iterator of pages - // an array can be divided in multiple pages via `.slice(offset, length)` (`O(1)`). - // All column chunks within a row group MUST have the same length. - let row_groups = once(Result::Ok(DynIter::new(once(Ok(DynIter::new( - once(array) - .zip(parquet_schema.columns().to_vec().into_iter()) - .map(|(array, descriptor)| array_to_page(array, descriptor, options, encoding)), - )))))); + let descriptor = parquet_schema.columns()[0].clone(); + + // Declare the row group iterator. This must be an iterator of iterators of streaming iterators + // * first iterator over row groups + let row_groups = once(Result::Ok(DynIter::new( + // * second iterator over column chunks (we assume no struct arrays -> `once` column) + once( + // * third iterator over (compressed) pages; dictionary encoding may lead to multiple pages per array. 
+ array_to_pages(array, descriptor, options, encoding).map(move |pages| { + let encoded_pages = DynIter::new(pages.map(|x| Ok(x?))); + let compressed_pages = Compressor::new(encoded_pages, options.compression, vec![]) + .map_err(ArrowError::from); + DynStreamingIterator::new(compressed_pages) + }), + ), + ))); // Create a new empty file let mut file = File::create(path)?; diff --git a/examples/parquet_write_parallel/src/main.rs b/examples/parquet_write_parallel/src/main.rs index 204de4e835e..415da272c59 100644 --- a/examples/parquet_write_parallel/src/main.rs +++ b/examples/parquet_write_parallel/src/main.rs @@ -35,20 +35,21 @@ fn parallel_write(path: &str, batch: &RecordBatch) -> Result<()> { .zip(parquet_schema.columns().to_vec().into_par_iter()) .zip(encodings) .map(|((array, descriptor), encoding)| { - let array = array.clone(); - // create encoded and compressed pages this column - Ok(array_to_pages(array, descriptor, options, encoding)?.collect::>()) + let encoded_pages = array_to_pages(array.as_ref(), descriptor, options, encoding)?; + encoded_pages + .map(|page| compress(page?, vec![], options.compression).map_err(|x| x.into())) + .collect::>>() }) - .collect::>>()?; + .collect::>>>()?; // create the iterator over groups (one in this case) // (for more batches, create the iterator from them here) - let row_groups = std::iter::once(Result::Ok(DynIter::new( - columns - .into_iter() - .map(|column| Ok(DynIter::new(column.into_iter()))), - ))); + let row_groups = std::iter::once(Result::Ok(DynIter::new(columns.iter().map(|column| { + Ok(DynStreamingIterator::new( + fallible_streaming_iterator::convert(column.iter().map(Ok)), + )) + })))); // Create a new empty file let mut file = std::fs::File::create(path)?; diff --git a/examples/s3/Cargo.toml b/examples/s3/Cargo.toml index a238257f904..ed47cc23aea 100644 --- a/examples/s3/Cargo.toml +++ b/examples/s3/Cargo.toml @@ -5,6 +5,6 @@ edition = "2018" [dependencies] arrow2 = { path = "../../", default-features = false, features = ["io_parquet", "io_parquet_compression"] } -rust-s3 = { version = "0.27.0-rc4", features = ["tokio"] } +rust-s3 = { version = "0.27.0", features = ["tokio"] } futures = "0.3" tokio = { version = "1.0.0", features = ["macros", "rt-multi-thread"] } diff --git a/examples/s3/src/main.rs b/examples/s3/src/main.rs index db98240f408..4e1f1d70ad2 100644 --- a/examples/s3/src/main.rs +++ b/examples/s3/src/main.rs @@ -50,7 +50,8 @@ async fn main() -> Result<()> { // pages of the first row group and first column // This is IO bounded and SHOULD be done in a shared thread pool (e.g. Tokio) - let pages = get_page_stream(&metadata, 0, 0, &mut reader, vec![]).await?; + let column_metadata = &metadata.row_groups[0].columns()[0]; + let pages = get_page_stream(column_metadata, &mut reader, None, vec![]).await?; // decompress the pages. This is CPU bounded and SHOULD be done in a dedicated thread pool (e.g. 
Rayon) let pages = pages.map(|compressed_page| decompress(compressed_page?, &mut vec![])); diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index e07ae2c141f..a9487cb37bb 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -18,10 +18,8 @@ [package] name = "arrow-integration-testing" description = "Binaries used in the Arrow integration tests" -version = "4.0.0-SNAPSHOT" -homepage = "https://github.com/apache/arrow" -repository = "https://github.com/apache/arrow" -authors = ["Apache Arrow "] +version = "0.1.0" +authors = ["Jorge C Leitao", "Apache Arrow "] license = "Apache-2.0" edition = "2018" publish = false @@ -30,16 +28,16 @@ publish = false logging = ["tracing-subscriber"] [dependencies] -arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_json_integration"], default-features = false } -arrow-flight = { path = "../arrow-flight" } +arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } +arrow-format = { version = "0.3.0", features = ["full"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" hex = "0.4" -prost = "0.8" +prost = "0.9" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -tonic = "0.5.2" -tracing-subscriber = { version = "0.2.15", optional = true } +tonic = "0.6.0" +tracing-subscriber = { version = "0.3.1", optional = true } diff --git a/integration-testing/src/bin/flight-test-integration-client.rs b/integration-testing/src/bin/flight-test-integration-client.rs index 1901553109f..b7bb5d1d79f 100644 --- a/integration-testing/src/bin/flight-test-integration-client.rs +++ b/integration-testing/src/bin/flight-test-integration-client.rs @@ -42,9 +42,7 @@ async fn main() -> Result { let port = matches.value_of("port").expect("Port is required"); match matches.value_of("scenario") { - Some("middleware") => { - flight_client_scenarios::middleware::run_scenario(host, port).await? - } + Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?, Some("auth:basic_proto") => { flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await? } @@ -53,8 +51,7 @@ async fn main() -> Result { let path = matches .value_of("path") .expect("Path is required if scenario is not specified"); - flight_client_scenarios::integration_test::run_scenario(host, port, path) - .await?; + flight_client_scenarios::integration_test::run_scenario(host, port, path).await?; } } diff --git a/integration-testing/src/bin/flight-test-integration-server.rs b/integration-testing/src/bin/flight-test-integration-server.rs index b1b280743c3..45a08080499 100644 --- a/integration-testing/src/bin/flight-test-integration-server.rs +++ b/integration-testing/src/bin/flight-test-integration-server.rs @@ -40,9 +40,7 @@ async fn main() -> Result { let port = matches.value_of("port").unwrap_or("0"); match matches.value_of("scenario") { - Some("middleware") => { - flight_server_scenarios::middleware::scenario_setup(port).await? - } + Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?, Some("auth:basic_proto") => { flight_server_scenarios::auth_basic_proto::scenario_setup(port).await? 
} diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs index 5e8cd467198..58258e2164c 100644 --- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs @@ -17,9 +17,8 @@ use crate::{AUTH_PASSWORD, AUTH_USERNAME}; -use arrow_flight::{ - flight_service_client::FlightServiceClient, BasicAuth, HandshakeRequest, -}; +use arrow_format::flight::data::{Action, BasicAuth, HandshakeRequest}; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{stream, StreamExt}; use prost::Message; use tonic::{metadata::MetadataValue, Request, Status}; @@ -33,7 +32,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result { let url = format!("http://{}:{}", host, port); let mut client = FlightServiceClient::connect(url).await?; - let action = arrow_flight::Action::default(); + let action = Action::default(); let resp = client.do_action(Request::new(action.clone())).await; // This client is unauthenticated and should fail. @@ -80,11 +79,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result { Ok(()) } -async fn authenticate( - client: &mut Client, - username: &str, - password: &str, -) -> Result { +async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result { let auth = BasicAuth { username: username.into(), password: password.into(), diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 820c82114b1..7c67b34096e 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -20,14 +20,16 @@ use crate::{read_json_file, ArrowFile}; use arrow2::{ array::*, datatypes::*, - io::ipc, - io::ipc::{gen::Message::MessageHeader, read, write}, + io::flight::{self, deserialize_batch, serialize_batch}, + io::ipc::{read, write}, record_batch::RecordBatch, }; -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location, Ticket, +use arrow_format::flight::data::{ + flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, }; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; +use arrow_format::ipc; +use arrow_format::ipc::Message::MessageHeader; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -77,10 +79,9 @@ async fn upload_data( let (mut upload_tx, upload_rx) = mpsc::channel(10); let options = write::IpcWriteOptions::default(); - let mut schema_flight_data = - arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options); - schema_flight_data.flight_descriptor = Some(descriptor.clone()); - upload_tx.send(schema_flight_data).await?; + let mut schema = flight::serialize_schema(&schema, &options); + schema.flight_descriptor = Some(descriptor.clone()); + upload_tx.send(schema).await?; let mut original_data_iter = original_data.iter().enumerate(); @@ -130,8 +131,7 @@ async fn send_batch( batch: &RecordBatch, options: &write::IpcWriteOptions, ) -> Result { - let (dictionary_flight_data, mut batch_flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(batch, options); + let (dictionary_flight_data, mut batch_flight_data) = 
serialize_batch(batch, options); upload_tx .send_all(&mut stream::iter(dictionary_flight_data).map(Ok)) @@ -214,9 +214,8 @@ async fn consume_flight_location( let metadata = counter.to_string().into_bytes(); assert_eq!(metadata, data.app_metadata); - let actual_batch = - flight_data_to_arrow_batch(&data, schema.clone(), true, &dictionaries_by_field) - .expect("Unable to convert flight data to Arrow batch"); + let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) + .expect("Unable to convert flight data to Arrow batch"); assert_eq!(expected_batch.schema(), actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); @@ -249,7 +248,7 @@ async fn receive_batch_flight_data( ) -> Option { let mut data = resp.next().await?.ok()?; let mut message = - ipc::root_as_message(&data.data_header[..]).expect("Error parsing first message"); + ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing first message"); while message.header_type() == MessageHeader::DictionaryBatch { let mut reader = std::io::Cursor::new(&data.data_body); @@ -266,7 +265,8 @@ async fn receive_batch_flight_data( .expect("Error reading dictionary"); data = resp.next().await?.ok()?; - message = ipc::root_as_message(&data.data_header[..]).expect("Error parsing message"); + message = + ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs index cbca879dca5..f67580ada87 100644 --- a/integration-testing/src/flight_client_scenarios/middleware.rs +++ b/integration-testing/src/flight_client_scenarios/middleware.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. 
-use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - FlightDescriptor, -}; +use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor}; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use tonic::{Request, Status}; type Error = Box; diff --git a/integration-testing/src/flight_server_scenarios.rs b/integration-testing/src/flight_server_scenarios.rs index 9163b692086..a8aab14712e 100644 --- a/integration-testing/src/flight_server_scenarios.rs +++ b/integration-testing/src/flight_server_scenarios.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; -use arrow_flight::{FlightEndpoint, Location, Ticket}; +use arrow_format::flight::data::{FlightEndpoint, Location, Ticket}; use tokio::net::TcpListener; pub mod auth_basic_proto; diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index ea7ad3c3385..baa181e4efc 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -18,16 +18,12 @@ use std::pin::Pin; use std::sync::Arc; -use arrow_flight::{ - flight_service_server::FlightService, flight_service_server::FlightServiceServer, - Action, ActionType, BasicAuth, Criteria, Empty, FlightData, FlightDescriptor, - FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, -}; +use arrow_format::flight::data::*; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; -use tonic::{ - metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming, -}; +use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming}; + type TonicStream = Pin + Send + Sync + 'static>>; type Error = Box; @@ -62,10 +58,7 @@ pub struct AuthBasicProtoScenarioImpl { } impl AuthBasicProtoScenarioImpl { - async fn check_auth( - &self, - metadata: &MetadataMap, - ) -> Result { + async fn check_auth(&self, metadata: &MetadataMap) -> Result { let token = metadata .get_bin("auth-token-bin") .and_then(|v| v.to_bytes().ok()) @@ -73,10 +66,7 @@ impl AuthBasicProtoScenarioImpl { self.is_valid(token).await } - async fn is_valid( - &self, - token: Option, - ) -> Result { + async fn is_valid(&self, token: Option) -> Result { match token { Some(t) if t == *self.username => Ok(GrpcServerCallContext { peer_identity: self.username.to_string(), @@ -102,7 +92,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -141,12 +131,10 @@ impl FlightService for AuthBasicProtoScenarioImpl { let req = req.expect("Error reading handshake request"); let HandshakeRequest { payload, .. 
} = req; - let auth = BasicAuth::decode(&*payload) - .expect("Error parsing handshake request"); + let auth = + BasicAuth::decode(&*payload).expect("Error parsing handshake request"); - let resp = if *auth.username == *username - && *auth.password == *password - { + let resp = if *auth.username == *username && *auth.password == *password { Ok(HandshakeResponse { payload: username.as_bytes().to_vec(), ..HandshakeResponse::default() @@ -191,7 +179,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } @@ -202,7 +191,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { let flight_context = self.check_auth(request.metadata()).await?; // Respond with the authenticated username. let buf = flight_context.peer_identity().as_bytes().to_vec(); - let result = arrow_flight::Result { body: buf }; + let result = arrow_format::flight::data::Result { body: buf }; let output = futures::stream::once(async { Ok(result) }); Ok(Response::new(Box::pin(output) as Self::DoActionStream)) } @@ -219,7 +208,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } } diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 16954647090..4d3349bff44 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -20,17 +20,18 @@ use std::convert::TryFrom; use std::pin::Pin; use std::sync::Arc; +use arrow2::io::flight::{serialize_batch, serialize_schema}; +use arrow_format::flight::data::*; +use arrow_format::flight::data::flight_descriptor::*; +use arrow_format::flight::service::flight_service_server::*; +use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader}; +use arrow_format::ipc::Schema as ArrowSchema; + use arrow2::{ - array::Array, - datatypes::*, - io::ipc, - io::ipc::gen::Message::{Message, MessageHeader}, - io::ipc::gen::Schema::MetadataVersion, + array::Array, datatypes::*, io::flight::serialize_schema_to_info, io::ipc, record_batch::RecordBatch, }; -use arrow_flight::flight_descriptor::*; -use arrow_flight::flight_service_server::*; -use arrow_flight::*; + use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -81,7 +82,7 @@ impl FlightService for FlightServiceImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -109,12 +110,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); - let schema = std::iter::once({ - Ok(arrow_flight::utils::flight_data_from_arrow_schema( - &flight.schema, - &options, - )) - }); + let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options))); let batches = flight .chunks @@ -122,7 +118,7 @@ impl FlightService for FlightServiceImpl { .enumerate() 
.flat_map(|(counter, batch)| { let (dictionary_flight_data, mut batch_flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(batch, &options); + serialize_batch(batch, &options); // Only the record batch's FlightData gets app_metadata let metadata = counter.to_string().into_bytes(); @@ -176,12 +172,10 @@ impl FlightService for FlightServiceImpl { let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum(); let options = ipc::write::IpcWriteOptions::default(); - let schema = - arrow_flight::utils::ipc_message_from_arrow_schema(&flight.schema, &options) - .expect( - "Could not generate schema bytes from schema stored by a DoPut; \ + let schema = serialize_schema_to_info(&flight.schema, &options).expect( + "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", - ); + ); let info = FlightInfo { schema, @@ -296,7 +290,7 @@ async fn record_batch_from_message( None, true, dictionaries_by_field, - MetadataVersion::V5, + ArrowSchema::MetadataVersion::V5, &mut reader, 0, ); @@ -343,7 +337,7 @@ async fn save_uploaded_chunks( let mut dictionaries_by_field = vec![None; schema_ref.fields().len()]; while let Some(Ok(data)) = input_stream.next().await { - let message = ipc::root_as_message(&data.data_header[..]) + let message = root_as_message(&data.data_header[..]) .map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?; match message.header_type() { diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs index 1416acc4088..ceffe5a8c9b 100644 --- a/integration-testing/src/flight_server_scenarios/middleware.rs +++ b/integration-testing/src/flight_server_scenarios/middleware.rs @@ -17,12 +17,9 @@ use std::pin::Pin; -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_server::FlightService, - flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, - FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, - PutResult, SchemaResult, Ticket, -}; +use arrow_format::flight::data::*; +use arrow_format::flight::data::flight_descriptor::DescriptorType; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -53,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -93,8 +90,7 @@ impl FlightService for MiddlewareScenarioImpl { let descriptor = request.into_inner(); - if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" - { + if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" { // Return a fake location - the test doesn't read it let endpoint = super::endpoint("foo", "grpc+tcp://localhost:10010"); diff --git a/parquet_integration/bench_read.py b/parquet_integration/bench_read.py deleted file mode 100644 index f1db81addee..00000000000 --- a/parquet_integration/bench_read.py +++ /dev/null @@ -1,26 +0,0 @@ -import timeit -import io - -import pyarrow.parquet - - -def bench(log2_size: int, datatype: str): - with open(f"fixtures/pyarrow3/v1/benches_{2**log2_size}.parquet", "rb") as f: - data = f.read() - data = 
io.BytesIO(data) - - def f(): - pyarrow.parquet.read_table(data, columns=[datatype]) - - seconds = timeit.Timer(f).timeit(number=512) / 512 - microseconds = seconds * 1000 * 1000 - print(f"read {datatype} 2^{log2_size} time: {microseconds:.2f} us") - -#for i in range(10, 22, 2): -# bench(i, "int64") - -for i in range(10, 22, 2): - bench(i, "string") - -for i in range(10, 22, 2): - bench(i, "bool") diff --git a/parquet_integration/bench_write.py b/parquet_integration/bench_write.py deleted file mode 100644 index 2c47912205c..00000000000 --- a/parquet_integration/bench_write.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -Benchmark of writing a pyarrow table of size N to parquet. -""" -import io -import os -import timeit - -import numpy -import pyarrow.parquet - - -def case_basic_nullable(size = 1): - int64 = [0, 1, None, 3, None, 5, 6, 7, None, 9] - float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0] - string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"] - boolean = [True, None, False, False, None, True, None, None, True, True] - - fields = [ - pa.field('int64', pa.int64()), - pa.field('float64', pa.float64()), - pa.field('string', pa.utf8()), - pa.field('bool', pa.bool_()), - pa.field('date', pa.timestamp('ms')), - pa.field('uint32', pa.uint32()), - ] - schema = pa.schema(fields) - - return { - "int64": int64 * size, - "float64": float64 * size, - "string": string * size, - "bool": boolean * size, - "date": int64 * size, - "uint32": int64 * size, - }, schema, f"basic_nullable_{size*10}.parquet" - -def bench(log2_size: int, datatype: str): - - if datatype == 'int64': - data = [0, 1, None, 3, 4, 5, 6, 7] * 128 # 1024 entries - field = pyarrow.field('int64', pyarrow.int64()) - elif datatype == 'utf8': - # 4 each because our own benches also use 4 - data = ["aaaa", "aaab", None, "aaac", "aaad", "aaae", "aaaf", "aaag"] * 128 # 1024 entries - field = pyarrow.field('utf8', pyarrow.utf8()) - elif datatype == 'bool': - data = [True, False, None, True, False, True, True, True] * 128 # 1024 entries - field = pyarrow.field('bool', pyarrow.bool_()) - - data = data * 2**log2_size - - t = pyarrow.table([data], schema=pyarrow.schema([field])) - - def f(): - pyarrow.parquet.write_table(t, - io.BytesIO(), - use_dictionary=False, - compression=None, - write_statistics=False, - data_page_size=2**40, # i.e. 
a large number to ensure a single page - data_page_version="1.0") - - seconds = timeit.Timer(f).timeit(number=512) / 512 - microseconds = seconds * 1000 * 1000 - print(f"write {datatype} 2^{10 + log2_size} time: {microseconds:.2f} us") - -for i in range(0, 12, 2): - bench(i, "int64") - -for i in range(0, 12, 2): - bench(i, "utf8") - -for i in range(0, 12, 2): - bench(i, "bool") diff --git a/parquet_integration/write_parquet.py b/parquet_integration/write_parquet.py index 0d9e556216d..4aef907c614 100644 --- a/parquet_integration/write_parquet.py +++ b/parquet_integration/write_parquet.py @@ -11,7 +11,9 @@ def case_basic_nullable(size=1): float64 = [0.0, 1.0, None, 3.0, None, 5.0, 6.0, 7.0, None, 9.0] string = ["Hello", None, "aa", "", None, "abc", None, None, "def", "aaa"] boolean = [True, None, False, False, None, True, None, None, True, True] - string_large = ["ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊"] * 10 + string_large = [ + "ABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCDABCD😃🌚🕳👊" + ] * 10 decimal = [Decimal(e) if e is not None else None for e in int64] fields = [ @@ -23,9 +25,9 @@ def case_basic_nullable(size=1): pa.field("uint32", pa.uint32()), pa.field("string_large", pa.utf8()), # decimal testing - pa.field("decimal_9", pa.decimal128(9,0)), - pa.field("decimal_18", pa.decimal128(18,0)), - pa.field("decimal_26", pa.decimal128(26,0)), + pa.field("decimal_9", pa.decimal128(9, 0)), + pa.field("decimal_18", pa.decimal128(18, 0)), + pa.field("decimal_26", pa.decimal128(26, 0)), ] schema = pa.schema(fields) @@ -67,9 +69,9 @@ def case_basic_required(size=1): nullable=False, ), pa.field("uint32", pa.uint32(), nullable=False), - pa.field("decimal_9", pa.decimal128(9,0), nullable=False), - pa.field("decimal_18", pa.decimal128(18,0), nullable=False), - pa.field("decimal_26", pa.decimal128(26,0), nullable=False), + pa.field("decimal_9", pa.decimal128(9, 0), nullable=False), + pa.field("decimal_18", pa.decimal128(18, 0), nullable=False), + pa.field("decimal_26", pa.decimal128(26, 0), nullable=False), ] schema = pa.schema(fields) @@ -112,6 +114,26 @@ def case_nested(size): [[4, 5], [6]], [], [[7], None, [9]], + [[], [None], None], + [[10]], + ] + items_required_nested = [ + [[0, 1]], + None, + [[2, 3], [3]], + [[4, 5], [6]], + [], + [[7], None, [9]], + None, + [[10]], + ] + items_required_nested_2 = [ + [[0, 1]], + None, + [[2, 3], [3]], + [[4, 5], [6]], + [], + [[7], [8], [9]], None, [[10]], ] @@ -138,6 +160,10 @@ def case_nested(size): pa.field("list_utf8", pa.list_(pa.utf8())), pa.field("list_large_binary", pa.list_(pa.large_binary())), pa.field("list_nested_i64", pa.list_(pa.list_(pa.int64()))), + pa.field("list_nested_inner_required_i64", pa.list_(pa.list_(pa.int64()))), + pa.field( + "list_nested_inner_required_required_i64", pa.list_(pa.list_(pa.int64())) + ), ] schema = pa.schema(fields) return ( @@ -150,19 +176,44 @@ def case_nested(size): "list_utf8": string * size, "list_large_binary": string * size, "list_nested_i64": items_nested * size, + "list_nested_inner_required_i64": items_required_nested * size, + "list_nested_inner_required_required_i64": items_required_nested_2 * size, }, schema, f"nested_nullable_{size*10}.parquet", ) -def write_pyarrow(case, size=1, page_version=1, use_dictionary=False): +def write_pyarrow( + case, + size: int, + page_version: int, + use_dictionary: bool, + multiple_pages: bool, + compression: bool, +): data, schema, path = case(size) base_path = f"{PYARROW_PATH}/v{page_version}" if use_dictionary: base_path = 
f"{base_path}/dict" + if multiple_pages: + base_path = f"{base_path}/multi" + + if compression: + base_path = f"{base_path}/snappy" + + if compression: + compression = "snappy" + else: + compression = None + + if multiple_pages: + data_page_size = 2 ** 10 # i.e. a small number to ensure multiple pages + else: + data_page_size = 2 ** 40 # i.e. a large number to ensure a single page + t = pa.table(data, schema=schema) os.makedirs(base_path, exist_ok=True) pa.parquet.write_table( @@ -170,9 +221,9 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False): f"{base_path}/{path}", row_group_size=2 ** 40, use_dictionary=use_dictionary, - compression=None, + compression=compression, write_statistics=True, - data_page_size=2 ** 40, # i.e. a large number to ensure a single page + data_page_size=data_page_size, data_page_version=f"{page_version}.0", ) @@ -180,18 +231,26 @@ def write_pyarrow(case, size=1, page_version=1, use_dictionary=False): for case in [case_basic_nullable, case_basic_required, case_nested]: for version in [1, 2]: for use_dict in [True, False]: - write_pyarrow(case, 1, version, use_dict) + write_pyarrow(case, 1, version, use_dict, False, False) def case_benches(size): assert size % 8 == 0 - size //= 8 - data, schema, path = case_basic_nullable(1) + data, schema, _ = case_basic_nullable(1) for k in data: - data[k] = data[k][:8] * size + data[k] = data[k][:8] * (size // 8) return data, schema, f"benches_{size}.parquet" # for read benchmarks -for i in range(3 + 10, 3 + 22, 2): - write_pyarrow(case_benches, 2 ** i, 1) # V1 +for i in range(10, 22, 2): + # two pages (dict) + write_pyarrow(case_benches, 2 ** i, 1, True, False, False) + # single page + write_pyarrow(case_benches, 2 ** i, 1, False, False, False) + # multiple pages + write_pyarrow(case_benches, 2 ** i, 1, False, True, False) + # multiple compressed pages + write_pyarrow(case_benches, 2 ** i, 1, False, True, True) + # single compressed page + write_pyarrow(case_benches, 2 ** i, 1, False, False, True) diff --git a/src/array/display.rs b/src/array/display.rs index 886f6c1bc79..ef65bb8aa95 100644 --- a/src/array/display.rs +++ b/src/array/display.rs @@ -209,14 +209,9 @@ pub fn get_value_display<'a>(array: &'a dyn Array) -> Box Strin } Union(_, _, _) => { let array = array.as_any().downcast_ref::().unwrap(); - let displays = array - .fields() - .iter() - .map(|x| get_display(x.as_ref())) - .collect::>(); Box::new(move |row: usize| { let (field, index) = array.index(row); - displays[field](index) + get_display(array.fields()[field].as_ref())(index) }) } Extension(_, _, _) => todo!(), diff --git a/src/array/fixed_size_binary/mod.rs b/src/array/fixed_size_binary/mod.rs index 3091aae3b62..78db064ced5 100644 --- a/src/array/fixed_size_binary/mod.rs +++ b/src/array/fixed_size_binary/mod.rs @@ -34,7 +34,7 @@ impl FixedSizeBinaryArray { /// Returns a new [`FixedSizeBinaryArray`]. 
pub fn from_data(data_type: DataType, values: Buffer, validity: Option) -> Self { - let size = *Self::get_size(&data_type) as usize; + let size = Self::get_size(&data_type); assert_eq!(values.len() % size, 0); @@ -135,9 +135,9 @@ impl FixedSizeBinaryArray { } impl FixedSizeBinaryArray { - pub(crate) fn get_size(data_type: &DataType) -> &i32 { + pub(crate) fn get_size(data_type: &DataType) -> usize { match data_type.to_logical_type() { - DataType::FixedSizeBinary(size) => size, + DataType::FixedSizeBinary(size) => *size, _ => panic!("Wrong DataType"), } } diff --git a/src/array/fixed_size_binary/mutable.rs b/src/array/fixed_size_binary/mutable.rs index 2e0d218015e..46a4bcc143a 100644 --- a/src/array/fixed_size_binary/mutable.rs +++ b/src/array/fixed_size_binary/mutable.rs @@ -39,7 +39,7 @@ impl MutableFixedSizeBinaryArray { values: MutableBuffer, validity: Option, ) -> Self { - let size = *FixedSizeBinaryArray::get_size(&data_type) as usize; + let size = FixedSizeBinaryArray::get_size(&data_type); assert_eq!( values.len() % size, 0, @@ -68,7 +68,7 @@ impl MutableFixedSizeBinaryArray { /// Creates a new [`MutableFixedSizeBinaryArray`] with capacity for `capacity` entries. pub fn with_capacity(size: usize, capacity: usize) -> Self { Self::from_data( - DataType::FixedSizeBinary(size as i32), + DataType::FixedSizeBinary(size), MutableBuffer::::with_capacity(capacity * size), None, ) @@ -189,7 +189,7 @@ impl MutableArray for MutableFixedSizeBinaryArray { fn as_box(&mut self) -> Box { Box::new(FixedSizeBinaryArray::from_data( - DataType::FixedSizeBinary(self.size as i32), + DataType::FixedSizeBinary(self.size), std::mem::take(&mut self.values).into(), std::mem::take(&mut self.validity).map(|x| x.into()), )) @@ -197,7 +197,7 @@ impl MutableArray for MutableFixedSizeBinaryArray { fn as_arc(&mut self) -> Arc { Arc::new(FixedSizeBinaryArray::from_data( - DataType::FixedSizeBinary(self.size as i32), + DataType::FixedSizeBinary(self.size), std::mem::take(&mut self.values).into(), std::mem::take(&mut self.validity).map(|x| x.into()), )) diff --git a/src/array/fixed_size_list/mod.rs b/src/array/fixed_size_list/mod.rs index 5b78569eed4..a291fd38e9b 100644 --- a/src/array/fixed_size_list/mod.rs +++ b/src/array/fixed_size_list/mod.rs @@ -154,7 +154,7 @@ impl FixedSizeListArray { /// Returns a [`DataType`] consistent with [`FixedSizeListArray`]. pub fn default_datatype(data_type: DataType, size: usize) -> DataType { let field = Box::new(Field::new("item", data_type, true)); - DataType::FixedSizeList(field, size as i32) + DataType::FixedSizeList(field, size) } } diff --git a/src/array/growable/fixed_binary.rs b/src/array/growable/fixed_binary.rs index b9472a972b3..3abdc67fdbd 100644 --- a/src/array/growable/fixed_binary.rs +++ b/src/array/growable/fixed_binary.rs @@ -40,7 +40,7 @@ impl<'a> GrowableFixedSizeBinary<'a> { .map(|array| build_extend_null_bits(*array, use_validity)) .collect(); - let size = *FixedSizeBinaryArray::get_size(arrays[0].data_type()) as usize; + let size = FixedSizeBinaryArray::get_size(arrays[0].data_type()); Self { arrays, values: MutableBuffer::with_capacity(0), diff --git a/src/array/specification.rs b/src/array/specification.rs index ceedb46b51f..1edcb70653f 100644 --- a/src/array/specification.rs +++ b/src/array/specification.rs @@ -75,28 +75,45 @@ pub fn check_offsets_minimal(offsets: &[O], values_len: usize) -> usi /// * any slice of `values` between two consecutive pairs from `offsets` is invalid `utf8`, or /// * any offset is larger or equal to `values_len`. 
pub fn check_offsets_and_utf8(offsets: &[O], values: &[u8]) { - offsets.windows(2).for_each(|window| { - let start = window[0].to_usize(); - let end = window[1].to_usize(); - // assert monotonicity - assert!(start <= end); - // assert bounds - let slice = &values[start..end]; - // assert utf8 - simdutf8::basic::from_utf8(slice).expect("A non-utf8 string was passed."); - }); + const SIMD_CHUNK_SIZE: usize = 64; + + if values.is_ascii() { + check_offsets(offsets, values.len()); + } else { + offsets.windows(2).for_each(|window| { + let start = window[0].to_usize(); + let end = window[1].to_usize(); + // assert monotonicity + assert!(start <= end); + // assert bounds + let slice = &values[start..end]; + + // Fast ASCII check per item + if slice.len() < SIMD_CHUNK_SIZE && slice.is_ascii() { + return; + } + + // assert utf8 + simdutf8::basic::from_utf8(slice).expect("A non-utf8 string was passed."); + }); + } } /// # Panics iff: /// * the `offsets` is not monotonically increasing, or /// * any offset is larger or equal to `values_len`. pub fn check_offsets(offsets: &[O], values_len: usize) { - offsets.windows(2).for_each(|window| { - let start = window[0].to_usize(); - let end = window[1].to_usize(); - // assert monotonicity - assert!(start <= end); - // assert bound - assert!(end <= values_len); - }); + if offsets.is_empty() { + return; + } + + let mut last = offsets[0]; + // assert monotonicity + assert!(offsets.iter().skip(1).all(|&end| { + let monotone = last <= end; + last = end; + monotone + })); + // assert bounds + assert!(last.to_usize() <= values_len); } diff --git a/src/array/union/mod.rs b/src/array/union/mod.rs index b5a6a1d2605..77574b1706f 100644 --- a/src/array/union/mod.rs +++ b/src/array/union/mod.rs @@ -4,7 +4,7 @@ use crate::{ array::{display::get_value_display, display_fmt, new_empty_array, new_null_array, Array}, bitmap::Bitmap, buffer::Buffer, - datatypes::{DataType, Field}, + datatypes::{DataType, Field, UnionMode}, scalar::{new_scalar, Scalar}, }; @@ -37,13 +37,13 @@ pub struct UnionArray { impl UnionArray { /// Creates a new null [`UnionArray`]. pub fn new_null(data_type: DataType, length: usize) -> Self { - if let DataType::Union(f, _, is_sparse) = &data_type { + if let DataType::Union(f, _, mode) = &data_type { let fields = f .iter() .map(|x| new_null_array(x.data_type().clone(), length).into()) .collect(); - let offsets = if *is_sparse { + let offsets = if mode.is_sparse() { None } else { Some((0..length as i32).collect::>()) @@ -60,13 +60,13 @@ impl UnionArray { /// Creates a new empty [`UnionArray`]. 
pub fn new_empty(data_type: DataType) -> Self { - if let DataType::Union(f, _, is_sparse) = &data_type { + if let DataType::Union(f, _, mode) = &data_type { let fields = f .iter() .map(|x| new_empty_array(x.data_type().clone()).into()) .collect(); - let offsets = if *is_sparse { + let offsets = if mode.is_sparse() { None } else { Some(Buffer::new()) @@ -92,7 +92,7 @@ impl UnionArray { fields: Vec>, offsets: Option>, ) -> Self { - let (f, ids, is_sparse) = Self::get_all(&data_type); + let (f, ids, mode) = Self::get_all(&data_type); if f.len() != fields.len() { panic!("The number of `fields` must equal the number of fields in the Union DataType") @@ -104,7 +104,7 @@ impl UnionArray { if !same_data_types { panic!("All fields' datatype in the union must equal the datatypes on the fields.") } - if offsets.is_none() != is_sparse { + if offsets.is_none() != mode.is_sparse() { panic!("Sparsness flag must equal to noness of offsets in UnionArray") } let fields_hash = ids.as_ref().map(|ids| { @@ -244,11 +244,9 @@ impl Array for UnionArray { } impl UnionArray { - fn get_all(data_type: &DataType) -> (&[Field], Option<&[i32]>, bool) { + fn get_all(data_type: &DataType) -> (&[Field], Option<&[i32]>, UnionMode) { match data_type.to_logical_type() { - DataType::Union(fields, ids, is_sparse) => { - (fields, ids.as_ref().map(|x| x.as_ref()), *is_sparse) - } + DataType::Union(fields, ids, mode) => (fields, ids.as_ref().map(|x| x.as_ref()), *mode), _ => panic!("Wrong datatype passed to UnionArray."), } } @@ -264,7 +262,7 @@ impl UnionArray { /// # Panic /// Panics iff `data_type`'s logical type is not [`DataType::Union`]. pub fn is_sparse(data_type: &DataType) -> bool { - Self::get_all(data_type).2 + Self::get_all(data_type).2.is_sparse() } } diff --git a/src/compute/comparison/boolean.rs b/src/compute/comparison/boolean.rs index 6371c77b96b..f9eb74928a4 100644 --- a/src/compute/comparison/boolean.rs +++ b/src/compute/comparison/boolean.rs @@ -82,7 +82,11 @@ pub fn eq(lhs: &BooleanArray, rhs: &BooleanArray) -> Result { /// Perform `left == right` operation on an array and a scalar value. pub fn eq_scalar(lhs: &BooleanArray, rhs: bool) -> BooleanArray { - compare_op_scalar(lhs, rhs, |a, b| !(a ^ b)) + if rhs { + lhs.clone() + } else { + compare_op_scalar(lhs, rhs, |a, _| !a) + } } /// Perform `left != right` operation on two arrays. @@ -92,7 +96,7 @@ pub fn neq(lhs: &BooleanArray, rhs: &BooleanArray) -> Result { /// Perform `left != right` operation on an array and a scalar value. pub fn neq_scalar(lhs: &BooleanArray, rhs: bool) -> BooleanArray { - compare_op_scalar(lhs, rhs, |a, b| a ^ b) + eq_scalar(lhs, !rhs) } /// Perform `left < right` operation on two arrays. @@ -102,7 +106,15 @@ pub fn lt(lhs: &BooleanArray, rhs: &BooleanArray) -> Result { /// Perform `left < right` operation on an array and a scalar value. pub fn lt_scalar(lhs: &BooleanArray, rhs: bool) -> BooleanArray { - compare_op_scalar(lhs, rhs, |a, b| !a & b) + if rhs { + compare_op_scalar(lhs, rhs, |a, _| !a) + } else { + BooleanArray::from_data( + DataType::Boolean, + Bitmap::new_zeroed(lhs.len()), + lhs.validity().cloned(), + ) + } } /// Perform `left <= right` operation on two arrays. @@ -113,7 +125,11 @@ pub fn lt_eq(lhs: &BooleanArray, rhs: &BooleanArray) -> Result { /// Perform `left <= right` operation on an array and a scalar value. /// Null values are less than non-null values. 
pub fn lt_eq_scalar(lhs: &BooleanArray, rhs: bool) -> BooleanArray { - compare_op_scalar(lhs, rhs, |a, b| !a | b) + if rhs { + compare_op_scalar(lhs, rhs, |_, _| 0b11111111) + } else { + compare_op_scalar(lhs, rhs, |a, _| !a) + } } /// Perform `left > right` operation on two arrays. Non-null values are greater than null @@ -125,7 +141,15 @@ pub fn gt(lhs: &BooleanArray, rhs: &BooleanArray) -> Result { /// Perform `left > right` operation on an array and a scalar value. /// Non-null values are greater than null values. pub fn gt_scalar(lhs: &BooleanArray, rhs: bool) -> BooleanArray { - compare_op_scalar(lhs, rhs, |a, b| a & !b) + if rhs { + BooleanArray::from_data( + DataType::Boolean, + Bitmap::new_zeroed(lhs.len()), + lhs.validity().cloned(), + ) + } else { + lhs.clone() + } } /// Perform `left >= right` operation on two arrays. Non-null values are greater than null @@ -137,7 +161,11 @@ pub fn gt_eq(lhs: &BooleanArray, rhs: &BooleanArray) -> Result { /// Perform `left >= right` operation on an array and a scalar value. /// Non-null values are greater than null values. pub fn gt_eq_scalar(lhs: &BooleanArray, rhs: bool) -> BooleanArray { - compare_op_scalar(lhs, rhs, |a, b| a | !b) + if rhs { + lhs.clone() + } else { + compare_op_scalar(lhs, rhs, |_, _| 0b11111111) + } } /// Compare two [`BooleanArray`]s using the given [`Operator`]. @@ -263,10 +291,45 @@ mod tests { } #[test] - fn test_lt_scalar() { + fn test_lt_scalar_true() { cmp_bool_scalar!(lt_scalar, &[false, true], true, &[true, false]); } + #[test] + fn test_lt_scalar_false() { + cmp_bool_scalar!(lt_scalar, &[false, true], false, &[false, false]); + } + + #[test] + fn test_lt_eq_scalar_true() { + cmp_bool_scalar!(lt_eq_scalar, &[false, true], true, &[true, true]); + } + + #[test] + fn test_lt_eq_scalar_false() { + cmp_bool_scalar!(lt_eq_scalar, &[false, true], false, &[true, false]); + } + + #[test] + fn test_gt_scalar_true() { + cmp_bool_scalar!(gt_scalar, &[false, true], true, &[false, false]); + } + + #[test] + fn test_gt_scalar_false() { + cmp_bool_scalar!(gt_scalar, &[false, true], false, &[false, true]); + } + + #[test] + fn test_gt_eq_scalar_true() { + cmp_bool_scalar!(gt_eq_scalar, &[false, true], true, &[false, true]); + } + + #[test] + fn test_gt_eq_scalar_false() { + cmp_bool_scalar!(gt_eq_scalar, &[false, true], false, &[true, true]); + } + #[test] fn eq_nulls() { cmp_bool_options!( diff --git a/src/datatypes/mod.rs b/src/datatypes/mod.rs index b1c01d83570..81ab5f71b96 100644 --- a/src/datatypes/mod.rs +++ b/src/datatypes/mod.rs @@ -74,7 +74,7 @@ pub enum DataType { Binary, /// Opaque binary data of fixed size. /// Enum parameter specifies the number of bytes per value. - FixedSizeBinary(i32), + FixedSizeBinary(usize), /// Opaque binary data of variable length and 64-bit offsets. LargeBinary, /// A variable-length string in Unicode with UTF-8 encoding. @@ -84,14 +84,14 @@ pub enum DataType { /// A list of some logical data type with variable length. List(Box), /// A list of some logical data type with fixed length. - FixedSizeList(Box, i32), + FixedSizeList(Box, usize), /// A list of some logical data type with variable length and 64-bit offsets. LargeList(Box), /// A nested datatype that contains a number of sub-fields. Struct(Vec), /// A nested datatype that can represent slots of differing types. 
- /// Third argument represents sparsness - Union(Vec, Option>, bool), + /// Third argument represents mode + Union(Vec, Option>, UnionMode), /// A nested type that is represented as /// /// List> @@ -144,6 +144,37 @@ impl std::fmt::Display for DataType { } } +/// Mode of [`DataType::Union`] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum UnionMode { + /// Dense union + Dense, + /// Sparse union + Sparse, +} + +impl UnionMode { + /// Constructs a [`UnionMode::Sparse`] if the input bool is true, + /// or otherwise constructs a [`UnionMode::Dense`] + pub fn sparse(is_sparse: bool) -> Self { + if is_sparse { + Self::Sparse + } else { + Self::Dense + } + } + + /// Returns whether the mode is sparse + pub fn is_sparse(&self) -> bool { + matches!(self, Self::Sparse) + } + + /// Returns whether the mode is dense + pub fn is_dense(&self) -> bool { + matches!(self, Self::Dense) + } +} + /// The time units defined in Arrow. #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub enum TimeUnit { diff --git a/src/doc/lib.md b/src/doc/lib.md index d37adce76ca..270fe26810e 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -77,6 +77,7 @@ functionality, such as: * `io_ipc_compression`: to read and write compressed Arrow IPC (v2) * `io_csv` to read and write CSV * `io_json` to read and write JSON +* `io_flight` to read and write to Arrow's Flight protocol * `io_parquet` to read and write parquet * `io_parquet_compression` to read and write compressed parquet * `io_print` to write batches to formatted ASCII tables diff --git a/src/ffi/schema.rs b/src/ffi/schema.rs index 578d5a49a20..5faf4f37188 100644 --- a/src/ffi/schema.rs +++ b/src/ffi/schema.rs @@ -1,7 +1,7 @@ use std::{collections::BTreeMap, convert::TryInto, ffi::CStr, ffi::CString, ptr}; use crate::{ - datatypes::{DataType, Extension, Field, IntervalUnit, Metadata, TimeUnit}, + datatypes::{DataType, Extension, Field, IntervalUnit, Metadata, TimeUnit, UnionMode}, error::{ArrowError, Result}, }; @@ -314,7 +314,7 @@ unsafe fn to_data_type(schema: &Ffi_ArrowSchema) -> Result { DataType::Decimal(precision, scale) } else if !parts.is_empty() && ((parts[0] == "+us") || (parts[0] == "+ud")) { // union - let is_sparse = parts[0] == "+us"; + let mode = UnionMode::sparse(parts[0] == "+us"); let type_ids = parts[1] .split(',') .map(|x| { @@ -326,7 +326,7 @@ unsafe fn to_data_type(schema: &Ffi_ArrowSchema) -> Result { let fields = (0..schema.n_children as usize) .map(|x| to_field(schema.child(x))) .collect::>>()?; - DataType::Union(fields, Some(type_ids), is_sparse) + DataType::Union(fields, Some(type_ids), mode) } else { return Err(ArrowError::Ffi(format!( "The datatype \"{}\" is still not supported in Rust implementation", @@ -380,10 +380,10 @@ fn to_format(data_type: &DataType) -> String { } DataType::Timestamp(unit, tz) => { let unit = match unit { - TimeUnit::Second => "s".to_string(), - TimeUnit::Millisecond => "m".to_string(), - TimeUnit::Microsecond => "u".to_string(), - TimeUnit::Nanosecond => "n".to_string(), + TimeUnit::Second => "s", + TimeUnit::Millisecond => "m", + TimeUnit::Microsecond => "u", + TimeUnit::Nanosecond => "n", }; format!( "ts{}:{}", @@ -397,8 +397,8 @@ fn to_format(data_type: &DataType) -> String { DataType::Struct(_) => "+s".to_string(), DataType::FixedSizeBinary(size) => format!("w{}", size), DataType::FixedSizeList(_, size) => format!("+w:{}", size), - DataType::Union(f, ids, is_sparse) => { - let sparsness = if *is_sparse { 's' } else { 'd' }; + DataType::Union(f, ids, mode) => { + let sparsness = if 
mode.is_sparse() { 's' } else { 'd' }; let mut r = format!("+u{}:", sparsness); let ids = if let Some(ids) = ids { ids.iter() diff --git a/src/io/avro/read/mod.rs b/src/io/avro/read/mod.rs index 19440ee1559..a0669507e13 100644 --- a/src/io/avro/read/mod.rs +++ b/src/io/avro/read/mod.rs @@ -4,7 +4,8 @@ use std::io::Read; use std::sync::Arc; use avro_rs::{Codec, Schema as AvroSchema}; -use streaming_iterator::StreamingIterator; +use fallible_streaming_iterator::FallibleStreamingIterator; +use libflate::deflate::Decoder; mod deserialize; mod nested; @@ -67,14 +68,19 @@ fn read_block(reader: &mut R, buf: &mut Vec, file_marker: [u8; 16]) Ok(rows) } -fn decompress_block(buf: &mut Vec, decompress: &mut Vec, codec: Codec) -> Result { +/// Decompresses an avro block. +/// Returns whether the buffers where swapped. +fn decompress_block(block: &mut Vec, decompress: &mut Vec, codec: Codec) -> Result { match codec { Codec::Null => { - std::mem::swap(buf, decompress); - Ok(false) + std::mem::swap(block, decompress); + Ok(true) } Codec::Deflate => { - todo!() + decompress.clear(); + let mut decoder = Decoder::new(&block[..]); + decoder.read_to_end(decompress)?; + Ok(false) } } } @@ -102,13 +108,14 @@ impl<'a, R: Read> BlockStreamIterator<'a, R> { } } -impl<'a, R: Read> StreamingIterator for BlockStreamIterator<'a, R> { +impl<'a, R: Read> FallibleStreamingIterator for BlockStreamIterator<'a, R> { + type Error = ArrowError; type Item = (Vec, usize); - fn advance(&mut self) { + fn advance(&mut self) -> Result<()> { let (buf, rows) = &mut self.buf; - // todo: surface this error - *rows = read_block(self.reader, buf, self.file_marker).unwrap(); + *rows = read_block(self.reader, buf, self.file_marker)?; + Ok(()) } fn get(&self) -> Option<&Self::Item> { @@ -140,17 +147,18 @@ impl<'a, R: Read> Decompressor<'a, R> { } } -impl<'a, R: Read> StreamingIterator for Decompressor<'a, R> { +impl<'a, R: Read> FallibleStreamingIterator for Decompressor<'a, R> { + type Error = ArrowError; type Item = (Vec, usize); - fn advance(&mut self) { + fn advance(&mut self) -> Result<()> { if self.was_swapped { std::mem::swap(self.blocks.buffer(), &mut self.buf.0); } - self.blocks.advance(); - self.was_swapped = - decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec).unwrap(); + self.blocks.advance()?; + self.was_swapped = decompress_block(self.blocks.buffer(), &mut self.buf.0, self.codec)?; self.buf.1 = self.blocks.get().map(|(_, rows)| *rows).unwrap_or_default(); + Ok(()) } fn get(&self) -> Option<&Self::Item> { @@ -188,15 +196,12 @@ impl<'a, R: Read> Iterator for Reader<'a, R> { type Item = Result; fn next(&mut self) -> Option { - if let Some((data, rows)) = self.iter.next() { - Some(deserialize::deserialize( - data, - *rows, - self.schema.clone(), - &self.avro_schemas, - )) - } else { - None - } + let schema = self.schema.clone(); + let avro_schemas = &self.avro_schemas; + + self.iter.next().transpose().map(|x| { + let (data, rows) = x?; + deserialize::deserialize(data, *rows, schema, avro_schemas) + }) } } diff --git a/src/io/avro/read/schema.rs b/src/io/avro/read/schema.rs index 8b9d648cf5f..7e0e8551c1a 100644 --- a/src/io/avro/read/schema.rs +++ b/src/io/avro/read/schema.rs @@ -142,7 +142,7 @@ fn schema_to_field( .iter() .map(|s| schema_to_field(s, None, has_nullable, None)) .collect::>>()?; - DataType::Union(fields, None, false) + DataType::Union(fields, None, UnionMode::Dense) } } AvroSchema::Record { name, fields, .. } => { @@ -173,7 +173,7 @@ fn schema_to_field( false, )) } - AvroSchema::Fixed { size, .. 
} => DataType::FixedSizeBinary(*size as i32), + AvroSchema::Fixed { size, .. } => DataType::FixedSizeBinary(*size), AvroSchema::Decimal { precision, scale, .. } => DataType::Decimal(*precision, *scale), diff --git a/arrow-flight/src/utils.rs b/src/io/flight/mod.rs similarity index 51% rename from arrow-flight/src/utils.rs rename to src/io/flight/mod.rs index 25f01837ee3..ab4b0eb9283 100644 --- a/arrow-flight/src/utils.rs +++ b/src/io/flight/mod.rs @@ -1,48 +1,30 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utilities to assist with reading and writing Arrow data as Flight messages - -use std::{convert::TryFrom, sync::Arc}; - -use crate::{FlightData, SchemaResult}; - -use arrow2::{ +use std::convert::TryFrom; +use std::sync::Arc; + +use arrow_format::flight::data::{FlightData, SchemaResult}; +use arrow_format::ipc; + +use crate::{ array::*, datatypes::*, error::{ArrowError, Result}, - io::ipc, - io::ipc::gen::Schema::MetadataVersion, + io::ipc::fb_to_schema, io::ipc::read::read_record_batch, io::ipc::write, io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, IpcWriteOptions}, record_batch::RecordBatch, }; -/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries -/// and a `FlightData` representing the bytes of the batch's values -pub fn flight_data_from_arrow_batch( +/// Serializes a [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries +/// and a [`FlightData`] representing the batch. +pub fn serialize_batch( batch: &RecordBatch, options: &IpcWriteOptions, ) -> (Vec, FlightData) { let mut dictionary_tracker = DictionaryTracker::new(false); let (encoded_dictionaries, encoded_batch) = - encoded_batch(batch, &mut dictionary_tracker, &options) + encoded_batch(batch, &mut dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); @@ -61,40 +43,37 @@ impl From for FlightData { } } -/// Convert a `Schema` to `SchemaResult` by converting to an IPC message -pub fn flight_schema_from_arrow_schema(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult { +/// Serializes a [`Schema`] to [`SchemaResult`]. +pub fn serialize_schema_to_result(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult { SchemaResult { - schema: flight_schema_as_flatbuffer(schema, options), + schema: schema_as_flatbuffer(schema, options), } } -/// Convert a `Schema` to `FlightData` by converting to an IPC message -pub fn flight_data_from_arrow_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData { - let data_header = flight_schema_as_flatbuffer(schema, options); +/// Serializes a [`Schema`] to [`FlightData`]. 
+pub fn serialize_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData { + let data_header = schema_as_flatbuffer(schema, options); FlightData { data_header, ..Default::default() } } -/// Convert a `Schema` to bytes in the format expected in `FlightInfo.schema` -pub fn ipc_message_from_arrow_schema( - arrow_schema: &Schema, - options: &IpcWriteOptions, -) -> Result> { - let encoded_data = flight_schema_as_encoded_data(arrow_schema, options); +/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::FlightInfo`]. +pub fn serialize_schema_to_info(schema: &Schema, options: &IpcWriteOptions) -> Result> { + let encoded_data = schema_as_encoded_data(schema, options); let mut schema = vec![]; write::common::write_message(&mut schema, encoded_data, options)?; Ok(schema) } -fn flight_schema_as_flatbuffer(arrow_schema: &Schema, options: &IpcWriteOptions) -> Vec { - let encoded_data = flight_schema_as_encoded_data(arrow_schema, options); +fn schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> Vec { + let encoded_data = schema_as_encoded_data(schema, options); encoded_data.ipc_message } -fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { +fn schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { EncodedData { ipc_message: write::schema_to_bytes(arrow_schema, *options.metadata_version()), arrow_data: vec![], @@ -103,8 +82,8 @@ fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOption /// Deserialize an IPC message into a schema fn schema_from_bytes(bytes: &[u8]) -> Result { - if let Ok(ipc) = ipc::root_as_message(bytes) { - if let Some((schema, _)) = ipc.header_as_schema().map(ipc::fb_to_schema) { + if let Ok(ipc) = ipc::Message::root_as_message(bytes) { + if let Some((schema, _)) = ipc.header_as_schema().map(fb_to_schema) { Ok(schema) } else { Err(ArrowError::Ipc("Unable to get head as schema".to_string())) @@ -114,9 +93,6 @@ fn schema_from_bytes(bytes: &[u8]) -> Result { } } -/// Try convert `FlightData` into an Arrow Schema -/// -/// Returns an error if the `FlightData` header is not a valid IPC schema impl TryFrom<&FlightData> for Schema { type Error = ArrowError; fn try_from(data: &FlightData) -> Result { @@ -129,9 +105,6 @@ impl TryFrom<&FlightData> for Schema { } } -/// Try convert `SchemaResult` into an Arrow Schema -/// -/// Returns an error if the `FlightData` header is not a valid IPC schema impl TryFrom<&SchemaResult> for Schema { type Error = ArrowError; fn try_from(data: &SchemaResult) -> Result { @@ -144,15 +117,15 @@ impl TryFrom<&SchemaResult> for Schema { } } -/// Convert a FlightData message to a RecordBatch -pub fn flight_data_to_arrow_batch( +/// Deserializes [`FlightData`] to a [`RecordBatch`]. 
 
-/// Convert a FlightData message to a RecordBatch
-pub fn flight_data_to_arrow_batch(
+/// Deserializes [`FlightData`] to a [`RecordBatch`].
+pub fn deserialize_batch(
     data: &FlightData,
     schema: Arc<Schema>,
     is_little_endian: bool,
     dictionaries_by_field: &[Option<Arc<dyn Array>>],
 ) -> Result<RecordBatch> {
     // check that the data_header is a record batch message
-    let message = ipc::root_as_message(&data.data_header[..])
+    let message = ipc::Message::root_as_message(&data.data_header[..])
         .map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?;
 
     let mut reader = std::io::Cursor::new(&data.data_body);
@@ -168,8 +141,8 @@ pub fn flight_data_to_arrow_batch(
             schema.clone(),
             None,
             is_little_endian,
-            &dictionaries_by_field,
-            MetadataVersion::V5,
+            dictionaries_by_field,
+            ipc::Schema::MetadataVersion::V5,
             &mut reader,
             0,
         )
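`serialize_batch` and `deserialize_batch` are the corresponding pair for record batches. A round-trip sketch under the same assumptions as above (`arrow2::io::flight` path, `IpcWriteOptions::default()`), plus a little-endian host since `is_little_endian` is passed as `true`; none of this is asserted by the diff itself:

use std::sync::Arc;

use arrow2::array::{Array, Int32Array};
use arrow2::datatypes::{DataType, Field, Schema};
use arrow2::error::Result;
use arrow2::io::flight::{deserialize_batch, serialize_batch};
use arrow2::io::ipc::write::common::IpcWriteOptions;
use arrow2::record_batch::RecordBatch;

fn batch_roundtrip() -> Result<()> {
    let schema = Arc::new(Schema::new(vec![Field::new("c1", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from_slice(&[1, 2, 3])) as Arc<dyn Array>],
    )?;

    let options = IpcWriteOptions::default();
    // No dictionary-encoded columns here, so the dictionary messages are empty.
    let (_dictionaries, flight_data) = serialize_batch(&batch, &options);

    // One slot per field, all `None`, because nothing is dictionary-encoded.
    let dictionaries_by_field: Vec<Option<Arc<dyn Array>>> = vec![None; schema.fields().len()];
    let roundtrip = deserialize_batch(&flight_data, schema, true, &dictionaries_by_field)?;
    assert_eq!(batch.num_rows(), roundtrip.num_rows());
    Ok(())
}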
diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs
index d6f3edfc1b9..ac78bdf7a67 100644
--- a/src/io/ipc/convert.rs
+++ b/src/io/ipc/convert.rs
@@ -17,22 +17,20 @@
 
 //! Utilities for converting between IPC types and native Arrow types
 
-use crate::datatypes::{
-    get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit,
+use arrow_format::ipc::flatbuffers::{
+    FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset,
 };
-use crate::io::ipc::convert::ipc::UnionMode;
-use crate::io::ipc::endianess::is_native_little_endian;
-
+use std::collections::{BTreeMap, HashMap};
 
 mod ipc {
-    pub use super::super::gen::File::*;
-    pub use super::super::gen::Message::*;
-    pub use super::super::gen::Schema::*;
+    pub use arrow_format::ipc::File::*;
+    pub use arrow_format::ipc::Message::*;
+    pub use arrow_format::ipc::Schema::*;
 }
 
-use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset};
-use std::collections::{BTreeMap, HashMap};
-
-use DataType::*;
+use crate::datatypes::{
+    get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit, UnionMode,
+};
+use crate::io::ipc::endianess::is_native_little_endian;
 
 pub fn schema_to_fb_offset<'a>(
     fbb: &mut FlatBufferBuilder<'a>,
@@ -193,7 +191,7 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo
         ipc::Type::LargeUtf8 => DataType::LargeUtf8,
         ipc::Type::FixedSizeBinary => {
             let fsb = field.type_as_fixed_size_binary().unwrap();
-            DataType::FixedSizeBinary(fsb.byteWidth())
+            DataType::FixedSizeBinary(fsb.byteWidth() as usize)
         }
         ipc::Type::FloatingPoint => {
             let float = field.type_as_floating_point().unwrap();
@@ -275,7 +273,7 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo
                 panic!("expect a list to have one child")
             }
             let fsl = field.type_as_fixed_size_list().unwrap();
-            DataType::FixedSizeList(Box::new(children.get(0).into()), fsl.listSize())
+            DataType::FixedSizeList(Box::new(children.get(0).into()), fsl.listSize() as usize)
         }
         ipc::Type::Struct_ => {
             let mut fields = vec![];
@@ -294,7 +292,7 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo
         ipc::Type::Union => {
             let type_ = field.type_as_union().unwrap();
 
-            let is_sparse = type_.mode() == UnionMode::Sparse;
+            let mode = UnionMode::sparse(type_.mode() == ipc::UnionMode::Sparse);
 
             let ids = type_.typeIds().map(|x| x.iter().collect());
 
@@ -305,7 +303,7 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo
             } else {
                 vec![]
             };
-            DataType::Union(fields, ids, is_sparse)
+            DataType::Union(fields, ids, mode)
         }
         ipc::Type::Map => {
             let map = field.type_as_map().unwrap();
@@ -378,7 +376,7 @@ pub(crate) fn build_field<'a>(
     let fb_field_name = fbb.create_string(field.name().as_str());
 
     let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb);
 
-    let fb_dictionary = if let Dictionary(index_type, inner) = field.data_type() {
+    let fb_dictionary = if let DataType::Dictionary(index_type, inner) = field.data_type() {
         if let DataType::Extension(name, _, metadata) = inner.as_ref() {
             write_extension(fbb, name, metadata, &mut kv_vec);
         }
@@ -428,6 +426,7 @@ pub(crate) fn build_field<'a>(
 }
 
 fn type_to_field_type(data_type: &DataType) -> ipc::Type {
+    use DataType::*;
     match data_type {
         Null => ipc::Type::Null,
         Boolean => ipc::Type::Bool,
@@ -461,6 +460,7 @@ pub(crate) fn get_fb_field_type<'a>(
     is_nullable: bool,
     fbb: &mut FlatBufferBuilder<'a>,
 ) -> FbFieldType<'a> {
+    use DataType::*;
     let type_type = type_to_field_type(data_type);
 
     // some IPC implementations expect an empty list for child data, instead of a null value.
@@ -704,16 +704,16 @@ pub(crate) fn get_fb_field_type<'a>(
                 children: Some(fbb.create_vector(&empty_fields[..])),
             }
         }
-        Union(fields, ids, is_sparse) => {
+        Union(fields, ids, mode) => {
             let children: Vec<_> = fields.iter().map(|field| build_field(fbb, field)).collect();
 
             let ids = ids.as_ref().map(|ids| fbb.create_vector(ids));
 
             let mut builder = ipc::UnionBuilder::new(fbb);
-            builder.add_mode(if *is_sparse {
-                UnionMode::Sparse
+            builder.add_mode(if mode.is_sparse() {
+                ipc::UnionMode::Sparse
             } else {
-                UnionMode::Dense
+                ipc::UnionMode::Dense
             });
 
             if let Some(ids) = ids {
@@ -745,6 +745,7 @@ pub(crate) fn get_fb_dictionary<'a>(
     dict_is_ordered: bool,
     fbb: &mut FlatBufferBuilder<'a>,
 ) -> WIPOffset<ipc::DictionaryEncoding<'a>> {
+    use DataType::*;
     // We assume that the dictionary index type (as an integer) has already been
     // validated elsewhere, and can safely assume we are dealing with integers
     let mut index_builder = ipc::IntBuilder::new(fbb);
diff --git a/src/io/ipc/gen/File.rs b/src/io/ipc/gen/File.rs
deleted file mode 100644
index a5a1512ce95..00000000000
--- a/src/io/ipc/gen/File.rs
+++ /dev/null
@@ -1,471 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
- -#![allow(dead_code)] -#![allow(unused_imports)] - -use super::Schema::*; -use flatbuffers::EndianScalar; -use std::{cmp::Ordering, mem}; -// automatically generated by the FlatBuffers compiler, do not modify - -// struct Block, aligned to 8 -#[repr(transparent)] -#[derive(Clone, Copy, PartialEq)] -pub struct Block(pub [u8; 24]); -impl std::fmt::Debug for Block { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("Block") - .field("offset", &self.offset()) - .field("metaDataLength", &self.metaDataLength()) - .field("bodyLength", &self.bodyLength()) - .finish() - } -} - -impl flatbuffers::SimpleToVerifyInSlice for Block {} -impl flatbuffers::SafeSliceAccess for Block {} -impl<'a> flatbuffers::Follow<'a> for Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - <&'a Block>::follow(buf, loc) - } -} -impl<'a> flatbuffers::Follow<'a> for &'a Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - flatbuffers::follow_cast_ref::(buf, loc) - } -} -impl<'b> flatbuffers::Push for Block { - type Output = Block; - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} -impl<'b> flatbuffers::Push for &'b Block { - type Output = Block; - - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(*self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} - -impl<'a> flatbuffers::Verifiable for Block { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.in_buffer::(pos) - } -} -impl Block { - #[allow(clippy::too_many_arguments)] - pub fn new(offset: i64, metaDataLength: i32, bodyLength: i64) -> Self { - let mut s = Self([0; 24]); - s.set_offset(offset); - s.set_metaDataLength(metaDataLength); - s.set_bodyLength(bodyLength); - s - } - - /// Index to the start of the RecordBlock (note this is past the Message header) - pub fn offset(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[0..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_offset(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[0..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the metadata - pub fn metaDataLength(&self) -> i32 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[8..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_metaDataLength(&mut self, x: i32) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i32 as *const u8, - self.0[8..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the data (this is aligned so there can be a gap between this and - /// the metadata). 
- pub fn bodyLength(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[16..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_bodyLength(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[16..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } -} - -pub enum FooterOffset {} -#[derive(Copy, Clone, PartialEq)] - -/// ---------------------------------------------------------------------- -/// Arrow File metadata -/// -pub struct Footer<'a> { - pub _tab: flatbuffers::Table<'a>, -} - -impl<'a> flatbuffers::Follow<'a> for Footer<'a> { - type Inner = Footer<'a>; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - Self { - _tab: flatbuffers::Table { buf, loc }, - } - } -} - -impl<'a> Footer<'a> { - #[inline] - pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - Footer { _tab: table } - } - #[allow(unused_mut)] - pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( - _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, - args: &'args FooterArgs<'args>, - ) -> flatbuffers::WIPOffset> { - let mut builder = FooterBuilder::new(_fbb); - if let Some(x) = args.custom_metadata { - builder.add_custom_metadata(x); - } - if let Some(x) = args.recordBatches { - builder.add_recordBatches(x); - } - if let Some(x) = args.dictionaries { - builder.add_dictionaries(x); - } - if let Some(x) = args.schema { - builder.add_schema(x); - } - builder.add_version(args.version); - builder.finish() - } - - pub const VT_VERSION: flatbuffers::VOffsetT = 4; - pub const VT_SCHEMA: flatbuffers::VOffsetT = 6; - pub const VT_DICTIONARIES: flatbuffers::VOffsetT = 8; - pub const VT_RECORDBATCHES: flatbuffers::VOffsetT = 10; - pub const VT_CUSTOM_METADATA: flatbuffers::VOffsetT = 12; - - #[inline] - pub fn version(&self) -> MetadataVersion { - self._tab - .get::(Footer::VT_VERSION, Some(MetadataVersion::V1)) - .unwrap() - } - #[inline] - pub fn schema(&self) -> Option> { - self._tab - .get::>(Footer::VT_SCHEMA, None) - } - #[inline] - pub fn dictionaries(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_DICTIONARIES, - None, - ) - .map(|v| v.safe_slice()) - } - #[inline] - pub fn recordBatches(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_RECORDBATCHES, - None, - ) - .map(|v| v.safe_slice()) - } - /// User-defined metadata - #[inline] - pub fn custom_metadata( - &self, - ) -> Option>>> { - self._tab.get::>, - >>(Footer::VT_CUSTOM_METADATA, None) - } -} - -impl flatbuffers::Verifiable for Footer<'_> { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.visit_table(pos)? - .visit_field::(&"version", Self::VT_VERSION, false)? - .visit_field::>(&"schema", Self::VT_SCHEMA, false)? - .visit_field::>>( - &"dictionaries", - Self::VT_DICTIONARIES, - false, - )? - .visit_field::>>( - &"recordBatches", - Self::VT_RECORDBATCHES, - false, - )? - .visit_field::>, - >>(&"custom_metadata", Self::VT_CUSTOM_METADATA, false)? 
- .finish(); - Ok(()) - } -} -pub struct FooterArgs<'a> { - pub version: MetadataVersion, - pub schema: Option>>, - pub dictionaries: Option>>, - pub recordBatches: Option>>, - pub custom_metadata: Option< - flatbuffers::WIPOffset>>>, - >, -} -impl<'a> Default for FooterArgs<'a> { - #[inline] - fn default() -> Self { - FooterArgs { - version: MetadataVersion::V1, - schema: None, - dictionaries: None, - recordBatches: None, - custom_metadata: None, - } - } -} -pub struct FooterBuilder<'a: 'b, 'b> { - fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, - start_: flatbuffers::WIPOffset, -} -impl<'a: 'b, 'b> FooterBuilder<'a, 'b> { - #[inline] - pub fn add_version(&mut self, version: MetadataVersion) { - self.fbb_ - .push_slot::(Footer::VT_VERSION, version, MetadataVersion::V1); - } - #[inline] - pub fn add_schema(&mut self, schema: flatbuffers::WIPOffset>) { - self.fbb_ - .push_slot_always::>(Footer::VT_SCHEMA, schema); - } - #[inline] - pub fn add_dictionaries( - &mut self, - dictionaries: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_DICTIONARIES, dictionaries); - } - #[inline] - pub fn add_recordBatches( - &mut self, - recordBatches: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_RECORDBATCHES, recordBatches); - } - #[inline] - pub fn add_custom_metadata( - &mut self, - custom_metadata: flatbuffers::WIPOffset< - flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset>>, - >, - ) { - self.fbb_.push_slot_always::>( - Footer::VT_CUSTOM_METADATA, - custom_metadata, - ); - } - #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> FooterBuilder<'a, 'b> { - let start = _fbb.start_table(); - FooterBuilder { - fbb_: _fbb, - start_: start, - } - } - #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { - let o = self.fbb_.end_table(self.start_); - flatbuffers::WIPOffset::new(o.value()) - } -} - -impl std::fmt::Debug for Footer<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Footer"); - ds.field("version", &self.version()); - ds.field("schema", &self.schema()); - ds.field("dictionaries", &self.dictionaries()); - ds.field("recordBatches", &self.recordBatches()); - ds.field("custom_metadata", &self.custom_metadata()); - ds.finish() - } -} -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::root_unchecked::>(buf) } -} - -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_size_prefixed_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::size_prefixed_root_unchecked::>(buf) } -} - -#[inline] -/// Verifies that a buffer of bytes contains a `Footer` -/// and returns it. -/// Note that verification is still experimental and may not -/// catch every error, or be maximally performant. For the -/// previous, unchecked, behavior use -/// `root_as_footer_unchecked`. -pub fn root_as_footer(buf: &[u8]) -> Result { - flatbuffers::root::