diff --git a/Cargo.toml b/Cargo.toml index d5982d72553..20a11dc9061 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,8 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -flatbuffers = { version = "=2.0.0", optional = true } +arrow-format = { version = "*", optional = true, features = ["ipc"] } + hex = { version = "^0.4", optional = true } # for IPC compression @@ -90,6 +91,7 @@ full = [ "io_csv", "io_json", "io_ipc", + "io_flight", "io_ipc_compression", "io_json_integration", "io_print", @@ -107,8 +109,9 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_read = ["csv", "lexical-core"] io_csv_write = ["csv", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "indexmap"] -io_ipc = ["flatbuffers"] +io_ipc = ["arrow-format"] io_ipc_compression = ["lz4", "zstd"] +io_flight = ["io_ipc", "arrow-format/flight-data"] io_parquet_compression = [ "parquet2/zstd", "parquet2/snappy", @@ -145,6 +148,8 @@ skip_feature_sets = [ ["io_csv_write"], ["io_avro"], ["io_json"], + ["io_flight"], + ["io_ipc"], ["io_parquet"], ["io_json_integration"], # this does not change the public API diff --git a/arrow-flight/Cargo.toml b/arrow-flight/Cargo.toml deleted file mode 100644 index ad423b6d0d4..00000000000 --- a/arrow-flight/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -[package] -name = "arrow-flight" -description = "Apache Arrow Flight" -version = "0.1.0" -edition = "2018" -authors = ["Apache Arrow "] -homepage = "https://github.com/apache/arrow" -repository = "https://github.com/apache/arrow" -license = "Apache-2.0" - -[dependencies] -arrow2 = { path = "../", features = ["io_ipc"], default-features = false } -tonic = "0.5.2" -bytes = "1" -prost = "0.8.0" -prost-derive = "0.8.0" -tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -futures = { version = "0.3", default-features = false, features = ["alloc"]} - -#[lib] -#name = "flight" -#path = "src/lib.rs" diff --git a/arrow-flight/src/arrow.flight.protocol.rs b/arrow-flight/src/arrow.flight.protocol.rs deleted file mode 100644 index 5db746f6fda..00000000000 --- a/arrow-flight/src/arrow.flight.protocol.rs +++ /dev/null @@ -1,1071 +0,0 @@ -// This file was automatically generated through the build.rs script, and should not be edited. - -/// -/// The request that a client provides to a server on handshake. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HandshakeRequest { - /// - /// A defined protocol version - #[prost(uint64, tag = "1")] - pub protocol_version: u64, - /// - /// Arbitrary auth/handshake info. - #[prost(bytes = "vec", tag = "2")] - pub payload: ::prost::alloc::vec::Vec, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct HandshakeResponse { - /// - /// A defined protocol version - #[prost(uint64, tag = "1")] - pub protocol_version: u64, - /// - /// Arbitrary auth/handshake info. - #[prost(bytes = "vec", tag = "2")] - pub payload: ::prost::alloc::vec::Vec, -} -/// -/// A message for doing simple auth. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct BasicAuth { - #[prost(string, tag = "2")] - pub username: ::prost::alloc::string::String, - #[prost(string, tag = "3")] - pub password: ::prost::alloc::string::String, -} -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Empty {} -/// -/// Describes an available action, including both the name used for execution -/// along with a short description of the purpose of the action. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct ActionType { - #[prost(string, tag = "1")] - pub r#type: ::prost::alloc::string::String, - #[prost(string, tag = "2")] - pub description: ::prost::alloc::string::String, -} -/// -/// A service specific expression that can be used to return a limited set -/// of available Arrow Flight streams. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Criteria { - #[prost(bytes = "vec", tag = "1")] - pub expression: ::prost::alloc::vec::Vec, -} -/// -/// An opaque action specific for the service. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Action { - #[prost(string, tag = "1")] - pub r#type: ::prost::alloc::string::String, - #[prost(bytes = "vec", tag = "2")] - pub body: ::prost::alloc::vec::Vec, -} -/// -/// An opaque result returned after executing an action. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Result { - #[prost(bytes = "vec", tag = "1")] - pub body: ::prost::alloc::vec::Vec, -} -/// -/// Wrap the result of a getSchema call -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct SchemaResult { - /// schema of the dataset as described in Schema.fbs::Schema. - #[prost(bytes = "vec", tag = "1")] - pub schema: ::prost::alloc::vec::Vec, -} -/// -/// The name or tag for a Flight. May be used as a way to retrieve or generate -/// a flight or be used to expose a set of previously defined flights. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightDescriptor { - #[prost(enumeration = "flight_descriptor::DescriptorType", tag = "1")] - pub r#type: i32, - /// - /// Opaque value used to express a command. Should only be defined when - /// type = CMD. - #[prost(bytes = "vec", tag = "2")] - pub cmd: ::prost::alloc::vec::Vec, - /// - /// List of strings identifying a particular dataset. Should only be defined - /// when type = PATH. - #[prost(string, repeated, tag = "3")] - pub path: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, -} -/// Nested message and enum types in `FlightDescriptor`. -pub mod flight_descriptor { - /// - /// Describes what type of descriptor is defined. - #[derive( - Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord, ::prost::Enumeration, - )] - #[repr(i32)] - pub enum DescriptorType { - /// Protobuf pattern, not used. - Unknown = 0, - /// - /// A named path that identifies a dataset. A path is composed of a string - /// or list of strings describing a particular dataset. This is conceptually - /// similar to a path inside a filesystem. - Path = 1, - /// - /// An opaque command to generate a dataset. - Cmd = 2, - } -} -/// -/// The access coordinates for retrieval of a dataset. With a FlightInfo, a -/// consumer is able to determine how to retrieve a dataset. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightInfo { - /// schema of the dataset as described in Schema.fbs::Schema. - #[prost(bytes = "vec", tag = "1")] - pub schema: ::prost::alloc::vec::Vec, - /// - /// The descriptor associated with this info. - #[prost(message, optional, tag = "2")] - pub flight_descriptor: ::core::option::Option, - /// - /// A list of endpoints associated with the flight. To consume the whole - /// flight, all endpoints must be consumed. - #[prost(message, repeated, tag = "3")] - pub endpoint: ::prost::alloc::vec::Vec, - /// Set these to -1 if unknown. - #[prost(int64, tag = "4")] - pub total_records: i64, - #[prost(int64, tag = "5")] - pub total_bytes: i64, -} -/// -/// A particular stream or split associated with a flight. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightEndpoint { - /// - /// Token used to retrieve this stream. - #[prost(message, optional, tag = "1")] - pub ticket: ::core::option::Option, - /// - /// A list of URIs where this ticket can be redeemed. If the list is - /// empty, the expectation is that the ticket can only be redeemed on the - /// current service where the ticket was generated. - #[prost(message, repeated, tag = "2")] - pub location: ::prost::alloc::vec::Vec, -} -/// -/// A location where a Flight service will accept retrieval of a particular -/// stream given a ticket. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Location { - #[prost(string, tag = "1")] - pub uri: ::prost::alloc::string::String, -} -/// -/// An opaque identifier that the service can use to retrieve a particular -/// portion of a stream. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct Ticket { - #[prost(bytes = "vec", tag = "1")] - pub ticket: ::prost::alloc::vec::Vec, -} -/// -/// A batch of Arrow data as part of a stream of batches. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct FlightData { - /// - /// The descriptor of the data. This is only relevant when a client is - /// starting a new DoPut stream. - #[prost(message, optional, tag = "1")] - pub flight_descriptor: ::core::option::Option, - /// - /// Header for message data as described in Message.fbs::Message. - #[prost(bytes = "vec", tag = "2")] - pub data_header: ::prost::alloc::vec::Vec, - /// - /// Application-defined metadata. - #[prost(bytes = "vec", tag = "3")] - pub app_metadata: ::prost::alloc::vec::Vec, - /// - /// The actual batch of Arrow data. Preferably handled with minimal-copies - /// coming last in the definition to help with sidecar patterns (it is - /// expected that some implementations will fetch this field off the wire - /// with specialized code to avoid extra memory copies). - #[prost(bytes = "vec", tag = "1000")] - pub data_body: ::prost::alloc::vec::Vec, -} -///* -/// The response message associated with the submission of a DoPut. -#[derive(Clone, PartialEq, ::prost::Message)] -pub struct PutResult { - #[prost(bytes = "vec", tag = "1")] - pub app_metadata: ::prost::alloc::vec::Vec, -} -#[doc = r" Generated client implementations."] -pub mod flight_service_client { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - #[doc = ""] - #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"] - #[doc = " flight service can expose one or more predefined endpoints that can be"] - #[doc = " accessed using the Arrow Flight Protocol. Additionally, a flight service"] - #[doc = " can expose a set of actions that are available."] - #[derive(Debug, Clone)] - pub struct FlightServiceClient { - inner: tonic::client::Grpc, - } - impl FlightServiceClient { - #[doc = r" Attempt to create a new client by connecting to a given endpoint."] - pub async fn connect(dst: D) -> Result - where - D: std::convert::TryInto, - D::Error: Into, - { - let conn = tonic::transport::Endpoint::new(dst)?.connect().await?; - Ok(Self::new(conn)) - } - } - impl FlightServiceClient - where - T: tonic::client::GrpcService, - T::ResponseBody: Body + Send + Sync + 'static, - T::Error: Into, - ::Error: Into + Send, - { - pub fn new(inner: T) -> Self { - let inner = tonic::client::Grpc::new(inner); - Self { inner } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> FlightServiceClient> - where - F: tonic::service::Interceptor, - T: tonic::codegen::Service< - http::Request, - Response = http::Response< - >::ResponseBody, - >, - >, - >>::Error: - Into + Send + Sync, - { - FlightServiceClient::new(InterceptedService::new(inner, interceptor)) - } - #[doc = r" Compress requests with `gzip`."] - #[doc = r""] - #[doc = r" This requires the server to support it otherwise it might respond with an"] - #[doc = r" error."] - pub fn send_gzip(mut self) -> Self { - self.inner = self.inner.send_gzip(); - self - } - #[doc = r" Enable decompressing responses with `gzip`."] - pub fn accept_gzip(mut self) -> Self { - self.inner = self.inner.accept_gzip(); - self - } - #[doc = ""] - #[doc = " Handshake between client and server. Depending on the server, the"] - #[doc = " handshake may be required to determine the token that should be used for"] - #[doc = " future operations. Both request and response are streams to allow multiple"] - #[doc = " round-trips depending on auth mechanism."] - pub async fn handshake( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/Handshake", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Get a list of available streams given a particular criteria. Most flight"] - #[doc = " services will expose one or more streams that are readily available for"] - #[doc = " retrieval. This api allows listing the streams available for"] - #[doc = " consumption. A user can also provide a criteria. The criteria can limit"] - #[doc = " the subset of streams that can be listed via this interface. Each flight"] - #[doc = " service allows its own definition of how to consume criteria."] - pub async fn list_flights( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/ListFlights", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " For a given FlightDescriptor, get information about how the flight can be"] - #[doc = " consumed. This is a useful interface if the consumer of the interface"] - #[doc = " already can identify the specific flight to consume. This interface can"] - #[doc = " also allow a consumer to generate a flight stream through a specified"] - #[doc = " descriptor. For example, a flight descriptor might be something that"] - #[doc = " includes a SQL statement or a Pickled Python operation that will be"] - #[doc = " executed. In those cases, the descriptor will not be previously available"] - #[doc = " within the list of available streams provided by ListFlights but will be"] - #[doc = " available for consumption for the duration defined by the specific flight"] - #[doc = " service."] - pub async fn get_flight_info( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/GetFlightInfo", - ); - self.inner.unary(request.into_request(), path, codec).await - } - #[doc = ""] - #[doc = " For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema"] - #[doc = " This is used when a consumer needs the Schema of flight stream. Similar to"] - #[doc = " GetFlightInfo this interface may generate a new flight that was not previously"] - #[doc = " available in ListFlights."] - pub async fn get_schema( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result, tonic::Status> { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/GetSchema", - ); - self.inner.unary(request.into_request(), path, codec).await - } - #[doc = ""] - #[doc = " Retrieve a single stream associated with a particular descriptor"] - #[doc = " associated with the referenced ticket. A Flight can be composed of one or"] - #[doc = " more streams where each stream can be retrieved using a separate opaque"] - #[doc = " ticket that the flight service uses for managing a collection of streams."] - pub async fn do_get( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoGet", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Push a stream to the flight service associated with a particular"] - #[doc = " flight stream. This allows a client of a flight service to upload a stream"] - #[doc = " of data. Depending on the particular flight service, a client consumer"] - #[doc = " could be allowed to upload a single stream per descriptor or an unlimited"] - #[doc = " number. In the latter, the service might implement a 'seal' action that"] - #[doc = " can be applied to a descriptor once all streams are uploaded."] - pub async fn do_put( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoPut", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Open a bidirectional data channel for a given descriptor. This"] - #[doc = " allows clients to send and receive arbitrary Arrow data and"] - #[doc = " application-specific metadata in a single logical stream. In"] - #[doc = " contrast to DoGet/DoPut, this is more suited for clients"] - #[doc = " offloading computation (rather than storage) to a Flight service."] - pub async fn do_exchange( - &mut self, - request: impl tonic::IntoStreamingRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoExchange", - ); - self.inner - .streaming(request.into_streaming_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " Flight services can support an arbitrary number of simple actions in"] - #[doc = " addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut"] - #[doc = " operations that are potentially available. DoAction allows a flight client"] - #[doc = " to do a specific action against a flight service. An action includes"] - #[doc = " opaque request and response objects that are specific to the type action"] - #[doc = " being undertaken."] - pub async fn do_action( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result>, tonic::Status> - { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/DoAction", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - #[doc = ""] - #[doc = " A flight service exposes all of the available action types that it has"] - #[doc = " along with descriptions. This allows different flight consumers to"] - #[doc = " understand the capabilities of the flight service."] - pub async fn list_actions( - &mut self, - request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { - self.inner.ready().await.map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/arrow.flight.protocol.FlightService/ListActions", - ); - self.inner - .server_streaming(request.into_request(), path, codec) - .await - } - } -} -#[doc = r" Generated server implementations."] -pub mod flight_service_server { - #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; - #[doc = "Generated trait containing gRPC methods that should be implemented for use with FlightServiceServer."] - #[async_trait] - pub trait FlightService: Send + Sync + 'static { - #[doc = "Server streaming response type for the Handshake method."] - type HandshakeStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Handshake between client and server. Depending on the server, the"] - #[doc = " handshake may be required to determine the token that should be used for"] - #[doc = " future operations. Both request and response are streams to allow multiple"] - #[doc = " round-trips depending on auth mechanism."] - async fn handshake( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListFlights method."] - type ListFlightsStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Get a list of available streams given a particular criteria. Most flight"] - #[doc = " services will expose one or more streams that are readily available for"] - #[doc = " retrieval. This api allows listing the streams available for"] - #[doc = " consumption. A user can also provide a criteria. The criteria can limit"] - #[doc = " the subset of streams that can be listed via this interface. Each flight"] - #[doc = " service allows its own definition of how to consume criteria."] - async fn list_flights( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = ""] - #[doc = " For a given FlightDescriptor, get information about how the flight can be"] - #[doc = " consumed. This is a useful interface if the consumer of the interface"] - #[doc = " already can identify the specific flight to consume. This interface can"] - #[doc = " also allow a consumer to generate a flight stream through a specified"] - #[doc = " descriptor. For example, a flight descriptor might be something that"] - #[doc = " includes a SQL statement or a Pickled Python operation that will be"] - #[doc = " executed. In those cases, the descriptor will not be previously available"] - #[doc = " within the list of available streams provided by ListFlights but will be"] - #[doc = " available for consumption for the duration defined by the specific flight"] - #[doc = " service."] - async fn get_flight_info( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = ""] - #[doc = " For a given FlightDescriptor, get the Schema as described in Schema.fbs::Schema"] - #[doc = " This is used when a consumer needs the Schema of flight stream. Similar to"] - #[doc = " GetFlightInfo this interface may generate a new flight that was not previously"] - #[doc = " available in ListFlights."] - async fn get_schema( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoGet method."] - type DoGetStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Retrieve a single stream associated with a particular descriptor"] - #[doc = " associated with the referenced ticket. A Flight can be composed of one or"] - #[doc = " more streams where each stream can be retrieved using a separate opaque"] - #[doc = " ticket that the flight service uses for managing a collection of streams."] - async fn do_get( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoPut method."] - type DoPutStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Push a stream to the flight service associated with a particular"] - #[doc = " flight stream. This allows a client of a flight service to upload a stream"] - #[doc = " of data. Depending on the particular flight service, a client consumer"] - #[doc = " could be allowed to upload a single stream per descriptor or an unlimited"] - #[doc = " number. In the latter, the service might implement a 'seal' action that"] - #[doc = " can be applied to a descriptor once all streams are uploaded."] - async fn do_put( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoExchange method."] - type DoExchangeStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Open a bidirectional data channel for a given descriptor. This"] - #[doc = " allows clients to send and receive arbitrary Arrow data and"] - #[doc = " application-specific metadata in a single logical stream. In"] - #[doc = " contrast to DoGet/DoPut, this is more suited for clients"] - #[doc = " offloading computation (rather than storage) to a Flight service."] - async fn do_exchange( - &self, - request: tonic::Request>, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the DoAction method."] - type DoActionStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " Flight services can support an arbitrary number of simple actions in"] - #[doc = " addition to the possible ListFlights, GetFlightInfo, DoGet, DoPut"] - #[doc = " operations that are potentially available. DoAction allows a flight client"] - #[doc = " to do a specific action against a flight service. An action includes"] - #[doc = " opaque request and response objects that are specific to the type action"] - #[doc = " being undertaken."] - async fn do_action( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - #[doc = "Server streaming response type for the ListActions method."] - type ListActionsStream: futures_core::Stream> - + Send - + Sync - + 'static; - #[doc = ""] - #[doc = " A flight service exposes all of the available action types that it has"] - #[doc = " along with descriptions. This allows different flight consumers to"] - #[doc = " understand the capabilities of the flight service."] - async fn list_actions( - &self, - request: tonic::Request, - ) -> Result, tonic::Status>; - } - #[doc = ""] - #[doc = " A flight service is an endpoint for retrieving or storing Arrow data. A"] - #[doc = " flight service can expose one or more predefined endpoints that can be"] - #[doc = " accessed using the Arrow Flight Protocol. Additionally, a flight service"] - #[doc = " can expose a set of actions that are available."] - #[derive(Debug)] - pub struct FlightServiceServer { - inner: _Inner, - accept_compression_encodings: (), - send_compression_encodings: (), - } - struct _Inner(Arc); - impl FlightServiceServer { - pub fn new(inner: T) -> Self { - let inner = Arc::new(inner); - let inner = _Inner(inner); - Self { - inner, - accept_compression_encodings: Default::default(), - send_compression_encodings: Default::default(), - } - } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService - where - F: tonic::service::Interceptor, - { - InterceptedService::new(Self::new(inner), interceptor) - } - } - impl tonic::codegen::Service> for FlightServiceServer - where - T: FlightService, - B: Body + Send + Sync + 'static, - B::Error: Into + Send + 'static, - { - type Response = http::Response; - type Error = Never; - type Future = BoxFuture; - fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - fn call(&mut self, req: http::Request) -> Self::Future { - let inner = self.inner.clone(); - match req.uri().path() { - "/arrow.flight.protocol.FlightService/Handshake" => { - #[allow(non_camel_case_types)] - struct HandshakeSvc(pub Arc); - impl - tonic::server::StreamingService - for HandshakeSvc - { - type Response = super::HandshakeResponse; - type ResponseStream = T::HandshakeStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request< - tonic::Streaming, - >, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).handshake(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = HandshakeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/ListFlights" => { - #[allow(non_camel_case_types)] - struct ListFlightsSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for ListFlightsSvc - { - type Response = super::FlightInfo; - type ResponseStream = T::ListFlightsStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).list_flights(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ListFlightsSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/GetFlightInfo" => { - #[allow(non_camel_case_types)] - struct GetFlightInfoSvc(pub Arc); - impl - tonic::server::UnaryService - for GetFlightInfoSvc - { - type Response = super::FlightInfo; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = - async move { (*inner).get_flight_info(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetFlightInfoSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/GetSchema" => { - #[allow(non_camel_case_types)] - struct GetSchemaSvc(pub Arc); - impl - tonic::server::UnaryService - for GetSchemaSvc - { - type Response = super::SchemaResult; - type Future = - BoxFuture, tonic::Status>; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).get_schema(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = GetSchemaSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.unary(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoGet" => { - #[allow(non_camel_case_types)] - struct DoGetSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for DoGetSvc - { - type Response = super::FlightData; - type ResponseStream = T::DoGetStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_get(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoGetSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoPut" => { - #[allow(non_camel_case_types)] - struct DoPutSvc(pub Arc); - impl - tonic::server::StreamingService - for DoPutSvc - { - type Response = super::PutResult; - type ResponseStream = T::DoPutStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request>, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_put(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoPutSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoExchange" => { - #[allow(non_camel_case_types)] - struct DoExchangeSvc(pub Arc); - impl - tonic::server::StreamingService - for DoExchangeSvc - { - type Response = super::FlightData; - type ResponseStream = T::DoExchangeStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request>, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_exchange(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoExchangeSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/DoAction" => { - #[allow(non_camel_case_types)] - struct DoActionSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for DoActionSvc - { - type Response = super::Result; - type ResponseStream = T::DoActionStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).do_action(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = DoActionSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - "/arrow.flight.protocol.FlightService/ListActions" => { - #[allow(non_camel_case_types)] - struct ListActionsSvc(pub Arc); - impl - tonic::server::ServerStreamingService - for ListActionsSvc - { - type Response = super::ActionType; - type ResponseStream = T::ListActionsStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; - fn call( - &mut self, - request: tonic::Request, - ) -> Self::Future { - let inner = self.0.clone(); - let fut = async move { (*inner).list_actions(request).await }; - Box::pin(fut) - } - } - let accept_compression_encodings = self.accept_compression_encodings; - let send_compression_encodings = self.send_compression_encodings; - let inner = self.inner.clone(); - let fut = async move { - let inner = inner.0; - let method = ListActionsSvc(inner); - let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); - let res = grpc.server_streaming(method, req).await; - Ok(res) - }; - Box::pin(fut) - } - _ => Box::pin(async move { - Ok(http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap()) - }), - } - } - } - impl Clone for FlightServiceServer { - fn clone(&self) -> Self { - let inner = self.inner.clone(); - Self { - inner, - accept_compression_encodings: self.accept_compression_encodings, - send_compression_encodings: self.send_compression_encodings, - } - } - } - impl Clone for _Inner { - fn clone(&self) -> Self { - Self(self.0.clone()) - } - } - impl std::fmt::Debug for _Inner { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{:?}", self.0) - } - } - impl tonic::transport::NamedService for FlightServiceServer { - const NAME: &'static str = "arrow.flight.protocol.FlightService"; - } -} \ No newline at end of file diff --git a/arrow-flight/src/lib.rs b/arrow-flight/src/lib.rs deleted file mode 100644 index 6af2e748678..00000000000 --- a/arrow-flight/src/lib.rs +++ /dev/null @@ -1,20 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -include!("arrow.flight.protocol.rs"); - -pub mod utils; diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index e07ae2c141f..9c2782e760c 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -18,10 +18,8 @@ [package] name = "arrow-integration-testing" description = "Binaries used in the Arrow integration tests" -version = "4.0.0-SNAPSHOT" -homepage = "https://github.com/apache/arrow" -repository = "https://github.com/apache/arrow" -authors = ["Apache Arrow "] +version = "0.1.0" +authors = ["Jorge C Leitao", "Apache Arrow "] license = "Apache-2.0" edition = "2018" publish = false @@ -30,8 +28,8 @@ publish = false logging = ["tracing-subscriber"] [dependencies] -arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_json_integration"], default-features = false } -arrow-flight = { path = "../arrow-flight" } +arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } +arrow-format = { version = "*", features = ["ipc", "flight-service"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs index 5e8cd467198..1b07e6358d5 100644 --- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs @@ -17,8 +17,9 @@ use crate::{AUTH_PASSWORD, AUTH_USERNAME}; -use arrow_flight::{ - flight_service_client::FlightServiceClient, BasicAuth, HandshakeRequest, +use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth}; +use arrow_format::flight::service::{ + flight_service_client::FlightServiceClient, }; use futures::{stream, StreamExt}; use prost::Message; @@ -33,7 +34,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result { let url = format!("http://{}:{}", host, port); let mut client = FlightServiceClient::connect(url).await?; - let action = arrow_flight::Action::default(); + let action = Action::default(); let resp = client.do_action(Request::new(action.clone())).await; // This client is unauthenticated and should fail. diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 820c82114b1..98fe0c2192b 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -17,17 +17,14 @@ use crate::{read_json_file, ArrowFile}; -use arrow2::{ - array::*, - datatypes::*, - io::ipc, - io::ipc::{gen::Message::MessageHeader, read, write}, - record_batch::RecordBatch, -}; -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - utils::flight_data_to_arrow_batch, FlightData, FlightDescriptor, Location, Ticket, +use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch}; +use arrow_format::ipc; +use arrow_format::ipc::Message::MessageHeader; +use arrow_format::flight::data::{ + flight_descriptor::DescriptorType, + FlightData, FlightDescriptor, Location, Ticket, }; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -77,10 +74,9 @@ async fn upload_data( let (mut upload_tx, upload_rx) = mpsc::channel(10); let options = write::IpcWriteOptions::default(); - let mut schema_flight_data = - arrow_flight::utils::flight_data_from_arrow_schema(&schema, &options); - schema_flight_data.flight_descriptor = Some(descriptor.clone()); - upload_tx.send(schema_flight_data).await?; + let mut schema = flight::serialize_schema(&schema, &options); + schema.flight_descriptor = Some(descriptor.clone()); + upload_tx.send(schema).await?; let mut original_data_iter = original_data.iter().enumerate(); @@ -131,7 +127,7 @@ async fn send_batch( options: &write::IpcWriteOptions, ) -> Result { let (dictionary_flight_data, mut batch_flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(batch, options); + serialize_batch(batch, options); upload_tx .send_all(&mut stream::iter(dictionary_flight_data).map(Ok)) @@ -215,7 +211,7 @@ async fn consume_flight_location( assert_eq!(metadata, data.app_metadata); let actual_batch = - flight_data_to_arrow_batch(&data, schema.clone(), true, &dictionaries_by_field) + deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) .expect("Unable to convert flight data to Arrow batch"); assert_eq!(expected_batch.schema(), actual_batch.schema()); @@ -249,7 +245,7 @@ async fn receive_batch_flight_data( ) -> Option { let mut data = resp.next().await?.ok()?; let mut message = - ipc::root_as_message(&data.data_header[..]).expect("Error parsing first message"); + ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing first message"); while message.header_type() == MessageHeader::DictionaryBatch { let mut reader = std::io::Cursor::new(&data.data_body); @@ -266,7 +262,7 @@ async fn receive_batch_flight_data( .expect("Error reading dictionary"); data = resp.next().await?.ok()?; - message = ipc::root_as_message(&data.data_header[..]).expect("Error parsing message"); + message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs index cbca879dca5..0694fef55c7 100644 --- a/integration-testing/src/flight_client_scenarios/middleware.rs +++ b/integration-testing/src/flight_client_scenarios/middleware.rs @@ -15,10 +15,10 @@ // specific language governing permissions and limitations // under the License. -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_client::FlightServiceClient, - FlightDescriptor, +use arrow_format::flight::service::{ + flight_service_client::FlightServiceClient, }; +use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor}; use tonic::{Request, Status}; type Error = Box; diff --git a/integration-testing/src/flight_server_scenarios.rs b/integration-testing/src/flight_server_scenarios.rs index 9163b692086..a8aab14712e 100644 --- a/integration-testing/src/flight_server_scenarios.rs +++ b/integration-testing/src/flight_server_scenarios.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; -use arrow_flight::{FlightEndpoint, Location, Ticket}; +use arrow_format::flight::data::{FlightEndpoint, Location, Ticket}; use tokio::net::TcpListener; pub mod auth_basic_proto; diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index ea7ad3c3385..5223aaa297b 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -18,16 +18,14 @@ use std::pin::Pin; use std::sync::Arc; -use arrow_flight::{ - flight_service_server::FlightService, flight_service_server::FlightServiceServer, - Action, ActionType, BasicAuth, Criteria, Empty, FlightData, FlightDescriptor, - FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaResult, Ticket, -}; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::data::*; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; use tonic::{ metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming, }; + type TonicStream = Pin + Send + Sync + 'static>>; type Error = Box; @@ -102,7 +100,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -202,7 +200,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { let flight_context = self.check_auth(request.metadata()).await?; // Respond with the authenticated username. let buf = flight_context.peer_identity().as_bytes().to_vec(); - let result = arrow_flight::Result { body: buf }; + let result = arrow_format::flight::data::Result { body: buf }; let output = futures::stream::once(async { Ok(result) }); Ok(Response::new(Box::pin(output) as Self::DoActionStream)) } diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 16954647090..28ded196fc0 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -16,21 +16,25 @@ // under the License. use std::collections::HashMap; -use std::convert::TryFrom; use std::pin::Pin; use std::sync::Arc; +use std::convert::TryFrom; + +use arrow2::io::flight::{serialize_batch, serialize_schema}; +use arrow_format::flight::data::flight_descriptor::*; +use arrow_format::flight::service::flight_service_server::*; +use arrow_format::flight::data::*; +use arrow_format::ipc::Schema as ArrowSchema; +use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message}; use arrow2::{ array::Array, datatypes::*, - io::ipc, - io::ipc::gen::Message::{Message, MessageHeader}, - io::ipc::gen::Schema::MetadataVersion, record_batch::RecordBatch, + io::ipc, + io::flight::serialize_schema_to_info }; -use arrow_flight::flight_descriptor::*; -use arrow_flight::flight_service_server::*; -use arrow_flight::*; + use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -81,7 +85,7 @@ impl FlightService for FlightServiceImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -110,7 +114,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); let schema = std::iter::once({ - Ok(arrow_flight::utils::flight_data_from_arrow_schema( + Ok(serialize_schema( &flight.schema, &options, )) @@ -122,7 +126,7 @@ impl FlightService for FlightServiceImpl { .enumerate() .flat_map(|(counter, batch)| { let (dictionary_flight_data, mut batch_flight_data) = - arrow_flight::utils::flight_data_from_arrow_batch(batch, &options); + serialize_batch(batch, &options); // Only the record batch's FlightData gets app_metadata let metadata = counter.to_string().into_bytes(); @@ -177,7 +181,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); let schema = - arrow_flight::utils::ipc_message_from_arrow_schema(&flight.schema, &options) + serialize_schema_to_info(&flight.schema, &options) .expect( "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", @@ -296,7 +300,7 @@ async fn record_batch_from_message( None, true, dictionaries_by_field, - MetadataVersion::V5, + ArrowSchema::MetadataVersion::V5, &mut reader, 0, ); @@ -343,7 +347,7 @@ async fn save_uploaded_chunks( let mut dictionaries_by_field = vec![None; schema_ref.fields().len()]; while let Some(Ok(data)) = input_stream.next().await { - let message = ipc::root_as_message(&data.data_header[..]) + let message = root_as_message(&data.data_header[..]) .map_err(|e| Status::internal(format!("Could not parse message: {:?}", e)))?; match message.header_type() { diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs index 1416acc4088..ed686cfc7a4 100644 --- a/integration-testing/src/flight_server_scenarios/middleware.rs +++ b/integration-testing/src/flight_server_scenarios/middleware.rs @@ -17,12 +17,9 @@ use std::pin::Pin; -use arrow_flight::{ - flight_descriptor::DescriptorType, flight_service_server::FlightService, - flight_service_server::FlightServiceServer, Action, ActionType, Criteria, Empty, - FlightData, FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, - PutResult, SchemaResult, Ticket, -}; +use arrow_format::flight::data::flight_descriptor::DescriptorType; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::data::*; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -53,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; diff --git a/src/doc/lib.md b/src/doc/lib.md index d37adce76ca..270fe26810e 100644 --- a/src/doc/lib.md +++ b/src/doc/lib.md @@ -77,6 +77,7 @@ functionality, such as: * `io_ipc_compression`: to read and write compressed Arrow IPC (v2) * `io_csv` to read and write CSV * `io_json` to read and write JSON +* `io_flight` to read and write to Arrow's Flight protocol * `io_parquet` to read and write parquet * `io_parquet_compression` to read and write compressed parquet * `io_print` to write batches to formatted ASCII tables diff --git a/arrow-flight/src/utils.rs b/src/io/flight/mod.rs similarity index 51% rename from arrow-flight/src/utils.rs rename to src/io/flight/mod.rs index 25f01837ee3..ab4b0eb9283 100644 --- a/arrow-flight/src/utils.rs +++ b/src/io/flight/mod.rs @@ -1,48 +1,30 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -//! Utilities to assist with reading and writing Arrow data as Flight messages - -use std::{convert::TryFrom, sync::Arc}; - -use crate::{FlightData, SchemaResult}; - -use arrow2::{ +use std::convert::TryFrom; +use std::sync::Arc; + +use arrow_format::flight::data::{FlightData, SchemaResult}; +use arrow_format::ipc; + +use crate::{ array::*, datatypes::*, error::{ArrowError, Result}, - io::ipc, - io::ipc::gen::Schema::MetadataVersion, + io::ipc::fb_to_schema, io::ipc::read::read_record_batch, io::ipc::write, io::ipc::write::common::{encoded_batch, DictionaryTracker, EncodedData, IpcWriteOptions}, record_batch::RecordBatch, }; -/// Convert a `RecordBatch` to a vector of `FlightData` representing the bytes of the dictionaries -/// and a `FlightData` representing the bytes of the batch's values -pub fn flight_data_from_arrow_batch( +/// Serializes a [`RecordBatch`] to a vector of [`FlightData`] representing the serialized dictionaries +/// and a [`FlightData`] representing the batch. +pub fn serialize_batch( batch: &RecordBatch, options: &IpcWriteOptions, ) -> (Vec, FlightData) { let mut dictionary_tracker = DictionaryTracker::new(false); let (encoded_dictionaries, encoded_batch) = - encoded_batch(batch, &mut dictionary_tracker, &options) + encoded_batch(batch, &mut dictionary_tracker, options) .expect("DictionaryTracker configured above to not error on replacement"); let flight_dictionaries = encoded_dictionaries.into_iter().map(Into::into).collect(); @@ -61,40 +43,37 @@ impl From for FlightData { } } -/// Convert a `Schema` to `SchemaResult` by converting to an IPC message -pub fn flight_schema_from_arrow_schema(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult { +/// Serializes a [`Schema`] to [`SchemaResult`]. +pub fn serialize_schema_to_result(schema: &Schema, options: &IpcWriteOptions) -> SchemaResult { SchemaResult { - schema: flight_schema_as_flatbuffer(schema, options), + schema: schema_as_flatbuffer(schema, options), } } -/// Convert a `Schema` to `FlightData` by converting to an IPC message -pub fn flight_data_from_arrow_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData { - let data_header = flight_schema_as_flatbuffer(schema, options); +/// Serializes a [`Schema`] to [`FlightData`]. +pub fn serialize_schema(schema: &Schema, options: &IpcWriteOptions) -> FlightData { + let data_header = schema_as_flatbuffer(schema, options); FlightData { data_header, ..Default::default() } } -/// Convert a `Schema` to bytes in the format expected in `FlightInfo.schema` -pub fn ipc_message_from_arrow_schema( - arrow_schema: &Schema, - options: &IpcWriteOptions, -) -> Result> { - let encoded_data = flight_schema_as_encoded_data(arrow_schema, options); +/// Convert a [`Schema`] to bytes in the format expected in [`arrow_format::flight::FlightInfo`]. +pub fn serialize_schema_to_info(schema: &Schema, options: &IpcWriteOptions) -> Result> { + let encoded_data = schema_as_encoded_data(schema, options); let mut schema = vec![]; write::common::write_message(&mut schema, encoded_data, options)?; Ok(schema) } -fn flight_schema_as_flatbuffer(arrow_schema: &Schema, options: &IpcWriteOptions) -> Vec { - let encoded_data = flight_schema_as_encoded_data(arrow_schema, options); +fn schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> Vec { + let encoded_data = schema_as_encoded_data(schema, options); encoded_data.ipc_message } -fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { +fn schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData { EncodedData { ipc_message: write::schema_to_bytes(arrow_schema, *options.metadata_version()), arrow_data: vec![], @@ -103,8 +82,8 @@ fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOption /// Deserialize an IPC message into a schema fn schema_from_bytes(bytes: &[u8]) -> Result { - if let Ok(ipc) = ipc::root_as_message(bytes) { - if let Some((schema, _)) = ipc.header_as_schema().map(ipc::fb_to_schema) { + if let Ok(ipc) = ipc::Message::root_as_message(bytes) { + if let Some((schema, _)) = ipc.header_as_schema().map(fb_to_schema) { Ok(schema) } else { Err(ArrowError::Ipc("Unable to get head as schema".to_string())) @@ -114,9 +93,6 @@ fn schema_from_bytes(bytes: &[u8]) -> Result { } } -/// Try convert `FlightData` into an Arrow Schema -/// -/// Returns an error if the `FlightData` header is not a valid IPC schema impl TryFrom<&FlightData> for Schema { type Error = ArrowError; fn try_from(data: &FlightData) -> Result { @@ -129,9 +105,6 @@ impl TryFrom<&FlightData> for Schema { } } -/// Try convert `SchemaResult` into an Arrow Schema -/// -/// Returns an error if the `FlightData` header is not a valid IPC schema impl TryFrom<&SchemaResult> for Schema { type Error = ArrowError; fn try_from(data: &SchemaResult) -> Result { @@ -144,15 +117,15 @@ impl TryFrom<&SchemaResult> for Schema { } } -/// Convert a FlightData message to a RecordBatch -pub fn flight_data_to_arrow_batch( +/// Deserializes [`FlightData`] to a [`RecordBatch`]. +pub fn deserialize_batch( data: &FlightData, schema: Arc, is_little_endian: bool, dictionaries_by_field: &[Option>], ) -> Result { // check that the data_header is a record batch message - let message = ipc::root_as_message(&data.data_header[..]) + let message = ipc::Message::root_as_message(&data.data_header[..]) .map_err(|err| ArrowError::Ipc(format!("Unable to get root as message: {:?}", err)))?; let mut reader = std::io::Cursor::new(&data.data_body); @@ -168,8 +141,8 @@ pub fn flight_data_to_arrow_batch( schema.clone(), None, is_little_endian, - &dictionaries_by_field, - MetadataVersion::V5, + dictionaries_by_field, + ipc::Schema::MetadataVersion::V5, &mut reader, 0, ) diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs index d6f3edfc1b9..b69cc83bad6 100644 --- a/src/io/ipc/convert.rs +++ b/src/io/ipc/convert.rs @@ -17,22 +17,20 @@ //! Utilities for converting between IPC types and native Arrow types -use crate::datatypes::{ - get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit, +use arrow_format::ipc::flatbuffers::{ + FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, }; -use crate::io::ipc::convert::ipc::UnionMode; -use crate::io::ipc::endianess::is_native_little_endian; - +use std::collections::{BTreeMap, HashMap}; mod ipc { - pub use super::super::gen::File::*; - pub use super::super::gen::Message::*; - pub use super::super::gen::Schema::*; + pub use arrow_format::ipc::File::*; + pub use arrow_format::ipc::Message::*; + pub use arrow_format::ipc::Schema::*; } -use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset}; -use std::collections::{BTreeMap, HashMap}; - -use DataType::*; +use crate::datatypes::{ + get_extension, DataType, Extension, Field, IntervalUnit, Metadata, Schema, TimeUnit, +}; +use crate::io::ipc::endianess::is_native_little_endian; pub fn schema_to_fb_offset<'a>( fbb: &mut FlatBufferBuilder<'a>, @@ -294,7 +292,7 @@ fn get_data_type(field: ipc::Field, extension: Extension, may_be_dictionary: boo ipc::Type::Union => { let type_ = field.type_as_union().unwrap(); - let is_sparse = type_.mode() == UnionMode::Sparse; + let is_sparse = type_.mode() == ipc::UnionMode::Sparse; let ids = type_.typeIds().map(|x| x.iter().collect()); @@ -378,7 +376,7 @@ pub(crate) fn build_field<'a>( let fb_field_name = fbb.create_string(field.name().as_str()); let field_type = get_fb_field_type(field.data_type(), field.is_nullable(), fbb); - let fb_dictionary = if let Dictionary(index_type, inner) = field.data_type() { + let fb_dictionary = if let DataType::Dictionary(index_type, inner) = field.data_type() { if let DataType::Extension(name, _, metadata) = inner.as_ref() { write_extension(fbb, name, metadata, &mut kv_vec); } @@ -428,6 +426,7 @@ pub(crate) fn build_field<'a>( } fn type_to_field_type(data_type: &DataType) -> ipc::Type { + use DataType::*; match data_type { Null => ipc::Type::Null, Boolean => ipc::Type::Bool, @@ -461,6 +460,7 @@ pub(crate) fn get_fb_field_type<'a>( is_nullable: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> FbFieldType<'a> { + use DataType::*; let type_type = type_to_field_type(data_type); // some IPC implementations expect an empty list for child data, instead of a null value. @@ -711,9 +711,9 @@ pub(crate) fn get_fb_field_type<'a>( let mut builder = ipc::UnionBuilder::new(fbb); builder.add_mode(if *is_sparse { - UnionMode::Sparse + ipc::UnionMode::Sparse } else { - UnionMode::Dense + ipc::UnionMode::Dense }); if let Some(ids) = ids { @@ -745,6 +745,7 @@ pub(crate) fn get_fb_dictionary<'a>( dict_is_ordered: bool, fbb: &mut FlatBufferBuilder<'a>, ) -> WIPOffset> { + use DataType::*; // We assume that the dictionary index type (as an integer) has already been // validated elsewhere, and can safely assume we are dealing with integers let mut index_builder = ipc::IntBuilder::new(fbb); diff --git a/src/io/ipc/gen/File.rs b/src/io/ipc/gen/File.rs deleted file mode 100644 index a5a1512ce95..00000000000 --- a/src/io/ipc/gen/File.rs +++ /dev/null @@ -1,471 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#![allow(dead_code)] -#![allow(unused_imports)] - -use super::Schema::*; -use flatbuffers::EndianScalar; -use std::{cmp::Ordering, mem}; -// automatically generated by the FlatBuffers compiler, do not modify - -// struct Block, aligned to 8 -#[repr(transparent)] -#[derive(Clone, Copy, PartialEq)] -pub struct Block(pub [u8; 24]); -impl std::fmt::Debug for Block { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("Block") - .field("offset", &self.offset()) - .field("metaDataLength", &self.metaDataLength()) - .field("bodyLength", &self.bodyLength()) - .finish() - } -} - -impl flatbuffers::SimpleToVerifyInSlice for Block {} -impl flatbuffers::SafeSliceAccess for Block {} -impl<'a> flatbuffers::Follow<'a> for Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - <&'a Block>::follow(buf, loc) - } -} -impl<'a> flatbuffers::Follow<'a> for &'a Block { - type Inner = &'a Block; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - flatbuffers::follow_cast_ref::(buf, loc) - } -} -impl<'b> flatbuffers::Push for Block { - type Output = Block; - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} -impl<'b> flatbuffers::Push for &'b Block { - type Output = Block; - - #[inline] - fn push(&self, dst: &mut [u8], _rest: &[u8]) { - let src = unsafe { - ::std::slice::from_raw_parts(*self as *const Block as *const u8, Self::size()) - }; - dst.copy_from_slice(src); - } -} - -impl<'a> flatbuffers::Verifiable for Block { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.in_buffer::(pos) - } -} -impl Block { - #[allow(clippy::too_many_arguments)] - pub fn new(offset: i64, metaDataLength: i32, bodyLength: i64) -> Self { - let mut s = Self([0; 24]); - s.set_offset(offset); - s.set_metaDataLength(metaDataLength); - s.set_bodyLength(bodyLength); - s - } - - /// Index to the start of the RecordBlock (note this is past the Message header) - pub fn offset(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[0..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_offset(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[0..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the metadata - pub fn metaDataLength(&self) -> i32 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[8..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_metaDataLength(&mut self, x: i32) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i32 as *const u8, - self.0[8..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } - - /// Length of the data (this is aligned so there can be a gap between this and - /// the metadata). - pub fn bodyLength(&self) -> i64 { - let mut mem = core::mem::MaybeUninit::::uninit(); - unsafe { - core::ptr::copy_nonoverlapping( - self.0[16..].as_ptr(), - mem.as_mut_ptr() as *mut u8, - core::mem::size_of::(), - ); - mem.assume_init() - } - .from_little_endian() - } - - pub fn set_bodyLength(&mut self, x: i64) { - let x_le = x.to_little_endian(); - unsafe { - core::ptr::copy_nonoverlapping( - &x_le as *const i64 as *const u8, - self.0[16..].as_mut_ptr(), - core::mem::size_of::(), - ); - } - } -} - -pub enum FooterOffset {} -#[derive(Copy, Clone, PartialEq)] - -/// ---------------------------------------------------------------------- -/// Arrow File metadata -/// -pub struct Footer<'a> { - pub _tab: flatbuffers::Table<'a>, -} - -impl<'a> flatbuffers::Follow<'a> for Footer<'a> { - type Inner = Footer<'a>; - #[inline] - fn follow(buf: &'a [u8], loc: usize) -> Self::Inner { - Self { - _tab: flatbuffers::Table { buf, loc }, - } - } -} - -impl<'a> Footer<'a> { - #[inline] - pub fn init_from_table(table: flatbuffers::Table<'a>) -> Self { - Footer { _tab: table } - } - #[allow(unused_mut)] - pub fn create<'bldr: 'args, 'args: 'mut_bldr, 'mut_bldr>( - _fbb: &'mut_bldr mut flatbuffers::FlatBufferBuilder<'bldr>, - args: &'args FooterArgs<'args>, - ) -> flatbuffers::WIPOffset> { - let mut builder = FooterBuilder::new(_fbb); - if let Some(x) = args.custom_metadata { - builder.add_custom_metadata(x); - } - if let Some(x) = args.recordBatches { - builder.add_recordBatches(x); - } - if let Some(x) = args.dictionaries { - builder.add_dictionaries(x); - } - if let Some(x) = args.schema { - builder.add_schema(x); - } - builder.add_version(args.version); - builder.finish() - } - - pub const VT_VERSION: flatbuffers::VOffsetT = 4; - pub const VT_SCHEMA: flatbuffers::VOffsetT = 6; - pub const VT_DICTIONARIES: flatbuffers::VOffsetT = 8; - pub const VT_RECORDBATCHES: flatbuffers::VOffsetT = 10; - pub const VT_CUSTOM_METADATA: flatbuffers::VOffsetT = 12; - - #[inline] - pub fn version(&self) -> MetadataVersion { - self._tab - .get::(Footer::VT_VERSION, Some(MetadataVersion::V1)) - .unwrap() - } - #[inline] - pub fn schema(&self) -> Option> { - self._tab - .get::>(Footer::VT_SCHEMA, None) - } - #[inline] - pub fn dictionaries(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_DICTIONARIES, - None, - ) - .map(|v| v.safe_slice()) - } - #[inline] - pub fn recordBatches(&self) -> Option<&'a [Block]> { - self._tab - .get::>>( - Footer::VT_RECORDBATCHES, - None, - ) - .map(|v| v.safe_slice()) - } - /// User-defined metadata - #[inline] - pub fn custom_metadata( - &self, - ) -> Option>>> { - self._tab.get::>, - >>(Footer::VT_CUSTOM_METADATA, None) - } -} - -impl flatbuffers::Verifiable for Footer<'_> { - #[inline] - fn run_verifier( - v: &mut flatbuffers::Verifier, - pos: usize, - ) -> Result<(), flatbuffers::InvalidFlatbuffer> { - use flatbuffers::Verifiable; - v.visit_table(pos)? - .visit_field::(&"version", Self::VT_VERSION, false)? - .visit_field::>(&"schema", Self::VT_SCHEMA, false)? - .visit_field::>>( - &"dictionaries", - Self::VT_DICTIONARIES, - false, - )? - .visit_field::>>( - &"recordBatches", - Self::VT_RECORDBATCHES, - false, - )? - .visit_field::>, - >>(&"custom_metadata", Self::VT_CUSTOM_METADATA, false)? - .finish(); - Ok(()) - } -} -pub struct FooterArgs<'a> { - pub version: MetadataVersion, - pub schema: Option>>, - pub dictionaries: Option>>, - pub recordBatches: Option>>, - pub custom_metadata: Option< - flatbuffers::WIPOffset>>>, - >, -} -impl<'a> Default for FooterArgs<'a> { - #[inline] - fn default() -> Self { - FooterArgs { - version: MetadataVersion::V1, - schema: None, - dictionaries: None, - recordBatches: None, - custom_metadata: None, - } - } -} -pub struct FooterBuilder<'a: 'b, 'b> { - fbb_: &'b mut flatbuffers::FlatBufferBuilder<'a>, - start_: flatbuffers::WIPOffset, -} -impl<'a: 'b, 'b> FooterBuilder<'a, 'b> { - #[inline] - pub fn add_version(&mut self, version: MetadataVersion) { - self.fbb_ - .push_slot::(Footer::VT_VERSION, version, MetadataVersion::V1); - } - #[inline] - pub fn add_schema(&mut self, schema: flatbuffers::WIPOffset>) { - self.fbb_ - .push_slot_always::>(Footer::VT_SCHEMA, schema); - } - #[inline] - pub fn add_dictionaries( - &mut self, - dictionaries: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_DICTIONARIES, dictionaries); - } - #[inline] - pub fn add_recordBatches( - &mut self, - recordBatches: flatbuffers::WIPOffset>, - ) { - self.fbb_ - .push_slot_always::>(Footer::VT_RECORDBATCHES, recordBatches); - } - #[inline] - pub fn add_custom_metadata( - &mut self, - custom_metadata: flatbuffers::WIPOffset< - flatbuffers::Vector<'b, flatbuffers::ForwardsUOffset>>, - >, - ) { - self.fbb_.push_slot_always::>( - Footer::VT_CUSTOM_METADATA, - custom_metadata, - ); - } - #[inline] - pub fn new(_fbb: &'b mut flatbuffers::FlatBufferBuilder<'a>) -> FooterBuilder<'a, 'b> { - let start = _fbb.start_table(); - FooterBuilder { - fbb_: _fbb, - start_: start, - } - } - #[inline] - pub fn finish(self) -> flatbuffers::WIPOffset> { - let o = self.fbb_.end_table(self.start_); - flatbuffers::WIPOffset::new(o.value()) - } -} - -impl std::fmt::Debug for Footer<'_> { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - let mut ds = f.debug_struct("Footer"); - ds.field("version", &self.version()); - ds.field("schema", &self.schema()); - ds.field("dictionaries", &self.dictionaries()); - ds.field("recordBatches", &self.recordBatches()); - ds.field("custom_metadata", &self.custom_metadata()); - ds.finish() - } -} -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::root_unchecked::>(buf) } -} - -#[inline] -#[deprecated(since = "2.0.0", note = "Deprecated in favor of `root_as...` methods.")] -pub fn get_size_prefixed_root_as_footer<'a>(buf: &'a [u8]) -> Footer<'a> { - unsafe { flatbuffers::size_prefixed_root_unchecked::>(buf) } -} - -#[inline] -/// Verifies that a buffer of bytes contains a `Footer` -/// and returns it. -/// Note that verification is still experimental and may not -/// catch every error, or be maximally performant. For the -/// previous, unchecked, behavior use -/// `root_as_footer_unchecked`. -pub fn root_as_footer(buf: &[u8]) -> Result { - flatbuffers::root::