Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Update integration-testing
Browse files Browse the repository at this point in the history
Signed-off-by: Chojan Shang <[email protected]>
  • Loading branch information
PsiACE committed Oct 27, 2021
1 parent b9fca94 commit 548387e
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 85 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "4.0", optional = true, default-features = false }

arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", optional = true, features = ["ipc"] }
arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", optional = true, features = ["ipc"] }
flatbuffers = { version = "=2.0.0", optional = true }

hex = { version = "^0.4", optional = true }
Expand Down
2 changes: 1 addition & 1 deletion integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { git = "https://github.com/PsiACE/arrow-format.git", features = ["ipc", "flight"] }
arrow-format = { git = "https://github.com/DataEngineeringLabs/arrow-format.git", features = ["ipc", "flight"] }
async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
Expand Down
7 changes: 2 additions & 5 deletions integration-testing/src/bin/flight-test-integration-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ async fn main() -> Result {
let port = matches.value_of("port").expect("Port is required");

match matches.value_of("scenario") {
Some("middleware") => {
flight_client_scenarios::middleware::run_scenario(host, port).await?
}
Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?,
Some("auth:basic_proto") => {
flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await?
}
Expand All @@ -53,8 +51,7 @@ async fn main() -> Result {
let path = matches
.value_of("path")
.expect("Path is required if scenario is not specified");
flight_client_scenarios::integration_test::run_scenario(host, port, path)
.await?;
flight_client_scenarios::integration_test::run_scenario(host, port, path).await?;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ async fn main() -> Result {
let port = matches.value_of("port").unwrap_or("0");

match matches.value_of("scenario") {
Some("middleware") => {
flight_server_scenarios::middleware::scenario_setup(port).await?
}
Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?,
Some("auth:basic_proto") => {
flight_server_scenarios::auth_basic_proto::scenario_setup(port).await?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

use crate::{AUTH_PASSWORD, AUTH_USERNAME};

use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth};
use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use arrow_format::flight::flight_service_client::FlightServiceClient;
use arrow_format::flight::{Action, BasicAuth, HandshakeRequest};
use futures::{stream, StreamExt};
use prost::Message;
use tonic::{metadata::MetadataValue, Request, Status};
Expand Down Expand Up @@ -81,11 +79,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result {
Ok(())
}

async fn authenticate(
client: &mut Client,
username: &str,
password: &str,
) -> Result<String> {
async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result<String> {
let auth = BasicAuth {
username: username.into(),
password: password.into(),
Expand Down
28 changes: 16 additions & 12 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

use crate::{read_json_file, ArrowFile};

use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch};
use arrow2::{
array::*,
datatypes::*,
io::flight::{self, deserialize_batch, serialize_batch},
io::ipc::{read, write},
record_batch::RecordBatch,
};
use arrow_format::flight::flight_service_client::FlightServiceClient;
use arrow_format::flight::{
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use arrow_format::flight::data::{
flight_descriptor::DescriptorType,
FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
use tonic::{Request, Streaming};

Expand Down Expand Up @@ -126,8 +131,7 @@ async fn send_batch(
batch: &RecordBatch,
options: &write::IpcWriteOptions,
) -> Result {
let (dictionary_flight_data, mut batch_flight_data) =
serialize_batch(batch, options);
let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options);

upload_tx
.send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
Expand Down Expand Up @@ -210,9 +214,8 @@ async fn consume_flight_location(
let metadata = counter.to_string().into_bytes();
assert_eq!(metadata, data.app_metadata);

let actual_batch =
deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
.expect("Unable to convert flight data to Arrow batch");
let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
.expect("Unable to convert flight data to Arrow batch");

assert_eq!(expected_batch.schema(), actual_batch.schema());
assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
Expand Down Expand Up @@ -262,7 +265,8 @@ async fn receive_batch_flight_data(
.expect("Error reading dictionary");

data = resp.next().await?.ok()?;
message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
message =
ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
}

Some(data)
Expand Down
6 changes: 2 additions & 4 deletions integration-testing/src/flight_client_scenarios/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor};
use arrow_format::flight::flight_service_client::FlightServiceClient;
use arrow_format::flight::{flight_descriptor::DescriptorType, FlightDescriptor};
use tonic::{Request, Status};

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down
2 changes: 1 addition & 1 deletion integration-testing/src/flight_server_scenarios.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use std::net::SocketAddr;

use arrow_format::flight::data::{FlightEndpoint, Location, Ticket};
use arrow_format::flight::{FlightEndpoint, Location, Ticket};
use tokio::net::TcpListener;

pub mod auth_basic_proto;
Expand Down
36 changes: 14 additions & 22 deletions integration-testing/src/flight_server_scenarios/auth_basic_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
use std::pin::Pin;
use std::sync::Arc;

use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::*;
use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
use tokio::sync::Mutex;
use tonic::{
metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming,
};
use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming};

type TonicStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;

Expand Down Expand Up @@ -60,21 +58,15 @@ pub struct AuthBasicProtoScenarioImpl {
}

impl AuthBasicProtoScenarioImpl {
async fn check_auth(
&self,
metadata: &MetadataMap,
) -> Result<GrpcServerCallContext, Status> {
async fn check_auth(&self, metadata: &MetadataMap) -> Result<GrpcServerCallContext, Status> {
let token = metadata
.get_bin("auth-token-bin")
.and_then(|v| v.to_bytes().ok())
.and_then(|b| String::from_utf8(b.to_vec()).ok());
self.is_valid(token).await
}

async fn is_valid(
&self,
token: Option<String>,
) -> Result<GrpcServerCallContext, Status> {
async fn is_valid(&self, token: Option<String>) -> Result<GrpcServerCallContext, Status> {
match token {
Some(t) if t == *self.username => Ok(GrpcServerCallContext {
peer_identity: self.username.to_string(),
Expand All @@ -100,7 +92,7 @@ impl FlightService for AuthBasicProtoScenarioImpl {
type ListFlightsStream = TonicStream<Result<FlightInfo, Status>>;
type DoGetStream = TonicStream<Result<FlightData, Status>>;
type DoPutStream = TonicStream<Result<PutResult, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::data::Result, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::Result, Status>>;
type ListActionsStream = TonicStream<Result<ActionType, Status>>;
type DoExchangeStream = TonicStream<Result<FlightData, Status>>;

Expand Down Expand Up @@ -139,12 +131,10 @@ impl FlightService for AuthBasicProtoScenarioImpl {
let req = req.expect("Error reading handshake request");
let HandshakeRequest { payload, .. } = req;

let auth = BasicAuth::decode(&*payload)
.expect("Error parsing handshake request");
let auth =
BasicAuth::decode(&*payload).expect("Error parsing handshake request");

let resp = if *auth.username == *username
&& *auth.password == *password
{
let resp = if *auth.username == *username && *auth.password == *password {
Ok(HandshakeResponse {
payload: username.as_bytes().to_vec(),
..HandshakeResponse::default()
Expand Down Expand Up @@ -189,7 +179,8 @@ impl FlightService for AuthBasicProtoScenarioImpl {
&self,
request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
self.check_auth(request.metadata()).await?;
let metadata = request.metadata();
self.check_auth(metadata).await?;
Err(Status::unimplemented("Not yet implemented"))
}

Expand All @@ -200,7 +191,7 @@ impl FlightService for AuthBasicProtoScenarioImpl {
let flight_context = self.check_auth(request.metadata()).await?;
// Respond with the authenticated username.
let buf = flight_context.peer_identity().as_bytes().to_vec();
let result = arrow_format::flight::data::Result { body: buf };
let result = arrow_format::flight::Result { body: buf };
let output = futures::stream::once(async { Ok(result) });
Ok(Response::new(Box::pin(output) as Self::DoActionStream))
}
Expand All @@ -217,7 +208,8 @@ impl FlightService for AuthBasicProtoScenarioImpl {
&self,
request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
self.check_auth(request.metadata()).await?;
let metadata = request.metadata();
self.check_auth(metadata).await?;
Err(Status::unimplemented("Not yet implemented"))
}
}
32 changes: 11 additions & 21 deletions integration-testing/src/flight_server_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
// under the License.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::convert::TryFrom;

use arrow2::io::flight::{serialize_batch, serialize_schema};
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::flight::data::*;
use arrow_format::flight::flight_descriptor::*;
use arrow_format::flight::flight_service_server::*;
use arrow_format::flight::*;
use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
use arrow_format::ipc::Schema as ArrowSchema;
use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message};

use arrow2::{
array::Array,
datatypes::*,
array::Array, datatypes::*, io::flight::serialize_schema_to_info, io::ipc,
record_batch::RecordBatch,
io::ipc,
io::flight::serialize_schema_to_info
};

use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
Expand Down Expand Up @@ -85,7 +82,7 @@ impl FlightService for FlightServiceImpl {
type ListFlightsStream = TonicStream<Result<FlightInfo, Status>>;
type DoGetStream = TonicStream<Result<FlightData, Status>>;
type DoPutStream = TonicStream<Result<PutResult, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::data::Result, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::Result, Status>>;
type ListActionsStream = TonicStream<Result<ActionType, Status>>;
type DoExchangeStream = TonicStream<Result<FlightData, Status>>;

Expand Down Expand Up @@ -113,12 +110,7 @@ impl FlightService for FlightServiceImpl {

let options = ipc::write::IpcWriteOptions::default();

let schema = std::iter::once({
Ok(serialize_schema(
&flight.schema,
&options,
))
});
let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options)));

let batches = flight
.chunks
Expand Down Expand Up @@ -180,12 +172,10 @@ impl FlightService for FlightServiceImpl {
let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum();

let options = ipc::write::IpcWriteOptions::default();
let schema =
serialize_schema_to_info(&flight.schema, &options)
.expect(
"Could not generate schema bytes from schema stored by a DoPut; \
let schema = serialize_schema_to_info(&flight.schema, &options).expect(
"Could not generate schema bytes from schema stored by a DoPut; \
this should be impossible",
);
);

let info = FlightInfo {
schema,
Expand Down
11 changes: 5 additions & 6 deletions integration-testing/src/flight_server_scenarios/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use std::pin::Pin;

use arrow_format::flight::data::flight_descriptor::DescriptorType;
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use arrow_format::flight::flight_descriptor::DescriptorType;
use arrow_format::flight::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::*;
use futures::Stream;
use tonic::{transport::Server, Request, Response, Status, Streaming};

Expand Down Expand Up @@ -50,7 +50,7 @@ impl FlightService for MiddlewareScenarioImpl {
type ListFlightsStream = TonicStream<Result<FlightInfo, Status>>;
type DoGetStream = TonicStream<Result<FlightData, Status>>;
type DoPutStream = TonicStream<Result<PutResult, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::data::Result, Status>>;
type DoActionStream = TonicStream<Result<arrow_format::flight::Result, Status>>;
type ListActionsStream = TonicStream<Result<ActionType, Status>>;
type DoExchangeStream = TonicStream<Result<FlightData, Status>>;

Expand Down Expand Up @@ -90,8 +90,7 @@ impl FlightService for MiddlewareScenarioImpl {

let descriptor = request.into_inner();

if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success"
{
if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" {
// Return a fake location - the test doesn't read it
let endpoint = super::endpoint("foo", "grpc+tcp://localhost:10010");

Expand Down

0 comments on commit 548387e

Please sign in to comment.