From b9fca9433a95f677e93f0acaea2893faf2007520 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Tue, 26 Oct 2021 10:02:51 +0800 Subject: [PATCH 1/4] Try to bump deps Signed-off-by: Chojan Shang --- Cargo.toml | 9 +++++---- integration-testing/Cargo.toml | 8 ++++---- src/io/flight/mod.rs | 2 +- src/io/ipc/convert.rs | 4 +--- src/io/ipc/read/reader.rs | 2 +- src/io/ipc/write/common.rs | 2 +- src/io/ipc/write/schema.rs | 2 +- src/io/ipc/write/writer.rs | 2 +- 8 files changed, 15 insertions(+), 16 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 73642d0f39c..1f24ce970a4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ bench = false [dependencies] num-traits = "0.2" chrono = { version = "0.4", default_features = false, features = ["std"] } -chrono-tz = { version = "0.5", optional = true } +chrono-tz = { version = "0.6", optional = true } # To efficiently cast numbers to strings lexical-core = { version = "0.8", optional = true } # We need to Hash values before sending them to an hasher. This @@ -39,7 +39,8 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -arrow-format = { version = "*", optional = true, features = ["ipc"] } +arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", optional = true, features = ["ipc"] } +flatbuffers = { version = "=2.0.0", optional = true } hex = { version = "^0.4", optional = true } @@ -109,9 +110,9 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_read = ["csv", "lexical-core"] io_csv_write = ["csv", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "indexmap"] -io_ipc = ["arrow-format"] +io_ipc = ["arrow-format", "flatbuffers"] io_ipc_compression = ["lz4", "zstd"] -io_flight = ["io_ipc", "arrow-format/flight-data"] +io_flight = ["io_ipc", "arrow-format/flight"] io_parquet_compression = [ "parquet2/zstd", "parquet2/snappy", diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index 9c2782e760c..b8351927d3e 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,15 +29,15 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { version = "*", features = ["ipc", "flight-service"] } +arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", features = ["ipc", "flight"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" hex = "0.4" -prost = "0.8" +prost = "0.9" serde = { version = "1.0", features = ["rc"] } serde_derive = "1.0" serde_json = { version = "1.0", features = ["preserve_order"] } tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] } -tonic = "0.5.2" -tracing-subscriber = { version = "0.2.15", optional = true } +tonic = "0.6.0" +tracing-subscriber = { version = "0.3.1", optional = true } diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index ab4b0eb9283..90c9fd499fa 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -1,7 +1,7 @@ use std::convert::TryFrom; use std::sync::Arc; -use arrow_format::flight::data::{FlightData, SchemaResult}; +use arrow_format::flight::{FlightData, SchemaResult}; use arrow_format::ipc; use crate::{ diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs index b69cc83bad6..d4b520854ce 100644 --- a/src/io/ipc/convert.rs +++ b/src/io/ipc/convert.rs @@ -17,9 +17,7 @@ //! Utilities for converting between IPC types and native Arrow types -use arrow_format::ipc::flatbuffers::{ - FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, -}; +use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset}; use std::collections::{BTreeMap, HashMap}; mod ipc { pub use arrow_format::ipc::File::*; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index 6f2707a6412..c5fe14928fd 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -19,7 +19,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use arrow_format::ipc; -use arrow_format::ipc::flatbuffers::VerifierOptions; +use flatbuffers::VerifierOptions; use crate::array::*; use crate::datatypes::Schema; diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index ae5151e20cc..d3ccd3b6d5b 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -20,7 +20,7 @@ use std::io::Write; use std::{collections::HashMap, sync::Arc}; use arrow_format::ipc; -use arrow_format::ipc::flatbuffers::FlatBufferBuilder; +use flatbuffers::FlatBufferBuilder; use crate::array::Array; use crate::error::{ArrowError, Result}; diff --git a/src/io/ipc/write/schema.rs b/src/io/ipc/write/schema.rs index c56e6136fbe..4f2ada82627 100644 --- a/src/io/ipc/write/schema.rs +++ b/src/io/ipc/write/schema.rs @@ -1,5 +1,5 @@ use arrow_format::ipc; -use arrow_format::ipc::flatbuffers::FlatBufferBuilder; +use flatbuffers::FlatBufferBuilder; use crate::datatypes::*; diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 8c604e9cab8..9d69f0805c3 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -23,7 +23,7 @@ use std::io::Write; use arrow_format::ipc; -use arrow_format::ipc::flatbuffers::FlatBufferBuilder; +use flatbuffers::FlatBufferBuilder; use super::super::ARROW_MAGIC; use super::{ From 548387ead025d951d3e57ff525d4b7ad91ed8303 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Wed, 27 Oct 2021 22:04:57 +0800 Subject: [PATCH 2/4] Update integration-testing Signed-off-by: Chojan Shang --- Cargo.toml | 2 +- integration-testing/Cargo.toml | 2 +- .../src/bin/flight-test-integration-client.rs | 7 ++-- .../src/bin/flight-test-integration-server.rs | 4 +-- .../auth_basic_proto.rs | 12 ++----- .../integration_test.rs | 28 ++++++++------- .../src/flight_client_scenarios/middleware.rs | 6 ++-- .../src/flight_server_scenarios.rs | 2 +- .../auth_basic_proto.rs | 36 ++++++++----------- .../integration_test.rs | 32 ++++++----------- .../src/flight_server_scenarios/middleware.rs | 11 +++--- 11 files changed, 57 insertions(+), 85 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 1f24ce970a4..9df4de61945 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", optional = true, features = ["ipc"] } +arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", optional = true, features = ["ipc"] } flatbuffers = { version = "=2.0.0", optional = true } hex = { version = "^0.4", optional = true } diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index b8351927d3e..0c01dc776f8 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", features = ["ipc", "flight"] } +arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", features = ["ipc", "flight"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" diff --git a/integration-testing/src/bin/flight-test-integration-client.rs b/integration-testing/src/bin/flight-test-integration-client.rs index 1901553109f..b7bb5d1d79f 100644 --- a/integration-testing/src/bin/flight-test-integration-client.rs +++ b/integration-testing/src/bin/flight-test-integration-client.rs @@ -42,9 +42,7 @@ async fn main() -> Result { let port = matches.value_of("port").expect("Port is required"); match matches.value_of("scenario") { - Some("middleware") => { - flight_client_scenarios::middleware::run_scenario(host, port).await? - } + Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?, Some("auth:basic_proto") => { flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await? } @@ -53,8 +51,7 @@ async fn main() -> Result { let path = matches .value_of("path") .expect("Path is required if scenario is not specified"); - flight_client_scenarios::integration_test::run_scenario(host, port, path) - .await?; + flight_client_scenarios::integration_test::run_scenario(host, port, path).await?; } } diff --git a/integration-testing/src/bin/flight-test-integration-server.rs b/integration-testing/src/bin/flight-test-integration-server.rs index b1b280743c3..45a08080499 100644 --- a/integration-testing/src/bin/flight-test-integration-server.rs +++ b/integration-testing/src/bin/flight-test-integration-server.rs @@ -40,9 +40,7 @@ async fn main() -> Result { let port = matches.value_of("port").unwrap_or("0"); match matches.value_of("scenario") { - Some("middleware") => { - flight_server_scenarios::middleware::scenario_setup(port).await? - } + Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?, Some("auth:basic_proto") => { flight_server_scenarios::auth_basic_proto::scenario_setup(port).await? } diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs index 1b07e6358d5..45aa952f7e2 100644 --- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs @@ -17,10 +17,8 @@ use crate::{AUTH_PASSWORD, AUTH_USERNAME}; -use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth}; -use arrow_format::flight::service::{ - flight_service_client::FlightServiceClient, -}; +use arrow_format::flight::flight_service_client::FlightServiceClient; +use arrow_format::flight::{Action, BasicAuth, HandshakeRequest}; use futures::{stream, StreamExt}; use prost::Message; use tonic::{metadata::MetadataValue, Request, Status}; @@ -81,11 +79,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result { Ok(()) } -async fn authenticate( - client: &mut Client, - username: &str, - password: &str, -) -> Result { +async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result { let auth = BasicAuth { username: username.into(), password: password.into(), diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 98fe0c2192b..f237866b97a 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -17,14 +17,19 @@ use crate::{read_json_file, ArrowFile}; -use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch}; +use arrow2::{ + array::*, + datatypes::*, + io::flight::{self, deserialize_batch, serialize_batch}, + io::ipc::{read, write}, + record_batch::RecordBatch, +}; +use arrow_format::flight::flight_service_client::FlightServiceClient; +use arrow_format::flight::{ + flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, +}; use arrow_format::ipc; use arrow_format::ipc::Message::MessageHeader; -use arrow_format::flight::data::{ - flight_descriptor::DescriptorType, - FlightData, FlightDescriptor, Location, Ticket, -}; -use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -126,8 +131,7 @@ async fn send_batch( batch: &RecordBatch, options: &write::IpcWriteOptions, ) -> Result { - let (dictionary_flight_data, mut batch_flight_data) = - serialize_batch(batch, options); + let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options); upload_tx .send_all(&mut stream::iter(dictionary_flight_data).map(Ok)) @@ -210,9 +214,8 @@ async fn consume_flight_location( let metadata = counter.to_string().into_bytes(); assert_eq!(metadata, data.app_metadata); - let actual_batch = - deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) - .expect("Unable to convert flight data to Arrow batch"); + let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) + .expect("Unable to convert flight data to Arrow batch"); assert_eq!(expected_batch.schema(), actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); @@ -262,7 +265,8 @@ async fn receive_batch_flight_data( .expect("Error reading dictionary"); data = resp.next().await?.ok()?; - message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); + message = + ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs index 0694fef55c7..8a2ef6c05d6 100644 --- a/integration-testing/src/flight_client_scenarios/middleware.rs +++ b/integration-testing/src/flight_client_scenarios/middleware.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -use arrow_format::flight::service::{ - flight_service_client::FlightServiceClient, -}; -use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor}; +use arrow_format::flight::flight_service_client::FlightServiceClient; +use arrow_format::flight::{flight_descriptor::DescriptorType, FlightDescriptor}; use tonic::{Request, Status}; type Error = Box; diff --git a/integration-testing/src/flight_server_scenarios.rs b/integration-testing/src/flight_server_scenarios.rs index a8aab14712e..259b7b57533 100644 --- a/integration-testing/src/flight_server_scenarios.rs +++ b/integration-testing/src/flight_server_scenarios.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; -use arrow_format::flight::data::{FlightEndpoint, Location, Ticket}; +use arrow_format::flight::{FlightEndpoint, Location, Ticket}; use tokio::net::TcpListener; pub mod auth_basic_proto; diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index 5223aaa297b..bc7ff956796 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -18,13 +18,11 @@ use std::pin::Pin; use std::sync::Arc; -use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_format::flight::data::*; +use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::*; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; -use tonic::{ - metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming, -}; +use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming}; type TonicStream = Pin + Send + Sync + 'static>>; @@ -60,10 +58,7 @@ pub struct AuthBasicProtoScenarioImpl { } impl AuthBasicProtoScenarioImpl { - async fn check_auth( - &self, - metadata: &MetadataMap, - ) -> Result { + async fn check_auth(&self, metadata: &MetadataMap) -> Result { let token = metadata .get_bin("auth-token-bin") .and_then(|v| v.to_bytes().ok()) @@ -71,10 +66,7 @@ impl AuthBasicProtoScenarioImpl { self.is_valid(token).await } - async fn is_valid( - &self, - token: Option, - ) -> Result { + async fn is_valid(&self, token: Option) -> Result { match token { Some(t) if t == *self.username => Ok(GrpcServerCallContext { peer_identity: self.username.to_string(), @@ -100,7 +92,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -139,12 +131,10 @@ impl FlightService for AuthBasicProtoScenarioImpl { let req = req.expect("Error reading handshake request"); let HandshakeRequest { payload, .. } = req; - let auth = BasicAuth::decode(&*payload) - .expect("Error parsing handshake request"); + let auth = + BasicAuth::decode(&*payload).expect("Error parsing handshake request"); - let resp = if *auth.username == *username - && *auth.password == *password - { + let resp = if *auth.username == *username && *auth.password == *password { Ok(HandshakeResponse { payload: username.as_bytes().to_vec(), ..HandshakeResponse::default() @@ -189,7 +179,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } @@ -200,7 +191,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { let flight_context = self.check_auth(request.metadata()).await?; // Respond with the authenticated username. let buf = flight_context.peer_identity().as_bytes().to_vec(); - let result = arrow_format::flight::data::Result { body: buf }; + let result = arrow_format::flight::Result { body: buf }; let output = futures::stream::once(async { Ok(result) }); Ok(Response::new(Box::pin(output) as Self::DoActionStream)) } @@ -217,7 +208,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } } diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 28ded196fc0..6418ec73291 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -16,23 +16,20 @@ // under the License. use std::collections::HashMap; +use std::convert::TryFrom; use std::pin::Pin; use std::sync::Arc; -use std::convert::TryFrom; use arrow2::io::flight::{serialize_batch, serialize_schema}; -use arrow_format::flight::data::flight_descriptor::*; -use arrow_format::flight::service::flight_service_server::*; -use arrow_format::flight::data::*; +use arrow_format::flight::flight_descriptor::*; +use arrow_format::flight::flight_service_server::*; +use arrow_format::flight::*; +use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader}; use arrow_format::ipc::Schema as ArrowSchema; -use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message}; use arrow2::{ - array::Array, - datatypes::*, + array::Array, datatypes::*, io::flight::serialize_schema_to_info, io::ipc, record_batch::RecordBatch, - io::ipc, - io::flight::serialize_schema_to_info }; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; @@ -85,7 +82,7 @@ impl FlightService for FlightServiceImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -113,12 +110,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); - let schema = std::iter::once({ - Ok(serialize_schema( - &flight.schema, - &options, - )) - }); + let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options))); let batches = flight .chunks @@ -180,12 +172,10 @@ impl FlightService for FlightServiceImpl { let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum(); let options = ipc::write::IpcWriteOptions::default(); - let schema = - serialize_schema_to_info(&flight.schema, &options) - .expect( - "Could not generate schema bytes from schema stored by a DoPut; \ + let schema = serialize_schema_to_info(&flight.schema, &options).expect( + "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", - ); + ); let info = FlightInfo { schema, diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs index ed686cfc7a4..afc1e817d65 100644 --- a/integration-testing/src/flight_server_scenarios/middleware.rs +++ b/integration-testing/src/flight_server_scenarios/middleware.rs @@ -17,9 +17,9 @@ use std::pin::Pin; -use arrow_format::flight::data::flight_descriptor::DescriptorType; -use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_format::flight::data::*; +use arrow_format::flight::flight_descriptor::DescriptorType; +use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::*; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -50,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -90,8 +90,7 @@ impl FlightService for MiddlewareScenarioImpl { let descriptor = request.into_inner(); - if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" - { + if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" { // Return a fake location - the test doesn't read it let endpoint = super::endpoint("foo", "grpc+tcp://localhost:10010"); From b7e107f76360758602593ccb3432058b5999250a Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Wed, 27 Oct 2021 22:46:25 +0800 Subject: [PATCH 3/4] Bump toolchain Signed-off-by: Chojan Shang --- .github/workflows/test.yml | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index c588c236422..1e147fcc9f5 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -74,7 +74,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-07-09 + toolchain: nightly-2021-10-24 override: true - uses: Swatinem/rust-cache@v1 - name: Install Miri @@ -93,7 +93,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions-rs/toolchain@v1 with: - toolchain: nightly-2021-07-09 + toolchain: nightly-2021-10-24 override: true - uses: Swatinem/rust-cache@v1 - name: Install Miri @@ -112,6 +112,8 @@ jobs: - uses: actions/checkout@v2 with: submodules: true + - name: Install Rust + run: rustup update stable - name: Setup parquet files run: | apt update && apt install python3-pip python3-venv -y -q From 9e1f42fe7eaa09b6916f21d230cd266cac230f5a Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Thu, 28 Oct 2021 13:33:56 +0800 Subject: [PATCH 4/4] Bump arrow-format to v0.3.0 Signed-off-by: Chojan Shang --- Cargo.toml | 7 +++---- integration-testing/Cargo.toml | 2 +- .../src/flight_client_scenarios/auth_basic_proto.rs | 4 ++-- .../src/flight_client_scenarios/integration_test.rs | 4 ++-- .../src/flight_client_scenarios/middleware.rs | 4 ++-- integration-testing/src/flight_server_scenarios.rs | 2 +- .../src/flight_server_scenarios/auth_basic_proto.rs | 8 ++++---- .../src/flight_server_scenarios/integration_test.rs | 8 ++++---- .../src/flight_server_scenarios/middleware.rs | 8 ++++---- src/io/flight/mod.rs | 2 +- src/io/ipc/convert.rs | 4 +++- src/io/ipc/read/reader.rs | 2 +- src/io/ipc/write/common.rs | 2 +- src/io/ipc/write/schema.rs | 2 +- src/io/ipc/write/writer.rs | 2 +- 15 files changed, 31 insertions(+), 30 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 9df4de61945..b33954ec06f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,8 +39,7 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", optional = true, features = ["ipc"] } -flatbuffers = { version = "=2.0.0", optional = true } +arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] } hex = { version = "^0.4", optional = true } @@ -110,9 +109,9 @@ io_csv = ["io_csv_read", "io_csv_write"] io_csv_read = ["csv", "lexical-core"] io_csv_write = ["csv", "streaming-iterator", "lexical-core"] io_json = ["serde", "serde_json", "indexmap"] -io_ipc = ["arrow-format", "flatbuffers"] +io_ipc = ["arrow-format"] io_ipc_compression = ["lz4", "zstd"] -io_flight = ["io_ipc", "arrow-format/flight"] +io_flight = ["io_ipc", "arrow-format/flight-data"] io_parquet_compression = [ "parquet2/zstd", "parquet2/snappy", diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index 0c01dc776f8..a9487cb37bb 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", features = ["ipc", "flight"] } +arrow-format = { version = "0.3.0", features = ["full"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs index 45aa952f7e2..58258e2164c 100644 --- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs @@ -17,8 +17,8 @@ use crate::{AUTH_PASSWORD, AUTH_USERNAME}; -use arrow_format::flight::flight_service_client::FlightServiceClient; -use arrow_format::flight::{Action, BasicAuth, HandshakeRequest}; +use arrow_format::flight::data::{Action, BasicAuth, HandshakeRequest}; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{stream, StreamExt}; use prost::Message; use tonic::{metadata::MetadataValue, Request, Status}; diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index f237866b97a..7c67b34096e 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -24,10 +24,10 @@ use arrow2::{ io::ipc::{read, write}, record_batch::RecordBatch, }; -use arrow_format::flight::flight_service_client::FlightServiceClient; -use arrow_format::flight::{ +use arrow_format::flight::data::{ flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, }; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use arrow_format::ipc; use arrow_format::ipc::Message::MessageHeader; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs index 8a2ef6c05d6..f67580ada87 100644 --- a/integration-testing/src/flight_client_scenarios/middleware.rs +++ b/integration-testing/src/flight_client_scenarios/middleware.rs @@ -15,8 +15,8 @@ // specific language governing permissions and limitations // under the License. -use arrow_format::flight::flight_service_client::FlightServiceClient; -use arrow_format::flight::{flight_descriptor::DescriptorType, FlightDescriptor}; +use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor}; +use arrow_format::flight::service::flight_service_client::FlightServiceClient; use tonic::{Request, Status}; type Error = Box; diff --git a/integration-testing/src/flight_server_scenarios.rs b/integration-testing/src/flight_server_scenarios.rs index 259b7b57533..a8aab14712e 100644 --- a/integration-testing/src/flight_server_scenarios.rs +++ b/integration-testing/src/flight_server_scenarios.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; -use arrow_format::flight::{FlightEndpoint, Location, Ticket}; +use arrow_format::flight::data::{FlightEndpoint, Location, Ticket}; use tokio::net::TcpListener; pub mod auth_basic_proto; diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index bc7ff956796..baa181e4efc 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -18,8 +18,8 @@ use std::pin::Pin; use std::sync::Arc; -use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_format::flight::*; +use arrow_format::flight::data::*; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming}; @@ -92,7 +92,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -191,7 +191,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { let flight_context = self.check_auth(request.metadata()).await?; // Respond with the authenticated username. let buf = flight_context.peer_identity().as_bytes().to_vec(); - let result = arrow_format::flight::Result { body: buf }; + let result = arrow_format::flight::data::Result { body: buf }; let output = futures::stream::once(async { Ok(result) }); Ok(Response::new(Box::pin(output) as Self::DoActionStream)) } diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 6418ec73291..4d3349bff44 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -21,9 +21,9 @@ use std::pin::Pin; use std::sync::Arc; use arrow2::io::flight::{serialize_batch, serialize_schema}; -use arrow_format::flight::flight_descriptor::*; -use arrow_format::flight::flight_service_server::*; -use arrow_format::flight::*; +use arrow_format::flight::data::*; +use arrow_format::flight::data::flight_descriptor::*; +use arrow_format::flight::service::flight_service_server::*; use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader}; use arrow_format::ipc::Schema as ArrowSchema; @@ -82,7 +82,7 @@ impl FlightService for FlightServiceImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs index afc1e817d65..ceffe5a8c9b 100644 --- a/integration-testing/src/flight_server_scenarios/middleware.rs +++ b/integration-testing/src/flight_server_scenarios/middleware.rs @@ -17,9 +17,9 @@ use std::pin::Pin; -use arrow_format::flight::flight_descriptor::DescriptorType; -use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_format::flight::*; +use arrow_format::flight::data::*; +use arrow_format::flight::data::flight_descriptor::DescriptorType; +use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -50,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; diff --git a/src/io/flight/mod.rs b/src/io/flight/mod.rs index 90c9fd499fa..ab4b0eb9283 100644 --- a/src/io/flight/mod.rs +++ b/src/io/flight/mod.rs @@ -1,7 +1,7 @@ use std::convert::TryFrom; use std::sync::Arc; -use arrow_format::flight::{FlightData, SchemaResult}; +use arrow_format::flight::data::{FlightData, SchemaResult}; use arrow_format::ipc; use crate::{ diff --git a/src/io/ipc/convert.rs b/src/io/ipc/convert.rs index d4b520854ce..b69cc83bad6 100644 --- a/src/io/ipc/convert.rs +++ b/src/io/ipc/convert.rs @@ -17,7 +17,9 @@ //! Utilities for converting between IPC types and native Arrow types -use flatbuffers::{FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset}; +use arrow_format::ipc::flatbuffers::{ + FlatBufferBuilder, ForwardsUOffset, UnionWIPOffset, Vector, WIPOffset, +}; use std::collections::{BTreeMap, HashMap}; mod ipc { pub use arrow_format::ipc::File::*; diff --git a/src/io/ipc/read/reader.rs b/src/io/ipc/read/reader.rs index c5fe14928fd..6f2707a6412 100644 --- a/src/io/ipc/read/reader.rs +++ b/src/io/ipc/read/reader.rs @@ -19,7 +19,7 @@ use std::io::{Read, Seek, SeekFrom}; use std::sync::Arc; use arrow_format::ipc; -use flatbuffers::VerifierOptions; +use arrow_format::ipc::flatbuffers::VerifierOptions; use crate::array::*; use crate::datatypes::Schema; diff --git a/src/io/ipc/write/common.rs b/src/io/ipc/write/common.rs index d3ccd3b6d5b..ae5151e20cc 100644 --- a/src/io/ipc/write/common.rs +++ b/src/io/ipc/write/common.rs @@ -20,7 +20,7 @@ use std::io::Write; use std::{collections::HashMap, sync::Arc}; use arrow_format::ipc; -use flatbuffers::FlatBufferBuilder; +use arrow_format::ipc::flatbuffers::FlatBufferBuilder; use crate::array::Array; use crate::error::{ArrowError, Result}; diff --git a/src/io/ipc/write/schema.rs b/src/io/ipc/write/schema.rs index 4f2ada82627..c56e6136fbe 100644 --- a/src/io/ipc/write/schema.rs +++ b/src/io/ipc/write/schema.rs @@ -1,5 +1,5 @@ use arrow_format::ipc; -use flatbuffers::FlatBufferBuilder; +use arrow_format::ipc::flatbuffers::FlatBufferBuilder; use crate::datatypes::*; diff --git a/src/io/ipc/write/writer.rs b/src/io/ipc/write/writer.rs index 9d69f0805c3..8c604e9cab8 100644 --- a/src/io/ipc/write/writer.rs +++ b/src/io/ipc/write/writer.rs @@ -23,7 +23,7 @@ use std::io::Write; use arrow_format::ipc; -use flatbuffers::FlatBufferBuilder; +use arrow_format::ipc::flatbuffers::FlatBufferBuilder; use super::super::ARROW_MAGIC; use super::{