Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[client]: improve batch request API #910

Merged
merged 19 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benches/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ pprof = { version = "0.10", features = ["flamegraph", "criterion"] }
criterion = { version = "0.3", features = ["async_tokio", "html_reports"] }
futures-channel = "0.3.15"
futures-util = "0.3.15"
jsonrpsee = { path = "../jsonrpsee", features = ["server"] }
jsonrpsee = { path = "../jsonrpsee", features = ["server", "client"] }
jsonrpsee_v0_15 = { package = "jsonrpsee", version = "=0.15.1", features = ["http-client", "ws-client", "client-ws-transport"] }
jsonrpc-ws-server = { version = "18.0.0", optional = true }
jsonrpc-http-server = { version = "18.0.0", optional = true }
Expand Down
74 changes: 66 additions & 8 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ criterion_group!(
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_time_slow());
targets = SyncBencher::http_benches_slow, SyncBencher::websocket_benches_slow
);
criterion_group!(
/*criterion_group!(
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
name = async_benches;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
targets = AsyncBencher::http_benches, AsyncBencher::websocket_benches
);
);*/
criterion_group!(
name = async_benches_mid;
config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None))).measurement_time(measurement_mid());
Expand All @@ -69,7 +69,7 @@ criterion_main!(
sync_benches,
sync_benches_mid,
sync_benches_slow,
async_benches,
//async_benches,
niklasad1 marked this conversation as resolved.
Show resolved Hide resolved
async_benches_mid,
async_slow_benches,
subscriptions
Expand Down Expand Up @@ -111,7 +111,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
b.iter(|| {
let params = serde_json::value::RawValue::from_string("[1, 2]".to_string()).unwrap();

let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
let request = RequestSer::borrowed(&Id::Number(0), &"say_hello", Some(&params));
v2_serialize(request);
})
});
Expand All @@ -124,7 +124,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
builder.insert(1u64).unwrap();
builder.insert(2u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
let request = RequestSer::borrowed(&Id::Number(0), &"say_hello", params.as_deref());
v2_serialize(request);
})
});
Expand All @@ -134,7 +134,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
b.iter(|| {
let params = serde_json::value::RawValue::from_string(r#"{"key": 1}"#.to_string()).unwrap();

let request = RequestSer::new(&Id::Number(0), "say_hello", Some(params));
let request = RequestSer::borrowed(&Id::Number(0), &"say_hello", Some(&params));
v2_serialize(request);
})
});
Expand All @@ -146,7 +146,7 @@ pub fn jsonrpsee_types_v2(crit: &mut Criterion) {
let mut builder = ObjectParams::new();
builder.insert("key", 1u32).unwrap();
let params = builder.to_rpc_params().expect("Valid params");
let request = RequestSer::new(&Id::Number(0), "say_hello", params);
let request = RequestSer::borrowed(&Id::Number(0), &"say_hello", params.as_deref());
v2_serialize(request);
})
});
Expand All @@ -163,6 +163,9 @@ trait RequestBencher {
http_concurrent_conn_calls(&rt, crit, &url, "http_concurrent_conn_calls", Self::REQUEST_TYPE, &[2, 4, 8]);
round_trip(&rt, crit, client.clone(), "http_round_trip", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "http_batch_requests", Self::REQUEST_TYPE);

let c = Arc::new(master::http_client(&url));
master::batch_round_trip(&rt, crit, c, "http_batch_requests_new", Self::REQUEST_TYPE);
}

