Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Bumped Prost and Tonic #550

Merged
merged 4 commits into from
Oct 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-07-09
toolchain: nightly-2021-10-24
override: true
- uses: Swatinem/rust-cache@v1
- name: Install Miri
Expand All @@ -93,7 +93,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-07-09
toolchain: nightly-2021-10-24
override: true
- uses: Swatinem/rust-cache@v1
- name: Install Miri
Expand All @@ -112,6 +112,8 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- name: Install Rust
run: rustup update stable
- name: Setup parquet files
run: |
apt update && apt install python3-pip python3-venv -y -q
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ bench = false
[dependencies]
num-traits = "0.2"
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono-tz = { version = "0.5", optional = true }
chrono-tz = { version = "0.6", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }
# We need to Hash values before sending them to an hasher. This
Expand All @@ -39,7 +39,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "4.0", optional = true, default-features = false }

arrow-format = { version = "*", optional = true, features = ["ipc"] }
arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

Expand Down
8 changes: 4 additions & 4 deletions integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "*", features = ["ipc", "flight-service"] }
arrow-format = { version = "0.3.0", features = ["full"] }
async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
hex = "0.4"
prost = "0.8"
prost = "0.9"
serde = { version = "1.0", features = ["rc"] }
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tonic = "0.5.2"
tracing-subscriber = { version = "0.2.15", optional = true }
tonic = "0.6.0"
tracing-subscriber = { version = "0.3.1", optional = true }
7 changes: 2 additions & 5 deletions integration-testing/src/bin/flight-test-integration-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ async fn main() -> Result {
let port = matches.value_of("port").expect("Port is required");

match matches.value_of("scenario") {
Some("middleware") => {
flight_client_scenarios::middleware::run_scenario(host, port).await?
}
Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?,
Some("auth:basic_proto") => {
flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await?
}
Expand All @@ -53,8 +51,7 @@ async fn main() -> Result {
let path = matches
.value_of("path")
.expect("Path is required if scenario is not specified");
flight_client_scenarios::integration_test::run_scenario(host, port, path)
.await?;
flight_client_scenarios::integration_test::run_scenario(host, port, path).await?;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ async fn main() -> Result {
let port = matches.value_of("port").unwrap_or("0");

match matches.value_of("scenario") {
Some("middleware") => {
flight_server_scenarios::middleware::scenario_setup(port).await?
}
Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?,
Some("auth:basic_proto") => {
flight_server_scenarios::auth_basic_proto::scenario_setup(port).await?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

use crate::{AUTH_PASSWORD, AUTH_USERNAME};

use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth};
use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use arrow_format::flight::data::{Action, BasicAuth, HandshakeRequest};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use futures::{stream, StreamExt};
use prost::Message;
use tonic::{metadata::MetadataValue, Request, Status};
Expand Down Expand Up @@ -81,11 +79,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result {
Ok(())
}

async fn authenticate(
client: &mut Client,
username: &str,
password: &str,
) -> Result<String> {
async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result<String> {
let auth = BasicAuth {
username: username.into(),
password: password.into(),
Expand Down
26 changes: 15 additions & 11 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

use crate::{read_json_file, ArrowFile};

use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch};
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use arrow2::{
array::*,
datatypes::*,
io::flight::{self, deserialize_batch, serialize_batch},
io::ipc::{read, write},
record_batch::RecordBatch,
};
use arrow_format::flight::data::{
flight_descriptor::DescriptorType,
FlightData, FlightDescriptor, Location, Ticket,
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
use tonic::{Request, Streaming};

Expand Down Expand Up @@ -126,8 +131,7 @@ async fn send_batch(
batch: &RecordBatch,
options: &write::IpcWriteOptions,
) -> Result {
let (dictionary_flight_data, mut batch_flight_data) =
serialize_batch(batch, options);
let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options);

upload_tx
.send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
Expand Down Expand Up @@ -210,9 +214,8 @@ async fn consume_flight_location(
let metadata = counter.to_string().into_bytes();
assert_eq!(metadata, data.app_metadata);

let actual_batch =
deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
.expect("Unable to convert flight data to Arrow batch");
let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
.expect("Unable to convert flight data to Arrow batch");

assert_eq!(expected_batch.schema(), actual_batch.schema());
assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
Expand Down Expand Up @@ -262,7 +265,8 @@ async fn receive_batch_flight_data(
.expect("Error reading dictionary");

data = resp.next().await?.ok()?;
message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
message =
ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
}

Some(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use tonic::{Request, Status};

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down
30 changes: 11 additions & 19 deletions integration-testing/src/flight_server_scenarios/auth_basic_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
use std::pin::Pin;
use std::sync::Arc;

use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
use tokio::sync::Mutex;
use tonic::{
metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming,
};
use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming};

type TonicStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;

Expand Down Expand Up @@ -60,21 +58,15 @@ pub struct AuthBasicProtoScenarioImpl {
}

impl AuthBasicProtoScenarioImpl {
async fn check_auth(
&self,
metadata: &MetadataMap,
) -> Result<GrpcServerCallContext, Status> {
async fn check_auth(&self, metadata: &MetadataMap) -> Result<GrpcServerCallContext, Status> {
let token = metadata
.get_bin("auth-token-bin")
.and_then(|v| v.to_bytes().ok())
.and_then(|b| String::from_utf8(b.to_vec()).ok());
self.is_valid(token).await
}

async fn is_valid(
&self,
token: Option<String>,
) -> Result<GrpcServerCallContext, Status> {
async fn is_valid(&self, token: Option<String>) -> Result<GrpcServerCallContext, Status> {
match token {
Some(t) if t == *self.username => Ok(GrpcServerCallContext {
peer_identity: self.username.to_string(),
Expand Down Expand Up @@ -139,12 +131,10 @@ impl FlightService for AuthBasicProtoScenarioImpl {
let req = req.expect("Error reading handshake request");
let HandshakeRequest { payload, .. } = req;

let auth = BasicAuth::decode(&*payload)
.expect("Error parsing handshake request");
let auth =
BasicAuth::decode(&*payload).expect("Error parsing handshake request");

let resp = if *auth.username == *username
&& *auth.password == *password
{
let resp = if *auth.username == *username && *auth.password == *password {
Ok(HandshakeResponse {
payload: username.as_bytes().to_vec(),
..HandshakeResponse::default()
Expand Down Expand Up @@ -189,7 +179,8 @@ impl FlightService for AuthBasicProtoScenarioImpl {
&self,
request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
self.check_auth(request.metadata()).await?;
let metadata = request.metadata();
self.check_auth(metadata).await?;
Err(Status::unimplemented("Not yet implemented"))
}

Expand Down Expand Up @@ -217,7 +208,8 @@ impl FlightService for AuthBasicProtoScenarioImpl {
&self,
request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
self.check_auth(request.metadata()).await?;
let metadata = request.metadata();
self.check_auth(metadata).await?;
Err(Status::unimplemented("Not yet implemented"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
// under the License.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::convert::TryFrom;

use arrow2::io::flight::{serialize_batch, serialize_schema};
use arrow_format::flight::data::*;
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::flight::data::*;
use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
use arrow_format::ipc::Schema as ArrowSchema;
use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message};

use arrow2::{
array::Array,
datatypes::*,
array::Array, datatypes::*, io::flight::serialize_schema_to_info, io::ipc,
record_batch::RecordBatch,
io::ipc,
io::flight::serialize_schema_to_info
};

use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
Expand Down Expand Up @@ -113,12 +110,7 @@ impl FlightService for FlightServiceImpl {

let options = ipc::write::IpcWriteOptions::default();

let schema = std::iter::once({
Ok(serialize_schema(
&flight.schema,
&options,
))
});
let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options)));

let batches = flight
.chunks
Expand Down Expand Up @@ -180,12 +172,10 @@ impl FlightService for FlightServiceImpl {
let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum();

let options = ipc::write::IpcWriteOptions::default();
let schema =
serialize_schema_to_info(&flight.schema, &options)
.expect(
"Could not generate schema bytes from schema stored by a DoPut; \
let schema = serialize_schema_to_info(&flight.schema, &options).expect(
"Could not generate schema bytes from schema stored by a DoPut; \
this should be impossible",
);
);

let info = FlightInfo {
schema,
Expand Down
5 changes: 2 additions & 3 deletions integration-testing/src/flight_server_scenarios/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use std::pin::Pin;

use arrow_format::flight::data::*;
use arrow_format::flight::data::flight_descriptor::DescriptorType;
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use futures::Stream;
use tonic::{transport::Server, Request, Response, Status, Streaming};

Expand Down Expand Up @@ -90,8 +90,7 @@ impl FlightService for MiddlewareScenarioImpl {

let descriptor = request.into_inner();

if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success"
{
if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" {
// Return a fake location - the test doesn't read it
let endpoint = super::endpoint("foo", "grpc+tcp://localhost:10010");

Expand Down