diff --git a/Cargo.toml b/Cargo.toml index 1f24ce970a4..9df4de61945 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -39,7 +39,7 @@ indexmap = { version = "^1.6", optional = true } # used to print columns in a nice columnar format comfy-table = { version = "4.0", optional = true, default-features = false } -arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", optional = true, features = ["ipc"] } +arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", optional = true, features = ["ipc"] } flatbuffers = { version = "=2.0.0", optional = true } hex = { version = "^0.4", optional = true } diff --git a/integration-testing/Cargo.toml b/integration-testing/Cargo.toml index b8351927d3e..0c01dc776f8 100644 --- a/integration-testing/Cargo.toml +++ b/integration-testing/Cargo.toml @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"] [dependencies] arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] } -arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", features = ["ipc", "flight"] } +arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", features = ["ipc", "flight"] } async-trait = "0.1.41" clap = "2.33" futures = "0.3" diff --git a/integration-testing/src/bin/flight-test-integration-client.rs b/integration-testing/src/bin/flight-test-integration-client.rs index 1901553109f..b7bb5d1d79f 100644 --- a/integration-testing/src/bin/flight-test-integration-client.rs +++ b/integration-testing/src/bin/flight-test-integration-client.rs @@ -42,9 +42,7 @@ async fn main() -> Result { let port = matches.value_of("port").expect("Port is required"); match matches.value_of("scenario") { - Some("middleware") => { - flight_client_scenarios::middleware::run_scenario(host, port).await? - } + Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?, Some("auth:basic_proto") => { flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await? } @@ -53,8 +51,7 @@ async fn main() -> Result { let path = matches .value_of("path") .expect("Path is required if scenario is not specified"); - flight_client_scenarios::integration_test::run_scenario(host, port, path) - .await?; + flight_client_scenarios::integration_test::run_scenario(host, port, path).await?; } } diff --git a/integration-testing/src/bin/flight-test-integration-server.rs b/integration-testing/src/bin/flight-test-integration-server.rs index b1b280743c3..45a08080499 100644 --- a/integration-testing/src/bin/flight-test-integration-server.rs +++ b/integration-testing/src/bin/flight-test-integration-server.rs @@ -40,9 +40,7 @@ async fn main() -> Result { let port = matches.value_of("port").unwrap_or("0"); match matches.value_of("scenario") { - Some("middleware") => { - flight_server_scenarios::middleware::scenario_setup(port).await? - } + Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?, Some("auth:basic_proto") => { flight_server_scenarios::auth_basic_proto::scenario_setup(port).await? } diff --git a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs index 1b07e6358d5..45aa952f7e2 100644 --- a/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_client_scenarios/auth_basic_proto.rs @@ -17,10 +17,8 @@ use crate::{AUTH_PASSWORD, AUTH_USERNAME}; -use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth}; -use arrow_format::flight::service::{ - flight_service_client::FlightServiceClient, -}; +use arrow_format::flight::flight_service_client::FlightServiceClient; +use arrow_format::flight::{Action, BasicAuth, HandshakeRequest}; use futures::{stream, StreamExt}; use prost::Message; use tonic::{metadata::MetadataValue, Request, Status}; @@ -81,11 +79,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result { Ok(()) } -async fn authenticate( - client: &mut Client, - username: &str, - password: &str, -) -> Result { +async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result { let auth = BasicAuth { username: username.into(), password: password.into(), diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs index 98fe0c2192b..f237866b97a 100644 --- a/integration-testing/src/flight_client_scenarios/integration_test.rs +++ b/integration-testing/src/flight_client_scenarios/integration_test.rs @@ -17,14 +17,19 @@ use crate::{read_json_file, ArrowFile}; -use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch}; +use arrow2::{ + array::*, + datatypes::*, + io::flight::{self, deserialize_batch, serialize_batch}, + io::ipc::{read, write}, + record_batch::RecordBatch, +}; +use arrow_format::flight::flight_service_client::FlightServiceClient; +use arrow_format::flight::{ + flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket, +}; use arrow_format::ipc; use arrow_format::ipc::Message::MessageHeader; -use arrow_format::flight::data::{ - flight_descriptor::DescriptorType, - FlightData, FlightDescriptor, Location, Ticket, -}; -use arrow_format::flight::service::flight_service_client::FlightServiceClient; use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt}; use tonic::{Request, Streaming}; @@ -126,8 +131,7 @@ async fn send_batch( batch: &RecordBatch, options: &write::IpcWriteOptions, ) -> Result { - let (dictionary_flight_data, mut batch_flight_data) = - serialize_batch(batch, options); + let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options); upload_tx .send_all(&mut stream::iter(dictionary_flight_data).map(Ok)) @@ -210,9 +214,8 @@ async fn consume_flight_location( let metadata = counter.to_string().into_bytes(); assert_eq!(metadata, data.app_metadata); - let actual_batch = - deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) - .expect("Unable to convert flight data to Arrow batch"); + let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field) + .expect("Unable to convert flight data to Arrow batch"); assert_eq!(expected_batch.schema(), actual_batch.schema()); assert_eq!(expected_batch.num_columns(), actual_batch.num_columns()); @@ -262,7 +265,8 @@ async fn receive_batch_flight_data( .expect("Error reading dictionary"); data = resp.next().await?.ok()?; - message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); + message = + ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message"); } Some(data) diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs index 0694fef55c7..8a2ef6c05d6 100644 --- a/integration-testing/src/flight_client_scenarios/middleware.rs +++ b/integration-testing/src/flight_client_scenarios/middleware.rs @@ -15,10 +15,8 @@ // specific language governing permissions and limitations // under the License. -use arrow_format::flight::service::{ - flight_service_client::FlightServiceClient, -}; -use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor}; +use arrow_format::flight::flight_service_client::FlightServiceClient; +use arrow_format::flight::{flight_descriptor::DescriptorType, FlightDescriptor}; use tonic::{Request, Status}; type Error = Box; diff --git a/integration-testing/src/flight_server_scenarios.rs b/integration-testing/src/flight_server_scenarios.rs index a8aab14712e..259b7b57533 100644 --- a/integration-testing/src/flight_server_scenarios.rs +++ b/integration-testing/src/flight_server_scenarios.rs @@ -17,7 +17,7 @@ use std::net::SocketAddr; -use arrow_format::flight::data::{FlightEndpoint, Location, Ticket}; +use arrow_format::flight::{FlightEndpoint, Location, Ticket}; use tokio::net::TcpListener; pub mod auth_basic_proto; diff --git a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs index 5223aaa297b..bc7ff956796 100644 --- a/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs +++ b/integration-testing/src/flight_server_scenarios/auth_basic_proto.rs @@ -18,13 +18,11 @@ use std::pin::Pin; use std::sync::Arc; -use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_format::flight::data::*; +use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::*; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; use tokio::sync::Mutex; -use tonic::{ - metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming, -}; +use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming}; type TonicStream = Pin + Send + Sync + 'static>>; @@ -60,10 +58,7 @@ pub struct AuthBasicProtoScenarioImpl { } impl AuthBasicProtoScenarioImpl { - async fn check_auth( - &self, - metadata: &MetadataMap, - ) -> Result { + async fn check_auth(&self, metadata: &MetadataMap) -> Result { let token = metadata .get_bin("auth-token-bin") .and_then(|v| v.to_bytes().ok()) @@ -71,10 +66,7 @@ impl AuthBasicProtoScenarioImpl { self.is_valid(token).await } - async fn is_valid( - &self, - token: Option, - ) -> Result { + async fn is_valid(&self, token: Option) -> Result { match token { Some(t) if t == *self.username => Ok(GrpcServerCallContext { peer_identity: self.username.to_string(), @@ -100,7 +92,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -139,12 +131,10 @@ impl FlightService for AuthBasicProtoScenarioImpl { let req = req.expect("Error reading handshake request"); let HandshakeRequest { payload, .. } = req; - let auth = BasicAuth::decode(&*payload) - .expect("Error parsing handshake request"); + let auth = + BasicAuth::decode(&*payload).expect("Error parsing handshake request"); - let resp = if *auth.username == *username - && *auth.password == *password - { + let resp = if *auth.username == *username && *auth.password == *password { Ok(HandshakeResponse { payload: username.as_bytes().to_vec(), ..HandshakeResponse::default() @@ -189,7 +179,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } @@ -200,7 +191,7 @@ impl FlightService for AuthBasicProtoScenarioImpl { let flight_context = self.check_auth(request.metadata()).await?; // Respond with the authenticated username. let buf = flight_context.peer_identity().as_bytes().to_vec(); - let result = arrow_format::flight::data::Result { body: buf }; + let result = arrow_format::flight::Result { body: buf }; let output = futures::stream::once(async { Ok(result) }); Ok(Response::new(Box::pin(output) as Self::DoActionStream)) } @@ -217,7 +208,8 @@ impl FlightService for AuthBasicProtoScenarioImpl { &self, request: Request>, ) -> Result, Status> { - self.check_auth(request.metadata()).await?; + let metadata = request.metadata(); + self.check_auth(metadata).await?; Err(Status::unimplemented("Not yet implemented")) } } diff --git a/integration-testing/src/flight_server_scenarios/integration_test.rs b/integration-testing/src/flight_server_scenarios/integration_test.rs index 28ded196fc0..6418ec73291 100644 --- a/integration-testing/src/flight_server_scenarios/integration_test.rs +++ b/integration-testing/src/flight_server_scenarios/integration_test.rs @@ -16,23 +16,20 @@ // under the License. use std::collections::HashMap; +use std::convert::TryFrom; use std::pin::Pin; use std::sync::Arc; -use std::convert::TryFrom; use arrow2::io::flight::{serialize_batch, serialize_schema}; -use arrow_format::flight::data::flight_descriptor::*; -use arrow_format::flight::service::flight_service_server::*; -use arrow_format::flight::data::*; +use arrow_format::flight::flight_descriptor::*; +use arrow_format::flight::flight_service_server::*; +use arrow_format::flight::*; +use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader}; use arrow_format::ipc::Schema as ArrowSchema; -use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message}; use arrow2::{ - array::Array, - datatypes::*, + array::Array, datatypes::*, io::flight::serialize_schema_to_info, io::ipc, record_batch::RecordBatch, - io::ipc, - io::flight::serialize_schema_to_info }; use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt}; @@ -85,7 +82,7 @@ impl FlightService for FlightServiceImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -113,12 +110,7 @@ impl FlightService for FlightServiceImpl { let options = ipc::write::IpcWriteOptions::default(); - let schema = std::iter::once({ - Ok(serialize_schema( - &flight.schema, - &options, - )) - }); + let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options))); let batches = flight .chunks @@ -180,12 +172,10 @@ impl FlightService for FlightServiceImpl { let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum(); let options = ipc::write::IpcWriteOptions::default(); - let schema = - serialize_schema_to_info(&flight.schema, &options) - .expect( - "Could not generate schema bytes from schema stored by a DoPut; \ + let schema = serialize_schema_to_info(&flight.schema, &options).expect( + "Could not generate schema bytes from schema stored by a DoPut; \ this should be impossible", - ); + ); let info = FlightInfo { schema, diff --git a/integration-testing/src/flight_server_scenarios/middleware.rs b/integration-testing/src/flight_server_scenarios/middleware.rs index ed686cfc7a4..afc1e817d65 100644 --- a/integration-testing/src/flight_server_scenarios/middleware.rs +++ b/integration-testing/src/flight_server_scenarios/middleware.rs @@ -17,9 +17,9 @@ use std::pin::Pin; -use arrow_format::flight::data::flight_descriptor::DescriptorType; -use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer}; -use arrow_format::flight::data::*; +use arrow_format::flight::flight_descriptor::DescriptorType; +use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer}; +use arrow_format::flight::*; use futures::Stream; use tonic::{transport::Server, Request, Response, Status, Streaming}; @@ -50,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl { type ListFlightsStream = TonicStream>; type DoGetStream = TonicStream>; type DoPutStream = TonicStream>; - type DoActionStream = TonicStream>; + type DoActionStream = TonicStream>; type ListActionsStream = TonicStream>; type DoExchangeStream = TonicStream>; @@ -90,8 +90,7 @@ impl FlightService for MiddlewareScenarioImpl { let descriptor = request.into_inner(); - if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" - { + if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" { // Return a fake location - the test doesn't read it let endpoint = super::endpoint("foo", "grpc+tcp://localhost:10010");