Skip to content

Commit

Permalink
Complete mid-level FlightClient (#3402)
Browse files Browse the repository at this point in the history
* Implement `FlightClient::do_put` and `FlightClient::do_exchange`

* Implement ArrowClient::{list_flights, list_actions, do_action, get_schema}

* Apply suggestions from code review

Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>

* remove outdated comment

* make foo/bar placeholders in test more specific

* simplify tests

Co-authored-by: Liang-Chi Hsieh <[email protected]>
Co-authored-by: Raphael Taylor-Davies <[email protected]>
  • Loading branch information
3 people authored Jan 5, 2023
1 parent 81abc1a commit 42ffc3f
Show file tree
Hide file tree
Showing 4 changed files with 638 additions and 73 deletions.
303 changes: 281 additions & 22 deletions arrow-flight/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,17 @@
// under the License.

use crate::{
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient,
FlightDescriptor, FlightInfo, HandshakeRequest, Ticket,
decode::FlightRecordBatchStream, flight_service_client::FlightServiceClient, Action,
ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightInfo,
HandshakeRequest, PutResult, Ticket,
};
use arrow_schema::Schema;
use bytes::Bytes;
use futures::{future::ready, stream, StreamExt, TryStreamExt};
use futures::{
future::ready,
stream::{self, BoxStream},
Stream, StreamExt, TryStreamExt,
};
use tonic::{metadata::MetadataMap, transport::Channel};

use crate::error::{FlightError, Result};
Expand Down Expand Up @@ -160,19 +166,20 @@ impl FlightClient {
/// returning a [`FlightRecordBatchStream`] for reading
/// [`RecordBatch`](arrow_array::RecordBatch)es.
///
/// # Note
///
/// To access the returned [`FlightData`] use
/// [`FlightRecordBatchStream::into_inner()`]
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use bytes::Bytes;
/// # use arrow_flight::FlightClient;
/// # use arrow_flight::Ticket;
/// # use arrow_array::RecordBatch;
/// # use tonic::transport::Channel;
/// # use futures::stream::TryStreamExt;
/// # let channel = Channel::from_static("http://localhost:1234")
/// # .connect()
/// # .await
/// # .expect("error connecting");
/// # let channel: tonic::transport::Channel = unimplemented!();
/// # let ticket = Ticket { ticket: Bytes::from("foo") };
/// let mut client = FlightClient::new(channel);
///
Expand All @@ -199,8 +206,7 @@ impl FlightClient {
.do_get(request)
.await?
.into_inner()
// convert to FlightError
.map_err(|e| e.into());
.map_err(FlightError::Tonic);

Ok(FlightRecordBatchStream::new_from_flight_data(
response_stream,
Expand All @@ -217,11 +223,7 @@ impl FlightClient {
/// # async fn run() {
/// # use arrow_flight::FlightClient;
/// # use arrow_flight::FlightDescriptor;
/// # use tonic::transport::Channel;
/// # let channel = Channel::from_static("http://localhost:1234")
/// # .connect()
/// # .await
/// # .expect("error connecting");
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Send a 'CMD' request to the server
Expand Down Expand Up @@ -256,13 +258,270 @@ impl FlightClient {
Ok(response)
}

// TODO other methods
// list_flights
// get_schema
// do_put
// do_action
// list_actions
// do_exchange
/// Make a `DoPut` call to the server with the provided
/// [`Stream`](futures::Stream) of [`FlightData`] and returning a
/// stream of [`PutResult`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::{TryStreamExt, StreamExt};
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
/// # use arrow_flight::encode::FlightDataEncoderBuilder;
/// # let batch = RecordBatch::try_from_iter(vec![
/// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// # ]).unwrap();
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // encode the batch as a stream of `FlightData`
/// let flight_data_stream = FlightDataEncoderBuilder::new()
/// .build(futures::stream::iter(vec![Ok(batch)]))
/// // data encoder return Results, but do_put requires FlightData
/// .map(|batch|batch.unwrap());
///
/// // send the stream and get the results as `PutResult`
/// let response: Vec<PutResult>= client
/// .do_put(flight_data_stream)
/// .await
/// .unwrap()
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error calling do_put");
/// # }
/// ```
pub async fn do_put<S: Stream<Item = FlightData> + Send + 'static>(
&mut self,
request: S,
) -> Result<BoxStream<'static, Result<PutResult>>> {
let request = self.make_request(request);

let response = self
.inner
.do_put(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(response.boxed())
}

/// Make a `DoExchange` call to the server with the provided
/// [`Stream`](futures::Stream) of [`FlightData`] and returning a
/// stream of [`FlightData`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::{TryStreamExt, StreamExt};
/// # use std::sync::Arc;
/// # use arrow_array::UInt64Array;
/// # use arrow_array::RecordBatch;
/// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
/// # use arrow_flight::encode::FlightDataEncoderBuilder;
/// # let batch = RecordBatch::try_from_iter(vec![
/// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
/// # ]).unwrap();
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // encode the batch as a stream of `FlightData`
/// let flight_data_stream = FlightDataEncoderBuilder::new()
/// .build(futures::stream::iter(vec![Ok(batch)]))
/// // data encoder return Results, but do_exchange requires FlightData
/// .map(|batch|batch.unwrap());
///
/// // send the stream and get the results as `RecordBatches`
/// let response: Vec<RecordBatch> = client
/// .do_exchange(flight_data_stream)
/// .await
/// .unwrap()
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error calling do_exchange");
/// # }
/// ```
pub async fn do_exchange<S: Stream<Item = FlightData> + Send + 'static>(
&mut self,
request: S,
) -> Result<FlightRecordBatchStream> {
let request = self.make_request(request);

let response = self
.inner
.do_exchange(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(FlightRecordBatchStream::new_from_flight_data(response))
}

/// Make a `ListFlights` call to the server with the provided
/// critera and returning a [`Stream`](futures::Stream) of [`FlightInfo`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::TryStreamExt;
/// # use bytes::Bytes;
/// # use arrow_flight::{FlightInfo, FlightClient};
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Send 'Name=Foo' bytes as the "expression" to the server
/// // and gather the returned FlightInfo
/// let responses: Vec<FlightInfo> = client
/// .list_flights(Bytes::from("Name=Foo"))
/// .await
/// .expect("error listing flights")
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error gathering flights");
/// # }
/// ```
pub async fn list_flights(
&mut self,
expression: impl Into<Bytes>,
) -> Result<BoxStream<'static, Result<FlightInfo>>> {
let request = Criteria {
expression: expression.into(),
};

let request = self.make_request(request);

let response = self
.inner
.list_flights(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(response.boxed())
}

/// Make a `GetSchema` call to the server with the provided
/// [`FlightDescriptor`] and returning the associated [`Schema`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use bytes::Bytes;
/// # use arrow_flight::{FlightDescriptor, FlightClient};
/// # use arrow_schema::Schema;
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // Request the schema result of a 'CMD' request to the server
/// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
///
/// let schema: Schema = client
/// .get_schema(request)
/// .await
/// .expect("error making request");
/// # }
/// ```
pub async fn get_schema(
&mut self,
flight_descriptor: FlightDescriptor,
) -> Result<Schema> {
let request = self.make_request(flight_descriptor);

let schema_result = self.inner.get_schema(request).await?.into_inner();

// attempt decode from IPC
let schema: Schema = schema_result.try_into()?;

Ok(schema)
}

/// Make a `ListActions` call to the server and returning a
/// [`Stream`](futures::Stream) of [`ActionType`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use futures::TryStreamExt;
/// # use arrow_flight::{ActionType, FlightClient};
/// # use arrow_schema::Schema;
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// // List available actions on the server:
/// let actions: Vec<ActionType> = client
/// .list_actions()
/// .await
/// .expect("error listing actions")
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error gathering actions");
/// # }
/// ```
pub async fn list_actions(
&mut self,
) -> Result<BoxStream<'static, Result<ActionType>>> {
let request = self.make_request(Empty {});

let action_stream = self
.inner
.list_actions(request)
.await?
.into_inner()
.map_err(FlightError::Tonic);

Ok(action_stream.boxed())
}

/// Make a `DoAction` call to the server and returning a
/// [`Stream`](futures::Stream) of opaque [`Bytes`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use bytes::Bytes;
/// # use futures::TryStreamExt;
/// # use arrow_flight::{Action, FlightClient};
/// # use arrow_schema::Schema;
/// # let channel: tonic::transport::Channel = unimplemented!();
/// let mut client = FlightClient::new(channel);
///
/// let request = Action::new("my_action", "the body");
///
/// // Make a request to run the action on the server
/// let results: Vec<Bytes> = client
/// .do_action(request)
/// .await
/// .expect("error executing acton")
/// .try_collect() // use TryStreamExt to collect stream
/// .await
/// .expect("error gathering action results");
/// # }
/// ```
pub async fn do_action(
&mut self,
action: Action,
) -> Result<BoxStream<'static, Result<Bytes>>> {
let request = self.make_request(action);

let result_stream = self
.inner
.do_action(request)
.await?
.into_inner()
.map_err(FlightError::Tonic)
.map(|r| {
r.map(|r| {
// unwrap inner bytes
let crate::Result { body } = r;
body
})
});

Ok(result_stream.boxed())
}

/// return a Request, adding any configured metadata
fn make_request<T>(&self, t: T) -> tonic::Request<T> {
Expand Down
Loading

0 comments on commit 42ffc3f

Please sign in to comment.