From c6f5371e9d1822ba9bdb160686c5e9a41b2ba2f4 Mon Sep 17 00:00:00 2001
From: "Jorge C. Leitao"
Date: Sat, 2 Jul 2022 15:40:56 +0000
Subject: [PATCH] Simpler code

---
 integration-testing/README.md                 |   3 +-
 .../integration_test.rs                       | 122 ++++++++----------
 .../src/flight_client_scenarios/middleware.rs |   1 -
 3 files changed, 53 insertions(+), 73 deletions(-)

diff --git a/integration-testing/README.md b/integration-testing/README.md
index 824ebf4a721..766f77b1151 100644
--- a/integration-testing/README.md
+++ b/integration-testing/README.md
@@ -21,10 +21,11 @@ cargo run --bin flight-test-integration-client -- --host localhost --port 3333
 
 - to run an integration test against a file, use
 
 ```bash
-FILE="../testing/arrow-testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/generated_primitive.json.gz"
+FILE="../testing/arrow-testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/generated_dictionary.json.gz"
 gzip -dc $FILE > generated.json
 cargo build --bin flight-test-integration-server
 cargo run --bin flight-test-integration-server -- --port 3333 &
 cargo run --bin flight-test-integration-client -- --host localhost --port 3333 --path generated.json
+# when done, bring the server back to the foreground with `fg` and stop it (Ctrl+C)
 ```
diff --git a/integration-testing/src/flight_client_scenarios/integration_test.rs b/integration-testing/src/flight_client_scenarios/integration_test.rs
index c421339cd2e..3cdcd17f2e7 100644
--- a/integration-testing/src/flight_client_scenarios/integration_test.rs
+++ b/integration-testing/src/flight_client_scenarios/integration_test.rs
@@ -38,7 +38,7 @@ use arrow_format::{
     },
     ipc::planus::ReadAsRoot,
 };
-use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use tonic::{Request, Streaming};
 
 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -59,6 +59,7 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
         fields,
         ..
     } = read_json_file(path)?;
+
     let ipc_schema = IpcSchema {
         fields,
         is_little_endian: true,
@@ -71,9 +72,9 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
     upload_data(
         client.clone(),
         &schema,
-        &ipc_schema.fields,
+        ipc_schema.fields.clone(),
         descriptor.clone(),
-        &chunks,
+        chunks.clone(),
     )
     .await?;
     verify_data(client, descriptor, &schema, &ipc_schema, &chunks).await?;
@@ -84,77 +85,56 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
 async fn upload_data(
     mut client: Client,
     schema: &Schema,
-    fields: &[IpcField],
+    fields: Vec<IpcField>,
     descriptor: FlightDescriptor,
-    chunks: &[ChunkBox],
+    chunks: Vec<ChunkBox>,
 ) -> Result {
-    let (mut upload_tx, upload_rx) = mpsc::channel(10);
-
-    let options = write::WriteOptions { compression: None };
-
-    let mut schema = flight::serialize_schema(schema, Some(fields));
-    schema.flight_descriptor = Some(descriptor.clone());
-    upload_tx.send(schema).await?;
-
-    let mut original_data_iter = chunks.iter().enumerate();
+    let stream = new_stream(schema, fields, descriptor, chunks);
 
-    if let Some((counter, first_chunk)) = original_data_iter.next() {
-        let metadata = counter.to_string().into_bytes();
-        // Preload the first chunk into the channel before starting the request
-        send_chunk(&mut upload_tx, &metadata, first_chunk, fields, &options).await?;
-
-        let outer = client.do_put(Request::new(upload_rx)).await?;
-        let mut inner = outer.into_inner();
+    // send the stream to the server via DoPut
+    let responses = client.do_put(Request::new(stream)).await?.into_inner();
 
-        let r = inner
-            .next()
-            .await
-            .expect("No response received")
-            .expect("Invalid response received");
-        assert_eq!(metadata, r.app_metadata);
-
-        // Stream the rest of the chunks
-        for (counter, chunk) in original_data_iter {
-            let metadata = counter.to_string().into_bytes();
-            send_chunk(&mut upload_tx, &metadata, chunk, fields, &options).await?;
-
-            let r = inner
-                .next()
-                .await
-                .expect("No response received")
-                .expect("Invalid response received");
-            assert_eq!(metadata, r.app_metadata);
-        }
-        drop(upload_tx);
-        assert!(
-            inner.next().await.is_none(),
-            "Should not receive more results"
-        );
-    } else {
-        drop(upload_tx);
-        client.do_put(Request::new(upload_rx)).await?;
-    }
+    // confirm that all chunks were received in the right order
+    let results = responses.try_collect::<Vec<_>>().await?;
+    assert!(results
+        .into_iter()
+        // only record batches carry app_metadata; dictionary batches are ignored
+        .filter(|r| !r.app_metadata.is_empty())
+        .enumerate()
+        .all(|(counter, r)| r.app_metadata == counter.to_string().as_bytes()));
 
     Ok(())
 }
 
-async fn send_chunk(
-    upload_tx: &mut mpsc::Sender<FlightData>,
-    metadata: &[u8],
-    chunk: &ChunkBox,
-    fields: &[IpcField],
-    options: &write::WriteOptions,
-) -> Result {
-    let (dictionary_flight_data, mut chunk_flight_data) = serialize_batch(chunk, fields, options)?;
+fn new_stream(
+    schema: &Schema,
+    fields: Vec<IpcField>,
+    descriptor: FlightDescriptor,
+    chunks: Vec<ChunkBox>,
+) -> BoxStream<'static, FlightData> {
+    let options = write::WriteOptions { compression: None };
 
-    upload_tx
-        .send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
-        .await?;
+    let mut schema = flight::serialize_schema(schema, Some(&fields));
+    schema.flight_descriptor = Some(descriptor);
 
-    // Only the record chunk's FlightData gets app_metadata
-    chunk_flight_data.app_metadata = metadata.to_vec();
-    upload_tx.send(chunk_flight_data).await?;
-    Ok(())
+    // iterator of [dictionaries0, chunk0, dictionaries1, chunk1, ...]
+    let iter = chunks
+        .into_iter()
+        .enumerate()
+        .flat_map(move |(counter, chunk)| {
+            let metadata = counter.to_string().into_bytes();
+            let (mut dictionaries, mut chunk) = serialize_batch(&chunk, &fields, &options).unwrap();
+
+            // assign `app_metadata` to the record chunk only
+            chunk.app_metadata = metadata.to_vec();
+            dictionaries.push(chunk);
+            dictionaries
+        });
+
+    // the stream as per the Flight spec: the schema, followed by the stream of chunks
+    futures::stream::once(futures::future::ready(schema))
+        .chain(futures::stream::iter(iter))
+        .boxed()
 }
 
 async fn verify_data(
@@ -209,16 +189,16 @@ async fn consume_flight_location(
     let mut client = FlightServiceClient::connect(location.uri).await?;
 
     let resp = client.do_get(ticket).await?;
-    let mut resp = resp.into_inner();
+    let mut stream = resp.into_inner();
 
     // We already have the schema from the FlightInfo, but the server sends it again as the
     // first FlightData. Ignore this one.
-    let _schema_again = resp.next().await.unwrap();
+    let _schema_again = stream.next().await.unwrap();
 
     let mut dictionaries = Default::default();
 
     for (counter, expected_chunk) in expected_chunks.iter().enumerate() {
-        let data = read_dictionaries(&mut resp, &schema.fields, ipc_schema, &mut dictionaries)
+        let data = read_dictionaries(&mut stream, &schema.fields, ipc_schema, &mut dictionaries)
            .await
            .unwrap_or_else(|| {
                panic!(
@@ -238,7 +218,7 @@ async fn consume_flight_location(
     }
 
     assert!(
-        resp.next().await.is_none(),
+        stream.next().await.is_none(),
         "Got more chunks than expected: {}",
         expected_chunks.len(),
     );
@@ -247,12 +227,12 @@ async fn consume_flight_location(
 }
 
 async fn read_dictionaries(
-    resp: &mut Streaming<FlightData>,
+    stream: &mut Streaming<FlightData>,
     fields: &[Field],
     ipc_schema: &IpcSchema,
     dictionaries: &mut Dictionaries,
 ) -> Option<FlightData> {
-    let mut data = resp.next().await?.ok()?;
+    let mut data = stream.next().await?.ok()?;
 
     let mut message = ipc::MessageRef::read_as_root(&data.data_header)
         .expect("Error parsing first message");
@@ -275,7 +255,7 @@ async fn read_dictionaries(
         )
         .expect("Error reading dictionary");
 
-        data = resp.next().await?.ok()?;
+        data = stream.next().await?.ok()?;
         message = ipc::MessageRef::read_as_root(&data.data_header)
             .expect("Error parsing message");
     }
diff --git a/integration-testing/src/flight_client_scenarios/middleware.rs b/integration-testing/src/flight_client_scenarios/middleware.rs
index 29c96ce67ea..c5fd752768c 100644
--- a/integration-testing/src/flight_client_scenarios/middleware.rs
+++ b/integration-testing/src/flight_client_scenarios/middleware.rs
@@ -73,7 +73,6 @@ pub async fn run_scenario(host: &str, port: u16) -> Result {
     Ok(())
 }
 
-#[allow(clippy::unnecessary_wraps)]
 fn middleware_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
     let metadata = req.metadata_mut();
     metadata.insert("x-middleware", "expected value".parse().unwrap());
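For readers not used to building a DoPut request this way, here is a minimal, self-contained sketch of the stream shape that `new_stream` produces; plain integers stand in for `FlightData`, and the `futures`-only executor is an assumption made only to keep the example runnable outside the integration-testing crate:

```rust
use futures::{executor::block_on, stream, StreamExt};

fn main() {
    // stand-ins: 0 plays the role of the serialized schema message, and each
    // inner Vec plays the role of [dictionary batches..., record batch].
    let schema_message = 0;
    let batches = vec![vec![10, 11], vec![20]];

    // same shape as `new_stream` above: the schema first, then, per batch,
    // its dictionary batches followed by the record batch itself.
    let flight_stream = stream::once(futures::future::ready(schema_message))
        .chain(stream::iter(batches.into_iter().flatten()))
        .boxed();

    let sent: Vec<i32> = block_on(flight_stream.collect());
    assert_eq!(sent, vec![0, 10, 11, 20]);
}
```

Because the whole upload is a single lazily evaluated stream, the patched `upload_data` can hand it to `do_put` in one call and then check the server's replies with `try_collect`, instead of interleaving sends and receives over an mpsc channel as before.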