Skip to content
This repository has been archived by the owner on Feb 18, 2024. It is now read-only.

Commit

Permalink
Bumped prost and tonic (#550)
Browse files Browse the repository at this point in the history
  • Loading branch information
Chojan Shang authored Oct 29, 2021
1 parent 326fc9d commit ce30fb3
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 79 deletions.
6 changes: 4 additions & 2 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-07-09
toolchain: nightly-2021-10-24
override: true
- uses: Swatinem/rust-cache@v1
- name: Install Miri
Expand All @@ -93,7 +93,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly-2021-07-09
toolchain: nightly-2021-10-24
override: true
- uses: Swatinem/rust-cache@v1
- name: Install Miri
Expand All @@ -112,6 +112,8 @@ jobs:
- uses: actions/checkout@v2
with:
submodules: true
- name: Install Rust
run: rustup update stable
- name: Setup parquet files
run: |
apt update && apt install python3-pip python3-venv -y -q
Expand Down
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ bench = false
[dependencies]
num-traits = "0.2"
chrono = { version = "0.4", default_features = false, features = ["std"] }
chrono-tz = { version = "0.5", optional = true }
chrono-tz = { version = "0.6", optional = true }
# To efficiently cast numbers to strings
lexical-core = { version = "0.8", optional = true }
# We need to Hash values before sending them to an hasher. This
Expand All @@ -39,7 +39,7 @@ indexmap = { version = "^1.6", optional = true }
# used to print columns in a nice columnar format
comfy-table = { version = "4.0", optional = true, default-features = false }

arrow-format = { version = "*", optional = true, features = ["ipc"] }
arrow-format = { version = "0.3.0", optional = true, features = ["ipc"] }

hex = { version = "^0.4", optional = true }

Expand Down
8 changes: 4 additions & 4 deletions integration-testing/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,15 +29,15 @@ logging = ["tracing-subscriber"]

[dependencies]
arrow2 = { path = "../", features = ["io_ipc", "io_ipc_compression", "io_flight", "io_json_integration"] }
arrow-format = { version = "*", features = ["ipc", "flight-service"] }
arrow-format = { version = "0.3.0", features = ["full"] }
async-trait = "0.1.41"
clap = "2.33"
futures = "0.3"
hex = "0.4"
prost = "0.8"
prost = "0.9"
serde = { version = "1.0", features = ["rc"] }
serde_derive = "1.0"
serde_json = { version = "1.0", features = ["preserve_order"] }
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread"] }
tonic = "0.5.2"
tracing-subscriber = { version = "0.2.15", optional = true }
tonic = "0.6.0"
tracing-subscriber = { version = "0.3.1", optional = true }
7 changes: 2 additions & 5 deletions integration-testing/src/bin/flight-test-integration-client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ async fn main() -> Result {
let port = matches.value_of("port").expect("Port is required");

match matches.value_of("scenario") {
Some("middleware") => {
flight_client_scenarios::middleware::run_scenario(host, port).await?
}
Some("middleware") => flight_client_scenarios::middleware::run_scenario(host, port).await?,
Some("auth:basic_proto") => {
flight_client_scenarios::auth_basic_proto::run_scenario(host, port).await?
}
Expand All @@ -53,8 +51,7 @@ async fn main() -> Result {
let path = matches
.value_of("path")
.expect("Path is required if scenario is not specified");
flight_client_scenarios::integration_test::run_scenario(host, port, path)
.await?;
flight_client_scenarios::integration_test::run_scenario(host, port, path).await?;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ async fn main() -> Result {
let port = matches.value_of("port").unwrap_or("0");

match matches.value_of("scenario") {
Some("middleware") => {
flight_server_scenarios::middleware::scenario_setup(port).await?
}
Some("middleware") => flight_server_scenarios::middleware::scenario_setup(port).await?,
Some("auth:basic_proto") => {
flight_server_scenarios::auth_basic_proto::scenario_setup(port).await?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,8 @@

use crate::{AUTH_PASSWORD, AUTH_USERNAME};

use arrow_format::flight::data::{Action, HandshakeRequest, BasicAuth};
use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use arrow_format::flight::data::{Action, BasicAuth, HandshakeRequest};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use futures::{stream, StreamExt};
use prost::Message;
use tonic::{metadata::MetadataValue, Request, Status};
Expand Down Expand Up @@ -81,11 +79,7 @@ pub async fn run_scenario(host: &str, port: &str) -> Result {
Ok(())
}

async fn authenticate(
client: &mut Client,
username: &str,
password: &str,
) -> Result<String> {
async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result<String> {
let auth = BasicAuth {
username: username.into(),
password: password.into(),
Expand Down
26 changes: 15 additions & 11 deletions integration-testing/src/flight_client_scenarios/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@

use crate::{read_json_file, ArrowFile};

use arrow2::{array::*, datatypes::*, io::flight::{self, deserialize_batch, serialize_batch}, io::ipc::{read, write}, record_batch::RecordBatch};
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use arrow2::{
array::*,
datatypes::*,
io::flight::{self, deserialize_batch, serialize_batch},
io::ipc::{read, write},
record_batch::RecordBatch,
};
use arrow_format::flight::data::{
flight_descriptor::DescriptorType,
FlightData, FlightDescriptor, Location, Ticket,
flight_descriptor::DescriptorType, FlightData, FlightDescriptor, Location, Ticket,
};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use arrow_format::ipc;
use arrow_format::ipc::Message::MessageHeader;
use futures::{channel::mpsc, sink::SinkExt, stream, StreamExt};
use tonic::{Request, Streaming};

Expand Down Expand Up @@ -126,8 +131,7 @@ async fn send_batch(
batch: &RecordBatch,
options: &write::IpcWriteOptions,
) -> Result {
let (dictionary_flight_data, mut batch_flight_data) =
serialize_batch(batch, options);
let (dictionary_flight_data, mut batch_flight_data) = serialize_batch(batch, options);

upload_tx
.send_all(&mut stream::iter(dictionary_flight_data).map(Ok))
Expand Down Expand Up @@ -210,9 +214,8 @@ async fn consume_flight_location(
let metadata = counter.to_string().into_bytes();
assert_eq!(metadata, data.app_metadata);

let actual_batch =
deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
.expect("Unable to convert flight data to Arrow batch");
let actual_batch = deserialize_batch(&data, schema.clone(), true, &dictionaries_by_field)
.expect("Unable to convert flight data to Arrow batch");

assert_eq!(expected_batch.schema(), actual_batch.schema());
assert_eq!(expected_batch.num_columns(), actual_batch.num_columns());
Expand Down Expand Up @@ -262,7 +265,8 @@ async fn receive_batch_flight_data(
.expect("Error reading dictionary");

data = resp.next().await?.ok()?;
message = ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
message =
ipc::Message::root_as_message(&data.data_header[..]).expect("Error parsing message");
}

Some(data)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
// specific language governing permissions and limitations
// under the License.

use arrow_format::flight::service::{
flight_service_client::FlightServiceClient,
};
use arrow_format::flight::data::{flight_descriptor::DescriptorType, FlightDescriptor};
use arrow_format::flight::service::flight_service_client::FlightServiceClient;
use tonic::{Request, Status};

type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
Expand Down
30 changes: 11 additions & 19 deletions integration-testing/src/flight_server_scenarios/auth_basic_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@
use std::pin::Pin;
use std::sync::Arc;

use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
use tokio::sync::Mutex;
use tonic::{
metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming,
};
use tonic::{metadata::MetadataMap, transport::Server, Request, Response, Status, Streaming};

type TonicStream<T> = Pin<Box<dyn Stream<Item = T> + Send + Sync + 'static>>;

Expand Down Expand Up @@ -60,21 +58,15 @@ pub struct AuthBasicProtoScenarioImpl {
}

impl AuthBasicProtoScenarioImpl {
async fn check_auth(
&self,
metadata: &MetadataMap,
) -> Result<GrpcServerCallContext, Status> {
async fn check_auth(&self, metadata: &MetadataMap) -> Result<GrpcServerCallContext, Status> {
let token = metadata
.get_bin("auth-token-bin")
.and_then(|v| v.to_bytes().ok())
.and_then(|b| String::from_utf8(b.to_vec()).ok());
self.is_valid(token).await
}

async fn is_valid(
&self,
token: Option<String>,
) -> Result<GrpcServerCallContext, Status> {
async fn is_valid(&self, token: Option<String>) -> Result<GrpcServerCallContext, Status> {
match token {
Some(t) if t == *self.username => Ok(GrpcServerCallContext {
peer_identity: self.username.to_string(),
Expand Down Expand Up @@ -139,12 +131,10 @@ impl FlightService for AuthBasicProtoScenarioImpl {
let req = req.expect("Error reading handshake request");
let HandshakeRequest { payload, .. } = req;

let auth = BasicAuth::decode(&*payload)
.expect("Error parsing handshake request");
let auth =
BasicAuth::decode(&*payload).expect("Error parsing handshake request");

let resp = if *auth.username == *username
&& *auth.password == *password
{
let resp = if *auth.username == *username && *auth.password == *password {
Ok(HandshakeResponse {
payload: username.as_bytes().to_vec(),
..HandshakeResponse::default()
Expand Down Expand Up @@ -189,7 +179,8 @@ impl FlightService for AuthBasicProtoScenarioImpl {
&self,
request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoPutStream>, Status> {
self.check_auth(request.metadata()).await?;
let metadata = request.metadata();
self.check_auth(metadata).await?;
Err(Status::unimplemented("Not yet implemented"))
}

Expand Down Expand Up @@ -217,7 +208,8 @@ impl FlightService for AuthBasicProtoScenarioImpl {
&self,
request: Request<Streaming<FlightData>>,
) -> Result<Response<Self::DoExchangeStream>, Status> {
self.check_auth(request.metadata()).await?;
let metadata = request.metadata();
self.check_auth(metadata).await?;
Err(Status::unimplemented("Not yet implemented"))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,23 +16,20 @@
// under the License.

use std::collections::HashMap;
use std::convert::TryFrom;
use std::pin::Pin;
use std::sync::Arc;
use std::convert::TryFrom;

use arrow2::io::flight::{serialize_batch, serialize_schema};
use arrow_format::flight::data::*;
use arrow_format::flight::data::flight_descriptor::*;
use arrow_format::flight::service::flight_service_server::*;
use arrow_format::flight::data::*;
use arrow_format::ipc::Message::{root_as_message, Message, MessageHeader};
use arrow_format::ipc::Schema as ArrowSchema;
use arrow_format::ipc::Message::{Message, MessageHeader, root_as_message};

use arrow2::{
array::Array,
datatypes::*,
array::Array, datatypes::*, io::flight::serialize_schema_to_info, io::ipc,
record_batch::RecordBatch,
io::ipc,
io::flight::serialize_schema_to_info
};

use futures::{channel::mpsc, sink::SinkExt, Stream, StreamExt};
Expand Down Expand Up @@ -113,12 +110,7 @@ impl FlightService for FlightServiceImpl {

let options = ipc::write::IpcWriteOptions::default();

let schema = std::iter::once({
Ok(serialize_schema(
&flight.schema,
&options,
))
});
let schema = std::iter::once(Ok(serialize_schema(&flight.schema, &options)));

let batches = flight
.chunks
Expand Down Expand Up @@ -180,12 +172,10 @@ impl FlightService for FlightServiceImpl {
let total_records: usize = flight.chunks.iter().map(|chunk| chunk.num_rows()).sum();

let options = ipc::write::IpcWriteOptions::default();
let schema =
serialize_schema_to_info(&flight.schema, &options)
.expect(
"Could not generate schema bytes from schema stored by a DoPut; \
let schema = serialize_schema_to_info(&flight.schema, &options).expect(
"Could not generate schema bytes from schema stored by a DoPut; \
this should be impossible",
);
);

let info = FlightInfo {
schema,
Expand Down
5 changes: 2 additions & 3 deletions integration-testing/src/flight_server_scenarios/middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use std::pin::Pin;

use arrow_format::flight::data::*;
use arrow_format::flight::data::flight_descriptor::DescriptorType;
use arrow_format::flight::service::flight_service_server::{FlightService, FlightServiceServer};
use arrow_format::flight::data::*;
use futures::Stream;
use tonic::{transport::Server, Request, Response, Status, Streaming};

Expand Down Expand Up @@ -90,8 +90,7 @@ impl FlightService for MiddlewareScenarioImpl {

let descriptor = request.into_inner();

if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success"
{
if descriptor.r#type == DescriptorType::Cmd as i32 && descriptor.cmd == b"success" {
// Return a fake location - the test doesn't read it
let endpoint = super::endpoint("foo", "grpc+tcp://localhost:10010");

Expand Down

0 comments on commit ce30fb3

Please sign in to comment.