From 5e01e6beb344cd0465d82d07cbcb21b495ac3f4e Mon Sep 17 00:00:00 2001 From: patrick Date: Thu, 28 Jul 2022 10:11:54 +0100 Subject: [PATCH 1/6] add id to tracing span --- client/http-client/src/client.rs | 2 +- core/src/client/async_client/mod.rs | 7 ++++--- core/src/tracing.rs | 5 +++-- http-server/src/server.rs | 5 +++-- ws-server/src/server.rs | 2 +- 5 files changed, 12 insertions(+), 9 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 6bb2466bfa..22231f79c4 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -189,7 +189,7 @@ impl ClientT for HttpClient { let guard = self.id_manager.next_request_id()?; let id = guard.inner(); let request = RequestSer::new(&id, method, params); - let trace = RpcTracing::method_call(method); + let trace = RpcTracing::method_call(method, &id); let _enter = trace.span().enter(); let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 43dfb9089f..7f2cca77e9 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -269,7 +269,7 @@ impl ClientT for Client { let (send_back_tx, send_back_rx) = oneshot::channel(); let guard = self.id_manager.next_request_id()?; let id = guard.inner(); - let trace = RpcTracing::method_call(method); + let trace = RpcTracing::method_call(method, &id); let _enter = trace.span().enter(); let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; @@ -363,10 +363,11 @@ impl SubscriptionClientT for Client { let guard = self.id_manager.next_request_ids(2)?; let mut ids: Vec = guard.inner(); - let trace = RpcTracing::method_call(subscribe_method); + let id = ids[0].clone(); + + let trace = RpcTracing::method_call(subscribe_method, &id); let _enter = trace.span().enter(); - let id = ids[0].clone(); let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; diff --git a/core/src/tracing.rs b/core/src/tracing.rs index eb491418a8..ae789a2ab2 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -1,5 +1,6 @@ use serde::Serialize; use tracing::Level; +use jsonrpsee_types::Id; #[derive(Debug)] /// Wrapper over [`tracing::Span`] to trace individual method calls, notifications and similar. @@ -9,8 +10,8 @@ impl RpcTracing { /// Create a `method_call` tracing target. /// /// To enable this you need to call `RpcTracing::method_call("some_method").span().enable()`. - pub fn method_call(method: &str) -> Self { - Self(tracing::span!(tracing::Level::DEBUG, "method_call", %method)) + pub fn method_call(method: &str, id:&Id) -> Self { + Self(tracing::span!(tracing::Level::DEBUG, "method_call", %method, id =? id)) } /// Create a `notification` tracing target. diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 40943e4808..f219e0a436 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -685,7 +685,8 @@ async fn process_health_request( request_start: M::Instant, max_log_length: u32, ) -> Result, HyperError> { - let trace = RpcTracing::method_call(&health_api.method); + let id = Id::Number(100); + let trace = RpcTracing::method_call(&health_api.method, &id); let _enter = trace.span().enter(); tx_log_from_str("HTTP health API", max_log_length); @@ -805,7 +806,7 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResponse { if let Ok(req) = serde_json::from_slice::(&data) { - let trace = RpcTracing::method_call(&req.method); + let trace = RpcTracing::method_call(&req.method, &req.id); let _enter = trace.span().enter(); rx_log_from_json(&req, call.max_log_length); diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index e27d0306d9..33a2f75826 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -893,7 +893,7 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResult { if let Ok(req) = serde_json::from_slice::(&data) { - let trace = RpcTracing::method_call(&req.method); + let trace = RpcTracing::method_call(&req.method, &req.id); let _enter = trace.span().enter(); rx_log_from_json(&req, call.max_log_length); From ffb1fc41911aea487fa31b75b77793f8745ea403 Mon Sep 17 00:00:00 2001 From: patrick Date: Thu, 28 Jul 2022 22:24:51 +0100 Subject: [PATCH 2/6] Revert "add id to tracing span" This reverts commit 5e01e6beb344cd0465d82d07cbcb21b495ac3f4e. --- client/http-client/src/client.rs | 2 +- core/src/client/async_client/mod.rs | 7 +++---- core/src/tracing.rs | 5 ++--- http-server/src/server.rs | 5 ++--- ws-server/src/server.rs | 2 +- 5 files changed, 9 insertions(+), 12 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 22231f79c4..6bb2466bfa 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -189,7 +189,7 @@ impl ClientT for HttpClient { let guard = self.id_manager.next_request_id()?; let id = guard.inner(); let request = RequestSer::new(&id, method, params); - let trace = RpcTracing::method_call(method, &id); + let trace = RpcTracing::method_call(method); let _enter = trace.span().enter(); let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 7f2cca77e9..43dfb9089f 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -269,7 +269,7 @@ impl ClientT for Client { let (send_back_tx, send_back_rx) = oneshot::channel(); let guard = self.id_manager.next_request_id()?; let id = guard.inner(); - let trace = RpcTracing::method_call(method, &id); + let trace = RpcTracing::method_call(method); let _enter = trace.span().enter(); let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; @@ -363,11 +363,10 @@ impl SubscriptionClientT for Client { let guard = self.id_manager.next_request_ids(2)?; let mut ids: Vec = guard.inner(); - let id = ids[0].clone(); - - let trace = RpcTracing::method_call(subscribe_method, &id); + let trace = RpcTracing::method_call(subscribe_method); let _enter = trace.span().enter(); + let id = ids[0].clone(); let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; diff --git a/core/src/tracing.rs b/core/src/tracing.rs index ae789a2ab2..eb491418a8 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -1,6 +1,5 @@ use serde::Serialize; use tracing::Level; -use jsonrpsee_types::Id; #[derive(Debug)] /// Wrapper over [`tracing::Span`] to trace individual method calls, notifications and similar. @@ -10,8 +9,8 @@ impl RpcTracing { /// Create a `method_call` tracing target. /// /// To enable this you need to call `RpcTracing::method_call("some_method").span().enable()`. - pub fn method_call(method: &str, id:&Id) -> Self { - Self(tracing::span!(tracing::Level::DEBUG, "method_call", %method, id =? id)) + pub fn method_call(method: &str) -> Self { + Self(tracing::span!(tracing::Level::DEBUG, "method_call", %method)) } /// Create a `notification` tracing target. diff --git a/http-server/src/server.rs b/http-server/src/server.rs index f219e0a436..40943e4808 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -685,8 +685,7 @@ async fn process_health_request( request_start: M::Instant, max_log_length: u32, ) -> Result, HyperError> { - let id = Id::Number(100); - let trace = RpcTracing::method_call(&health_api.method, &id); + let trace = RpcTracing::method_call(&health_api.method); let _enter = trace.span().enter(); tx_log_from_str("HTTP health API", max_log_length); @@ -806,7 +805,7 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResponse { if let Ok(req) = serde_json::from_slice::(&data) { - let trace = RpcTracing::method_call(&req.method, &req.id); + let trace = RpcTracing::method_call(&req.method); let _enter = trace.span().enter(); rx_log_from_json(&req, call.max_log_length); diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 33a2f75826..e27d0306d9 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -893,7 +893,7 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResult { if let Ok(req) = serde_json::from_slice::(&data) { - let trace = RpcTracing::method_call(&req.method, &req.id); + let trace = RpcTracing::method_call(&req.method); let _enter = trace.span().enter(); rx_log_from_json(&req, call.max_log_length); From 55765c034ae43f34e90c75b94e5dd31a08c213e3 Mon Sep 17 00:00:00 2001 From: patrick Date: Thu, 28 Jul 2022 22:44:46 +0100 Subject: [PATCH 3/6] Avoid using Span::enter() in async functions, following tracing's doc instruction https://docs.rs/tracing/latest/tracing/struct.Span.html#in-asynchronous-code --- core/src/tracing.rs | 5 ++++ http-server/src/server.rs | 53 +++++++++++++++++---------------------- 2 files changed, 28 insertions(+), 30 deletions(-) diff --git a/core/src/tracing.rs b/core/src/tracing.rs index eb491418a8..4e4b3fb0c0 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -31,6 +31,11 @@ impl RpcTracing { pub fn span(&self) -> &tracing::Span { &self.0 } + + /// Get the inner span. + pub fn into_span(self) -> tracing::Span { + self.0 + } } /// Helper for writing trace logs from str. diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 40943e4808..d0bf790bf9 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -686,11 +686,9 @@ async fn process_health_request( max_log_length: u32, ) -> Result, HyperError> { let trace = RpcTracing::method_call(&health_api.method); - let _enter = trace.span().enter(); - - tx_log_from_str("HTTP health API", max_log_length); - - let response = match methods.method_with_name(&health_api.method) { + let response = async { + tx_log_from_str("HTTP health API", max_log_length); + match methods.method_with_name(&health_api.method) { None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), Some((_name, method_callback)) => match method_callback.inner() { MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize), @@ -704,7 +702,7 @@ async fn process_health_request( MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) } }, - }; + }}.instrument(trace.into_span()).await; rx_log_from_str(&response.result, max_log_length); middleware.on_result(&health_api.method, response.success, request_start); @@ -766,21 +764,18 @@ where let batch_stream = futures_util::stream::iter(batch); let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); - - let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; - - batch_response.append(&response) - }, - ) - .in_current_span() - .await; + let batch_response = async{ + batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let params = Params::new(req.params.map(|params| params.get())); + let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; + batch_response.append(&response) + }, + ) + .await + }.instrument(trace.into_span()).await; return match batch_response { Ok(batch) => batch.finish(), @@ -806,15 +801,13 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResponse { if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::method_call(&req.method); - let _enter = trace.span().enter(); - - rx_log_from_json(&req, call.max_log_length); - - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; - - execute_call(Call { name, params, id, call }).in_current_span().await + async{ + rx_log_from_json(&req, call.max_log_length); + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; + execute_call(Call { name, params, id, call }).await + }.instrument(trace.into_span()).await } else if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::notification(&req.method); let _enter = trace.span().enter(); From e337eab0358f01f4adc6c0c2baa48eb8b5099a9e Mon Sep 17 00:00:00 2001 From: patrick Date: Fri, 29 Jul 2022 01:14:52 +0100 Subject: [PATCH 4/6] * fixed all Span::enter() * clean up --- client/http-client/src/client.rs | 135 +++++++------- core/src/client/async_client/mod.rs | 269 ++++++++++++++-------------- core/src/tracing.rs | 5 - http-server/src/server.rs | 78 ++++---- ws-server/src/server.rs | 49 ++--- 5 files changed, 268 insertions(+), 268 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 6bb2466bfa..de23122f97 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -168,17 +168,17 @@ pub struct HttpClient { impl ClientT for HttpClient { async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { let trace = RpcTracing::notification(method); - let _enter = trace.span().enter(); + async { + let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?; - let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?; + let fut = self.transport.send(notif); - let fut = self.transport.send(notif).in_current_span(); - - match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(ok)) => Ok(ok), - Err(_) => Err(Error::RequestTimeout), - Ok(Err(e)) => Err(Error::Transport(e.into())), - } + match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(ok)) => Ok(ok), + Err(_) => Err(Error::RequestTimeout), + Ok(Err(e)) => Err(Error::Transport(e.into())), + } + }.instrument(trace.span().clone()).await } /// Perform a request towards the server. @@ -190,34 +190,35 @@ impl ClientT for HttpClient { let id = guard.inner(); let request = RequestSer::new(&id, method, params); let trace = RpcTracing::method_call(method); - let _enter = trace.span().enter(); - let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; + async { + let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; - let fut = self.transport.send_and_read_body(raw).in_current_span(); - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => { - return Err(Error::RequestTimeout); - } - Ok(Err(e)) => { - return Err(Error::Transport(e.into())); - } - }; + let fut = self.transport.send_and_read_body(raw); + let body = match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(body)) => body, + Err(_e) => { + return Err(Error::RequestTimeout); + } + Ok(Err(e)) => { + return Err(Error::Transport(e.into())); + } + }; - let response: Response<_> = match serde_json::from_slice(&body) { - Ok(response) => response, - Err(_) => { - let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?; - return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))); - } - }; + let response: Response<_> = match serde_json::from_slice(&body) { + Ok(response) => response, + Err(_) => { + let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?; + return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))); + } + }; - if response.id == id { - Ok(response.result) - } else { - Err(Error::InvalidRequestId) - } + if response.id == id { + Ok(response.result) + } else { + Err(Error::InvalidRequestId) + } + }.instrument(trace.span().clone()).await } async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> @@ -227,46 +228,46 @@ impl ClientT for HttpClient { let guard = self.id_manager.next_request_ids(batch.len())?; let ids: Vec = guard.inner(); let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - let mut batch_request = Vec::with_capacity(batch.len()); - // NOTE(niklasad1): `ID` is not necessarily monotonically increasing. - let mut ordered_requests = Vec::with_capacity(batch.len()); - let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default()); + async { + let mut batch_request = Vec::with_capacity(batch.len()); + // NOTE(niklasad1): `ID` is not necessarily monotonically increasing. + let mut ordered_requests = Vec::with_capacity(batch.len()); + let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default()); - for (pos, (method, params)) in batch.into_iter().enumerate() { - batch_request.push(RequestSer::new(&ids[pos], method, params)); - ordered_requests.push(&ids[pos]); - request_set.insert(&ids[pos], pos); - } + for (pos, (method, params)) in batch.into_iter().enumerate() { + batch_request.push(RequestSer::new(&ids[pos], method, params)); + ordered_requests.push(&ids[pos]); + request_set.insert(&ids[pos], pos); + } - let fut = self - .transport - .send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?) - .in_current_span(); + let fut = self + .transport + .send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => return Err(Error::RequestTimeout), - Ok(Err(e)) => return Err(Error::Transport(e.into())), - }; + let body = match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(body)) => body, + Err(_e) => return Err(Error::RequestTimeout), + Ok(Err(e)) => return Err(Error::Transport(e.into())), + }; - let rps: Vec> = - serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::(&body) { - Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())), - Err(e) => Error::ParseError(e), - })?; + let rps: Vec> = + serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::(&body) { + Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())), + Err(e) => Error::ParseError(e), + })?; - // NOTE: `R::default` is placeholder and will be replaced in loop below. - let mut responses = vec![R::default(); ordered_requests.len()]; - for rp in rps { - let pos = match request_set.get(&rp.id) { - Some(pos) => *pos, - None => return Err(Error::InvalidRequestId), - }; - responses[pos] = rp.result - } - Ok(responses) + // NOTE: `R::default` is placeholder and will be replaced in loop below. + let mut responses = vec![R::default(); ordered_requests.len()]; + for rp in rps { + let pos = match request_set.get(&rp.id) { + Some(pos) => *pos, + None => return Err(Error::InvalidRequestId), + }; + responses[pos] = rp.result + } + Ok(responses) + }.instrument(trace.span().clone()).await } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 43dfb9089f..a11f0357c5 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -243,102 +243,104 @@ impl Drop for Client { #[async_trait] impl ClientT for Client { async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { - // NOTE: we use this to guard against max number of concurrent requests. - let _req_id = self.id_manager.next_request_id()?; - let notif = NotificationSer::new(method, params); - let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - - let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; - tx_log_from_str(&raw, self.max_log_length); - - let mut sender = self.to_back.clone(); - let fut = sender.send(FrontToBack::Notification(raw)).in_current_span(); - - match future::select(fut, Delay::new(self.request_timeout)).await { - Either::Left((Ok(()), _)) => Ok(()), - Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), - Either::Right((_, _)) => Err(Error::RequestTimeout), - } - } + // NOTE: we use this to guard against max number of concurrent requests. + let _req_id = self.id_manager.next_request_id()?; + let notif = NotificationSer::new(method, params); + let trace = RpcTracing::batch(); + + async { + let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; + tx_log_from_str(&raw, self.max_log_length); + + let mut sender = self.to_back.clone(); + let fut = sender.send(FrontToBack::Notification(raw)); + + match future::select(fut, Delay::new(self.request_timeout)).await { + Either::Left((Ok(()), _)) => Ok(()), + Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), + Either::Right((_, _)) => Err(Error::RequestTimeout), + } + }.instrument(trace.span().clone()).await + } async fn request<'a, R>(&self, method: &'a str, params: Option>) -> Result where R: DeserializeOwned, - { - let (send_back_tx, send_back_rx) = oneshot::channel(); - let guard = self.id_manager.next_request_id()?; - let id = guard.inner(); - let trace = RpcTracing::method_call(method); - let _enter = trace.span().enter(); - - let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; - tx_log_from_str(&raw, self.max_log_length); - - if self - .to_back - .clone() - .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; - let json_value = match res { - Ok(Ok(v)) => v, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); - - serde_json::from_value(json_value).map_err(Error::ParseError) - } + { + let (send_back_tx, send_back_rx) = oneshot::channel(); + let guard = self.id_manager.next_request_id()?; + let id = guard.inner(); + let trace = RpcTracing::method_call(method); + + async { + let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; + tx_log_from_str(&raw, self.max_log_length); + + if self + .to_back + .clone() + .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let json_value = match res { + Ok(Ok(v)) => v, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); + + serde_json::from_value(json_value).map_err(Error::ParseError) + }.instrument(trace.span().clone()).await + } async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> where R: DeserializeOwned + Default + Clone, - { - let guard = self.id_manager.next_request_ids(batch.len())?; - let batch_ids: Vec = guard.inner(); - let mut batches = Vec::with_capacity(batch.len()); - let log = RpcTracing::batch(); - let _enter = log.span().enter(); - - for (idx, (method, params)) in batch.into_iter().enumerate() { - batches.push(RequestSer::new(&batch_ids[idx], method, params)); - } - - let (send_back_tx, send_back_rx) = oneshot::channel(); - - let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?; - - tx_log_from_str(&raw, self.max_log_length); - - if self - .to_back - .clone() - .send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; - let json_values = match res { - Ok(Ok(v)) => v, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&json_values, self.max_log_length); - - let values: Result<_, _> = - json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); - Ok(values?) + { + let trace = RpcTracing::batch(); + async { + let guard = self.id_manager.next_request_ids(batch.len())?; + let batch_ids: Vec = guard.inner(); + let mut batches = Vec::with_capacity(batch.len()); + for (idx, (method, params)) in batch.into_iter().enumerate() { + batches.push(RequestSer::new(&batch_ids[idx], method, params)); + } + + let (send_back_tx, send_back_rx) = oneshot::channel(); + + let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?; + + tx_log_from_str(&raw, self.max_log_length); + + if self + .to_back + .clone() + .send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let json_values = match res { + Ok(Ok(v)) => v, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&json_values, self.max_log_length); + + let values: Result<_, _> = + json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); + Ok(values?) + }.instrument(trace.span().clone()).await } } @@ -356,51 +358,52 @@ impl SubscriptionClientT for Client { ) -> Result, Error> where N: DeserializeOwned, - { - if subscribe_method == unsubscribe_method { - return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned())); - } - - let guard = self.id_manager.next_request_ids(2)?; - let mut ids: Vec = guard.inner(); - let trace = RpcTracing::method_call(subscribe_method); - let _enter = trace.span().enter(); - - let id = ids[0].clone(); - - let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; - - tx_log_from_str(&raw, self.max_log_length); - - let (send_back_tx, send_back_rx) = oneshot::channel(); - if self - .to_back - .clone() - .send(FrontToBack::Subscribe(SubscriptionMessage { - raw, - subscribe_id: ids.swap_remove(0), - unsubscribe_id: ids.swap_remove(0), - unsubscribe_method: unsubscribe_method.to_owned(), - send_back: send_back_tx, - })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; - - let (notifs_rx, sub_id) = match res { - Ok(Ok(val)) => val, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); - - Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) - } + { + if subscribe_method == unsubscribe_method { + return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned())); + } + + let guard = self.id_manager.next_request_ids(2)?; + let mut ids: Vec = guard.inner(); + let trace = RpcTracing::method_call(subscribe_method); + + async { + let id = ids[0].clone(); + + let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; + + tx_log_from_str(&raw, self.max_log_length); + + let (send_back_tx, send_back_rx) = oneshot::channel(); + if self + .to_back + .clone() + .send(FrontToBack::Subscribe(SubscriptionMessage { + raw, + subscribe_id: ids.swap_remove(0), + unsubscribe_id: ids.swap_remove(0), + unsubscribe_method: unsubscribe_method.to_owned(), + send_back: send_back_tx, + })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = call_with_timeout(self.request_timeout, send_back_rx).await; + + let (notifs_rx, sub_id) = match res { + Ok(Ok(val)) => val, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); + + Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) + }.instrument(trace.span().clone()).await + } /// Subscribe to a specific method. async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result, Error> diff --git a/core/src/tracing.rs b/core/src/tracing.rs index 4e4b3fb0c0..eb491418a8 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -31,11 +31,6 @@ impl RpcTracing { pub fn span(&self) -> &tracing::Span { &self.0 } - - /// Get the inner span. - pub fn into_span(self) -> tracing::Span { - self.0 - } } /// Helper for writing trace logs from str. diff --git a/http-server/src/server.rs b/http-server/src/server.rs index d0bf790bf9..fa43ea3fdf 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -686,41 +686,39 @@ async fn process_health_request( max_log_length: u32, ) -> Result, HyperError> { let trace = RpcTracing::method_call(&health_api.method); - let response = async { + async { tx_log_from_str("HTTP health API", max_log_length); - match methods.method_with_name(&health_api.method) { - None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), - Some((_name, method_callback)) => match method_callback.inner() { - MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize), - MethodKind::Async(callback) => { - (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None) - .in_current_span() - .await - } + let response = match methods.method_with_name(&health_api.method) { + None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), + Some((_name, method_callback)) => match method_callback.inner() { + MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize), + MethodKind::Async(callback) => { + (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await + } + MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { + MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) + } + }, + }; - MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { - MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) - } - }, - }}.instrument(trace.into_span()).await; + rx_log_from_str(&response.result, max_log_length); + middleware.on_result(&health_api.method, response.success, request_start); + middleware.on_response(&response.result, request_start); - rx_log_from_str(&response.result, max_log_length); - middleware.on_result(&health_api.method, response.success, request_start); - middleware.on_response(&response.result, request_start); + if response.success { + #[derive(serde::Deserialize)] + struct RpcPayload<'a> { + #[serde(borrow)] + result: &'a serde_json::value::RawValue, + } - if response.success { - #[derive(serde::Deserialize)] - struct RpcPayload<'a> { - #[serde(borrow)] - result: &'a serde_json::value::RawValue, + let payload: RpcPayload = serde_json::from_str(&response.result) + .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); + Ok(response::ok_response(payload.result.to_string())) + } else { + Ok(response::internal_error()) } - - let payload: RpcPayload = serde_json::from_str(&response.result) - .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); - Ok(response::ok_response(payload.result.to_string())) - } else { - Ok(response::internal_error()) - } + }.instrument(trace.span().clone()).await } #[derive(Debug, Clone)] @@ -764,8 +762,8 @@ where let batch_stream = futures_util::stream::iter(batch); let trace = RpcTracing::batch(); - let batch_response = async{ - batch_stream + return async { + let batch_response = batch_stream .try_fold( BatchResponseBuilder::new_with_limit(max_response_size as usize), |batch_response, (req, call)| async move { @@ -774,13 +772,13 @@ where batch_response.append(&response) }, ) - .await - }.instrument(trace.into_span()).await; + .await; - return match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - }; + match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, + } + }.instrument(trace.span().clone()).await; } if let Ok(batch) = serde_json::from_slice::>(&data) { @@ -801,13 +799,13 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResponse { if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::method_call(&req.method); - async{ + async { rx_log_from_json(&req, call.max_log_length); let params = Params::new(req.params.map(|params| params.get())); let name = &req.method; let id = req.id; execute_call(Call { name, params, id, call }).await - }.instrument(trace.into_span()).await + }.instrument(trace.span().clone()).await } else if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::notification(&req.method); let _enter = trace.span().enter(); diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index e27d0306d9..48c704064a 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -861,27 +861,29 @@ where let batch_stream = futures_util::stream::iter(batch); let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - let max_response_size = call.max_response_body_size; - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); + return async { + let max_response_size = call.max_response_body_size; - let response = - execute_call(Call { name: &req.method, params, id: req.id, call }).in_current_span().await; + let batch_response = batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let params = Params::new(req.params.map(|params| params.get())); - batch_response.append(response.as_inner()) - }, - ) - .await; + let response = + execute_call(Call { name: &req.method, params, id: req.id, call }).await; - return match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - }; + batch_response.append(response.as_inner()) + }, + ) + .await; + + match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, + } + }.instrument(trace.span().clone()).await } else { BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)) }; @@ -894,15 +896,16 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResult { if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::method_call(&req.method); - let _enter = trace.span().enter(); - rx_log_from_json(&req, call.max_log_length); + async { + rx_log_from_json(&req, call.max_log_length); - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; - execute_call(Call { name, params, id, call }).in_current_span().await + execute_call(Call { name, params, id, call }).in_current_span().await + }.instrument(trace.span().clone()).await } else { let (id, code) = prepare_error(&data); MethodResult::SendAndMiddleware(MethodResponse::error(id, ErrorObject::from(code))) From 76ce4e09537cb58b56e668121c95ea0a760db5fa Mon Sep 17 00:00:00 2001 From: patrick Date: Fri, 29 Jul 2022 01:30:48 +0100 Subject: [PATCH 5/6] fix fmt --- client/http-client/src/client.rs | 17 +++++++++++------ http-server/src/server.rs | 16 ++++++++++++---- ws-server/src/server.rs | 15 ++++++++------- 3 files changed, 31 insertions(+), 17 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index de23122f97..37d706d422 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -178,7 +178,9 @@ impl ClientT for HttpClient { Err(_) => Err(Error::RequestTimeout), Ok(Err(e)) => Err(Error::Transport(e.into())), } - }.instrument(trace.span().clone()).await + } + .instrument(trace.span().clone()) + .await } /// Perform a request towards the server. @@ -218,7 +220,9 @@ impl ClientT for HttpClient { } else { Err(Error::InvalidRequestId) } - }.instrument(trace.span().clone()).await + } + .instrument(trace.span().clone()) + .await } async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> @@ -241,9 +245,8 @@ impl ClientT for HttpClient { request_set.insert(&ids[pos], pos); } - let fut = self - .transport - .send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); + let fut = + self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); let body = match tokio::time::timeout(self.request_timeout, fut).await { Ok(Ok(body)) => body, @@ -267,7 +270,9 @@ impl ClientT for HttpClient { responses[pos] = rp.result } Ok(responses) - }.instrument(trace.span().clone()).await + } + .instrument(trace.span().clone()) + .await } } diff --git a/http-server/src/server.rs b/http-server/src/server.rs index fa43ea3fdf..b3507a0d74 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -691,7 +691,9 @@ async fn process_health_request( let response = match methods.method_with_name(&health_api.method) { None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), Some((_name, method_callback)) => match method_callback.inner() { - MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize), + MethodKind::Sync(callback) => { + (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize) + } MethodKind::Async(callback) => { (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await } @@ -718,7 +720,9 @@ async fn process_health_request( } else { Ok(response::internal_error()) } - }.instrument(trace.span().clone()).await + } + .instrument(trace.span().clone()) + .await } #[derive(Debug, Clone)] @@ -778,7 +782,9 @@ where Ok(batch) => batch.finish(), Err(batch_err) => batch_err, } - }.instrument(trace.span().clone()).await; + } + .instrument(trace.span().clone()) + .await; } if let Ok(batch) = serde_json::from_slice::>(&data) { @@ -805,7 +811,9 @@ async fn process_single_request(data: Vec, call: CallData<'_, let name = &req.method; let id = req.id; execute_call(Call { name, params, id, call }).await - }.instrument(trace.span().clone()).await + } + .instrument(trace.span().clone()) + .await } else if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::notification(&req.method); let _enter = trace.span().enter(); diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index 48c704064a..e778516d06 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -870,10 +870,7 @@ where BatchResponseBuilder::new_with_limit(max_response_size as usize), |batch_response, (req, call)| async move { let params = Params::new(req.params.map(|params| params.get())); - - let response = - execute_call(Call { name: &req.method, params, id: req.id, call }).await; - + let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; batch_response.append(response.as_inner()) }, ) @@ -883,7 +880,9 @@ where Ok(batch) => batch.finish(), Err(batch_err) => batch_err, } - }.instrument(trace.span().clone()).await + } + .instrument(trace.span().clone()) + .await; } else { BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)) }; @@ -904,8 +903,10 @@ async fn process_single_request(data: Vec, call: CallData<'_, let name = &req.method; let id = req.id; - execute_call(Call { name, params, id, call }).in_current_span().await - }.instrument(trace.span().clone()).await + execute_call(Call { name, params, id, call }).await + } + .instrument(trace.span().clone()) + .await } else { let (id, code) = prepare_error(&data); MethodResult::SendAndMiddleware(MethodResponse::error(id, ErrorObject::from(code))) From 05174b4ccdea56b62551085e24dd6a3bbac300d8 Mon Sep 17 00:00:00 2001 From: patrick Date: Fri, 29 Jul 2022 09:27:31 +0100 Subject: [PATCH 6/6] changed RpcTracing::span -> into_span instead of cloning the span --- client/http-client/src/client.rs | 6 +++--- core/src/client/async_client/mod.rs | 8 ++++---- core/src/tracing.rs | 4 ++-- http-server/src/server.rs | 10 +++++----- ws-server/src/server.rs | 4 ++-- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 37d706d422..541459c03e 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -179,7 +179,7 @@ impl ClientT for HttpClient { Ok(Err(e)) => Err(Error::Transport(e.into())), } } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await } @@ -221,7 +221,7 @@ impl ClientT for HttpClient { Err(Error::InvalidRequestId) } } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await } @@ -271,7 +271,7 @@ impl ClientT for HttpClient { } Ok(responses) } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index a11f0357c5..cdf07f6a27 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -260,7 +260,7 @@ impl ClientT for Client { Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), Either::Right((_, _)) => Err(Error::RequestTimeout), } - }.instrument(trace.span().clone()).await + }.instrument(trace.into_span()).await } async fn request<'a, R>(&self, method: &'a str, params: Option>) -> Result @@ -296,7 +296,7 @@ impl ClientT for Client { rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); serde_json::from_value(json_value).map_err(Error::ParseError) - }.instrument(trace.span().clone()).await + }.instrument(trace.into_span()).await } async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> @@ -340,7 +340,7 @@ impl ClientT for Client { let values: Result<_, _> = json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); Ok(values?) - }.instrument(trace.span().clone()).await + }.instrument(trace.into_span()).await } } @@ -402,7 +402,7 @@ impl SubscriptionClientT for Client { rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) - }.instrument(trace.span().clone()).await + }.instrument(trace.into_span()).await } /// Subscribe to a specific method. diff --git a/core/src/tracing.rs b/core/src/tracing.rs index eb491418a8..6f9312a4ae 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -28,8 +28,8 @@ impl RpcTracing { } /// Get the inner span. - pub fn span(&self) -> &tracing::Span { - &self.0 + pub fn into_span(self) -> tracing::Span { + self.0 } } diff --git a/http-server/src/server.rs b/http-server/src/server.rs index b3507a0d74..0f57b302fe 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -721,7 +721,7 @@ async fn process_health_request( Ok(response::internal_error()) } } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await } @@ -783,7 +783,7 @@ where Err(batch_err) => batch_err, } } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await; } @@ -812,12 +812,12 @@ async fn process_single_request(data: Vec, call: CallData<'_, let id = req.id; execute_call(Call { name, params, id, call }).await } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await } else if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::notification(&req.method); - let _enter = trace.span().enter(); - + let span = trace.into_span(); + let _enter = span.enter(); rx_log_from_json(&req, call.max_log_length); MethodResponse { result: String::new(), success: true } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index e778516d06..74e261919e 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -881,7 +881,7 @@ where Err(batch_err) => batch_err, } } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await; } else { BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)) @@ -905,7 +905,7 @@ async fn process_single_request(data: Vec, call: CallData<'_, execute_call(Call { name, params, id, call }).await } - .instrument(trace.span().clone()) + .instrument(trace.into_span()) .await } else { let (id, code) = prepare_error(&data);