fn http_benches_mid(crit: &mut Criterion) {
Expand Down Expand Up @@ -192,6 +195,9 @@ trait RequestBencher {
ws_concurrent_conn_calls(&rt, crit, &url, "ws_concurrent_conn_calls", Self::REQUEST_TYPE, &[2, 4, 8]);
round_trip(&rt, crit, client.clone(), "ws_round_trip", Self::REQUEST_TYPE);
batch_round_trip(&rt, crit, client, "ws_batch_requests", Self::REQUEST_TYPE);

let c = Arc::new(rt.block_on(master::ws_client(&url)));
master::batch_round_trip(&rt, crit, c, "ws_batch_requests_new", Self::REQUEST_TYPE);
}

fn websocket_benches_mid(crit: &mut Criterion) {
Expand Down Expand Up @@ -302,7 +308,7 @@ fn batch_round_trip(

let bench_name = format!("{}/{}", name, fast_call);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for batch_size in [2, 5, 10, 50, 100usize].iter() {
for batch_size in [100, 10_000, 100_000].iter() {
let batch = vec![(fast_call, None); *batch_size];

group.throughput(Throughput::Elements(*batch_size as u64));
Expand Down Expand Up @@ -495,3 +501,55 @@ fn ws_custom_headers_handshake(rt: &TokioRuntime, crit: &mut Criterion, url: &st
}
group.finish();
}

pub(crate) mod master {
use super::{Arc, BenchmarkId, Criterion, RequestType, Throughput, TokioRuntime};
use jsonrpsee::core::client::*;
use jsonrpsee::core::params::ArrayParams;
use jsonrpsee::core::params::BatchRequestBuilder;
use jsonrpsee::http_client::*;
use jsonrpsee::ws_client::*;

pub(crate) async fn ws_client(url: &str) -> WsClient {
WsClientBuilder::default()
.max_request_body_size(u32::MAX)
.max_concurrent_requests(1024 * 1024)
.build(url)
.await
.unwrap()
}

pub(crate) fn http_client(url: &str) -> HttpClient {
HttpClientBuilder::default()
.max_request_body_size(u32::MAX)
.max_concurrent_requests(1024 * 1024)
.build(url)
.unwrap()
}

pub(crate) fn batch_round_trip(
rt: &TokioRuntime,
crit: &mut Criterion,
client: Arc<impl ClientT>,
name: &str,
request: RequestType,
) {
let fast_call = request.methods()[0];
assert!(fast_call.starts_with("fast_call"));

let bench_name = format!("{}/{}", name, fast_call);
let mut group = crit.benchmark_group(request.group_name(&bench_name));
for batch_size in [100, 10_000, 100_000].iter() {
let mut batch = BatchRequestBuilder::new();
for _ in 0..*batch_size {
batch.insert(fast_call, ArrayParams::new()).unwrap();
}

group.throughput(Throughput::Elements(*batch_size as u64));
group.bench_with_input(BenchmarkId::from_parameter(batch_size), batch_size, |b, _| {
b.to_async(rt).iter(|| async { client.batch_request::<String>(batch.clone()).await.unwrap() })
});
}
group.finish();
}
}
1 change: 1 addition & 0 deletions client/http-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ tokio = { version = "1.16", features = ["time"] }
tracing = "0.1.34"

[dev-dependencies]
tracing-subscriber = { version = "0.3.3", features = ["env-filter"] }
jsonrpsee-test-utils = { path = "../../test-utils" }
tokio = { version = "1.16", features = ["net", "rt-multi-thread", "macros"] }

Expand Down
98 changes: 63 additions & 35 deletions client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,23 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use std::borrow::Cow as StdCow;
use std::sync::Arc;
use std::time::Duration;

use crate::transport::HttpTransportClient;
use crate::types::{ErrorResponse, Id, NotificationSer, RequestSer, Response};
use crate::types::{ErrorResponse, NotificationSer, RequestSer, Response};
use async_trait::async_trait;
use hyper::http::HeaderMap;
use jsonrpsee_core::client::{CertificateStore, ClientT, IdKind, RequestIdManager, Subscription, SubscriptionClientT};
use jsonrpsee_core::client::{
generate_batch_id_range, BatchResponse, CertificateStore, ClientT, IdKind, RequestIdManager, Subscription,
SubscriptionClientT,
};
use jsonrpsee_core::params::BatchRequestBuilder;
use jsonrpsee_core::traits::ToRpcParams;
use jsonrpsee_core::{Error, JsonRawValue, TEN_MB_SIZE_BYTES};
use jsonrpsee_types::error::CallError;
use rustc_hash::FxHashMap;
use jsonrpsee_types::{ErrorObject, Id, TwoPointZero};
use serde::de::DeserializeOwned;
use tracing::instrument;

Expand Down Expand Up @@ -173,7 +177,8 @@ impl ClientT for HttpClient {
Params: ToRpcParams + Send,
{
let params = params.to_rpc_params()?;
let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?;
let notif =
serde_json::to_string(&NotificationSer::borrowed(&method, params.as_deref())).map_err(Error::ParseError)?;

let fut = self.transport.send(notif);

Expand All @@ -196,7 +201,7 @@ impl ClientT for HttpClient {
let id = guard.inner();
let params = params.to_rpc_params()?;

let request = RequestSer::new(&id, method, params);
let request = RequestSer::borrowed(&id, &method, params.as_deref());
let raw = serde_json::to_string(&request).map_err(Error::ParseError)?;

let fut = self.transport.send_and_read_body(raw);
Expand Down Expand Up @@ -230,23 +235,23 @@ impl ClientT for HttpClient {
}

#[instrument(name = "batch", skip(self, batch), level = "trace")]
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<Vec<R>, Error>
async fn batch_request<'a, R>(&self, batch: BatchRequestBuilder<'a>) -> Result<BatchResponse<'a, R>, Error>
where
R: DeserializeOwned + Default + Clone,
R: DeserializeOwned,
{
let batch = batch.build();
let guard = self.id_manager.next_request_ids(batch.len())?;
let ids: Vec<Id> = guard.inner();
let batch = batch.build()?;
let guard = self.id_manager.next_request_id()?;
let id_range = generate_batch_id_range(&guard, batch.len() as u64)?;

let mut batch_request = Vec::with_capacity(batch.len());
// NOTE(niklasad1): `ID` is not necessarily monotonically increasing.
let mut ordered_requests = Vec::with_capacity(batch.len());
let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default());

for (pos, (method, params)) in batch.into_iter().enumerate() {
batch_request.push(RequestSer::new(&ids[pos], method, params));
ordered_requests.push(&ids[pos]);
request_set.insert(&ids[pos], pos);
for ((method, params), id) in batch.into_iter().zip(id_range.clone()) {
let id = self.id_manager.as_id_kind().into_id(id);
batch_request.push(RequestSer {
jsonrpc: TwoPointZero,
id,
method: method.into(),
params: params.map(StdCow::Owned),
});
}

let fut = self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?);
Expand All @@ -257,26 +262,49 @@ impl ClientT for HttpClient {
Ok(Err(e)) => return Err(Error::Transport(e.into())),
};

// NOTE: it's decoded first to `JsonRawValue` and then to `R` below to get
// a better error message if `R` couldn't be decoded.
let rps: Vec<Response<&JsonRawValue>> =
serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::<ErrorResponse>(&body) {
Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())),
Err(e) => Error::ParseError(e),
})?;

// NOTE: `R::default` is placeholder and will be replaced in loop below.
let mut responses = vec![R::default(); ordered_requests.len()];
for rp in rps {
let pos = match request_set.get(&rp.id) {
Some(pos) => *pos,
None => return Err(Error::InvalidRequestId),
let json_rps: Vec<&JsonRawValue> = serde_json::from_slice(&body).map_err(Error::ParseError)?;

let mut responses = Vec::with_capacity(json_rps.len());
let mut successful_calls = 0;
let mut failed_calls = 0;

for _ in 0..json_rps.len() {
let err_obj = ErrorObject::borrowed(0, &"", None);
responses.push(Err(ErrorResponse::borrowed(err_obj, Id::Null)));
}

for rp in json_rps {
let (id, res) = match serde_json::from_str::<Response<R>>(rp.get()).map_err(Error::ParseError) {
Ok(r) => {
let id = r.id.try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
successful_calls += 1;
(id, Ok(r.into_owned()))
}
Err(err) => match serde_json::from_str::<ErrorResponse>(rp.get()).map_err(Error::ParseError) {
Ok(err) => {
let id = err.id().try_parse_inner_as_number().ok_or(Error::InvalidRequestId)?;
failed_calls += 1;
(id, Err(err.into_owned()))
}
Err(_) => {
return Err(err);
}
},
};
let result = serde_json::from_str(rp.result.get()).map_err(Error::ParseError)?;
responses[pos] = result;

let maybe_elem = id
.checked_sub(id_range.start)
.and_then(|p| p.try_into().ok())
.and_then(|p: usize| responses.get_mut(p));

if let Some(elem) = maybe_elem {
*elem = res;
} else {
return Err(Error::InvalidRequestId);
}
}

Ok(responses)
Ok(BatchResponse::new(successful_calls, responses, failed_calls))
}
}

Expand Down
Loading