
Commit c6f5371: Simpler code
jorgecarleitao committed on Jul 2, 2022 (1 parent: 2a15331)

Showing 3 changed files with 53 additions and 73 deletions.

integration-testing/README.md (2 additions, 1 deletion)

````diff
@@ -21,10 +21,11 @@ cargo run --bin flight-test-integration-client -- --host localhost --port 3333 -
 to run an integration test against a file, use
 
 ```bash
-FILE="../testing/arrow-testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/generated_primitive.json.gz"
+FILE="../testing/arrow-testing/data/arrow-ipc-stream/integration/1.0.0-littleendian/generated_dictionary.json.gz"
 gzip -dc $FILE > generated.json
 
+cargo build --bin flight-test-integration-server
 cargo run --bin flight-test-integration-server -- --port 3333 &
 cargo run --bin flight-test-integration-client -- --host localhost --port 3333 --path generated.json
 # kill with `fg` and stop process
 ```
````

integration-testing/src/flight_client_scenarios/integration_test.rs (51 additions, 71 deletions)

```diff
@@ -38,7 +38,7 @@ use arrow_format::{
     },
     ipc::planus::ReadAsRoot,
 };
-use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
+use futures::{stream::BoxStream, StreamExt, TryStreamExt};
 use tonic::{Request, Streaming};
 
 type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
@@ -59,6 +59,7 @@ pub async fn run_scenario(host: &str, port: u16, path: &str) -> Result {
         fields,
         ..
     } = read_json_file(path)?;
+
     let ipc_schema = IpcSchema {
         fields,
         is_little_endian: true,
@@ -71,9 +72,9 @@
     upload_data(
         client.clone(),
         &schema,
-        &ipc_schema.fields,
+        ipc_schema.fields.clone(),
         descriptor.clone(),
-        &chunks,
+        chunks.clone(),
     )
     .await?;
     verify_data(client, descriptor, &schema, &ipc_schema, &chunks).await?;
```
```diff
@@ -84,77 +85,56 @@
 async fn upload_data(
     mut client: Client,
     schema: &Schema,
-    fields: &[IpcField],
+    fields: Vec<IpcField>,
     descriptor: FlightDescriptor,
-    chunks: &[ChunkBox],
+    chunks: Vec<ChunkBox>,
 ) -> Result {
-    let (mut upload_tx, upload_rx) = mpsc::channel(10);
-
-    let options = write::WriteOptions { compression: None };
-
-    let mut schema = flight::serialize_schema(schema, Some(fields));
-    schema.flight_descriptor = Some(descriptor.clone());
-    upload_tx.send(schema).await?;
-
-    let mut original_data_iter = chunks.iter().enumerate();
+    let stream = new_stream(schema, fields, descriptor, chunks);
 
-    if let Some((counter, first_chunk)) = original_data_iter.next() {
-        let metadata = counter.to_string().into_bytes();
-        // Preload the first chunk into the channel before starting the request
-        send_chunk(&mut upload_tx, &metadata, first_chunk, fields, &options).await?;
-
-        let outer = client.do_put(Request::new(upload_rx)).await?;
-        let mut inner = outer.into_inner();
+    // put the stream in the client
+    let responses = client.do_put(Request::new(stream)).await?.into_inner();
 
-        let r = inner
-            .next()
-            .await
-            .expect("No response received")
-            .expect("Invalid response received");
-        assert_eq!(metadata, r.app_metadata);
-
-        // Stream the rest of the chunks
-        for (counter, chunk) in original_data_iter {
-            let metadata = counter.to_string().into_bytes();
-            send_chunk(&mut upload_tx, &metadata, chunk, fields, &options).await?;
-
-            let r = inner
-                .next()
-                .await
-                .expect("No response received")
-                .expect("Invalid response received");
-            assert_eq!(metadata, r.app_metadata);
-        }
-        drop(upload_tx);
-        assert!(
-            inner.next().await.is_none(),
-            "Should not receive more results"
-        );
-    } else {
-        drop(upload_tx);
-        client.do_put(Request::new(upload_rx)).await?;
-    }
+    // confirm that all chunks were received in the right order
+    let results = responses.try_collect::<Vec<_>>().await?;
+    assert!(results
+        .into_iter()
+        // only record batches have a metadata; ignore dictionary batches
+        .filter(|r| !r.app_metadata.is_empty())
+        .enumerate()
+        .all(|(counter, r)| r.app_metadata == counter.to_string().as_bytes()));
 
     Ok(())
 }
 
-async fn send_chunk(
-    upload_tx: &mut mpsc::Sender<FlightData>,
-    metadata: &[u8],
-    chunk: &ChunkBox,
-    fields: &[IpcField],
-    options: &write::WriteOptions,
-) -> Result {
-    let (dictionary_flight_data, mut chunk_flight_data) = serialize_batch(chunk, fields, options)?;
+fn new_stream(
+    schema: &Schema,
+    fields: Vec<IpcField>,
+    descriptor: FlightDescriptor,
+    chunks: Vec<ChunkBox>,
+) -> BoxStream<'static, FlightData> {
+    let options = write::WriteOptions { compression: None };
 
-    upload_tx
-        .send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
-        .await?;
+    let mut schema = flight::serialize_schema(schema, Some(&fields));
+    schema.flight_descriptor = Some(descriptor);
 
-    // Only the record chunk's FlightData gets app_metadata
-    chunk_flight_data.app_metadata = metadata.to_vec();
-    upload_tx.send(chunk_flight_data).await?;
-    Ok(())
+    // iterator of [dictionaries0, chunk0, dictionaries1, chunk1, ...]
+    let iter = chunks
+        .into_iter()
+        .enumerate()
+        .flat_map(move |(counter, chunk)| {
+            let metadata = counter.to_string().into_bytes();
+            let (mut dictionaries, mut chunk) = serialize_batch(&chunk, &fields, &options).unwrap();
+
+            // assign `app_metadata` to chunks
+            chunk.app_metadata = metadata.to_vec();
+            dictionaries.push(chunk);
+            dictionaries
+        });
+
+    // the stream as per flight spec: the schema followed by stream of chunks
+    futures::stream::once(futures::future::ready(schema))
+        .chain(futures::stream::iter(iter))
+        .boxed()
 }
 
 async fn verify_data(
```
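
The heart of the commit is the shape of that stream. The old code pushed messages through an `mpsc` channel and awaited one server response per chunk; `new_stream` instead lays the whole sequence out up front as a single lazy stream. That is also why `upload_data` now takes `fields` and `chunks` by value: tonic's `do_put` requires a `'static` stream, which cannot capture borrowed slices. Below is a minimal, dependency-light sketch of that stream shape, using only the `futures` crate; `Msg` and `serialize` are hypothetical stand-ins for `FlightData` and `serialize_batch`:

```rust
use futures::{stream::BoxStream, StreamExt};

// Hypothetical stand-in for FlightData.
#[derive(Debug)]
enum Msg {
    Schema,
    Dictionary(usize),
    Chunk(usize),
}

// Stand-in for `serialize_batch`: each chunk expands to its dictionary
// messages followed by the record chunk itself.
fn serialize(counter: usize) -> Vec<Msg> {
    vec![Msg::Dictionary(counter), Msg::Chunk(counter)]
}

fn new_stream(num_chunks: usize) -> BoxStream<'static, Msg> {
    // [dictionaries0, chunk0, dictionaries1, chunk1, ...]
    let chunks = (0..num_chunks).flat_map(serialize);
    // the Flight convention: one schema message, then the chunks
    futures::stream::once(futures::future::ready(Msg::Schema))
        .chain(futures::stream::iter(chunks))
        .boxed()
}

fn main() {
    let msgs = futures::executor::block_on(new_stream(2).collect::<Vec<_>>());
    // prints [Schema, Dictionary(0), Chunk(0), Dictionary(1), Chunk(1)]
    println!("{msgs:?}");
}
```

With everything in one stream, the upload side makes a single `do_put` call and verifies `app_metadata` ordering on the collected responses afterwards, instead of interleaving sends and receives.
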
```diff
@@ -209,16 +189,16 @@ async fn consume_flight_location(
 
     let mut client = FlightServiceClient::connect(location.uri).await?;
     let resp = client.do_get(ticket).await?;
-    let mut resp = resp.into_inner();
+    let mut stream = resp.into_inner();
 
     // We already have the schema from the FlightInfo, but the server sends it again as the
     // first FlightData. Ignore this one.
-    let _schema_again = resp.next().await.unwrap();
+    let _schema_again = stream.next().await.unwrap();
 
     let mut dictionaries = Default::default();
 
     for (counter, expected_chunk) in expected_chunks.iter().enumerate() {
-        let data = read_dictionaries(&mut resp, &schema.fields, ipc_schema, &mut dictionaries)
+        let data = read_dictionaries(&mut stream, &schema.fields, ipc_schema, &mut dictionaries)
             .await
             .unwrap_or_else(|| {
                 panic!(
@@ -238,7 +218,7 @@
     }
 
     assert!(
-        resp.next().await.is_none(),
+        stream.next().await.is_none(),
         "Got more chunks than expected: {}",
         expected_chunks.len(),
     );
@@ -247,12 +227,12 @@
 }
 
 async fn read_dictionaries(
-    resp: &mut Streaming<FlightData>,
+    stream: &mut Streaming<FlightData>,
     fields: &[Field],
     ipc_schema: &IpcSchema,
     dictionaries: &mut Dictionaries,
 ) -> Option<FlightData> {
-    let mut data = resp.next().await?.ok()?;
+    let mut data = stream.next().await?.ok()?;
     let mut message =
         ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing first message");
 
@@ -275,7 +255,7 @@ async fn read_dictionaries(
         )
         .expect("Error reading dictionary");
 
-        data = resp.next().await?.ok()?;
+        data = stream.next().await?.ok()?;
         message = ipc::MessageRef::read_as_root(&data.data_header).expect("Error parsing message");
     }
```
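On the consuming side, `read_dictionaries` keeps its logic through the rename: pull messages off the stream, folding dictionary batches into state until the first record batch arrives. A self-contained sketch of that pattern, with hypothetical `Msg` and `Dictionaries` stand-ins in place of real `FlightData` header parsing:

```rust
use std::collections::HashMap;

use futures::{Stream, StreamExt};

// Hypothetical stand-in for a decoded FlightData header.
enum Msg {
    Dictionary { id: i64, payload: Vec<u8> },
    RecordBatch(Vec<u8>),
}

// Stand-in for arrow2's dictionary state.
type Dictionaries = HashMap<i64, Vec<u8>>;

// Drain leading dictionary messages into `dictionaries` and return the first
// record batch; None if the stream ends, mirroring `read_dictionaries`.
async fn drain_dictionaries<S>(
    stream: &mut S,
    dictionaries: &mut Dictionaries,
) -> Option<Vec<u8>>
where
    S: Stream<Item = Msg> + Unpin,
{
    loop {
        match stream.next().await? {
            Msg::Dictionary { id, payload } => {
                dictionaries.insert(id, payload);
            }
            Msg::RecordBatch(payload) => return Some(payload),
        }
    }
}
```
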
Third changed file (1 deletion)

```diff
@@ -73,7 +73,6 @@ pub async fn run_scenario(host: &str, port: u16) -> Result {
     Ok(())
 }
 
-#[allow(clippy::unnecessary_wraps)]
 fn middleware_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
     let metadata = req.metadata_mut();
     metadata.insert("x-middleware", "expected value".parse().unwrap());
```
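
For context, this function signature is what tonic's `Interceptor` trait accepts, and such a function is attached to a channel so that every outgoing request carries the extra header. A sketch of the attachment, assuming tonic 0.7-era APIs; the endpoint URL is hypothetical:

```rust
use tonic::{
    service::interceptor::InterceptedService,
    transport::Endpoint,
    Request, Status,
};

fn middleware_interceptor(mut req: Request<()>) -> Result<Request<()>, Status> {
    let metadata = req.metadata_mut();
    metadata.insert("x-middleware", "expected value".parse().unwrap());
    Ok(req)
}

async fn connect() -> Result<(), tonic::transport::Error> {
    let channel = Endpoint::from_static("http://localhost:3333").connect().await?;
    // requests sent through this service now carry the `x-middleware` header
    let _svc = InterceptedService::new(channel, middleware_interceptor);
    Ok(())
}
```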