diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 6bb2466bfa..541459c03e 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -168,17 +168,19 @@ pub struct HttpClient { impl ClientT for HttpClient { async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { let trace = RpcTracing::notification(method); - let _enter = trace.span().enter(); + async { + let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?; - let notif = serde_json::to_string(&NotificationSer::new(method, params)).map_err(Error::ParseError)?; + let fut = self.transport.send(notif); - let fut = self.transport.send(notif).in_current_span(); - - match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(ok)) => Ok(ok), - Err(_) => Err(Error::RequestTimeout), - Ok(Err(e)) => Err(Error::Transport(e.into())), + match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(ok)) => Ok(ok), + Err(_) => Err(Error::RequestTimeout), + Ok(Err(e)) => Err(Error::Transport(e.into())), + } } + .instrument(trace.into_span()) + .await } /// Perform a request towards the server. @@ -190,34 +192,37 @@ impl ClientT for HttpClient { let id = guard.inner(); let request = RequestSer::new(&id, method, params); let trace = RpcTracing::method_call(method); - let _enter = trace.span().enter(); - let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; + async { + let raw = serde_json::to_string(&request).map_err(Error::ParseError)?; - let fut = self.transport.send_and_read_body(raw).in_current_span(); - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => { - return Err(Error::RequestTimeout); - } - Ok(Err(e)) => { - return Err(Error::Transport(e.into())); - } - }; + let fut = self.transport.send_and_read_body(raw); + let body = match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(body)) => body, + Err(_e) => { + return Err(Error::RequestTimeout); + } + Ok(Err(e)) => { + return Err(Error::Transport(e.into())); + } + }; - let response: Response<_> = match serde_json::from_slice(&body) { - Ok(response) => response, - Err(_) => { - let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?; - return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))); - } - }; + let response: Response<_> = match serde_json::from_slice(&body) { + Ok(response) => response, + Err(_) => { + let err: ErrorResponse = serde_json::from_slice(&body).map_err(Error::ParseError)?; + return Err(Error::Call(CallError::Custom(err.error_object().clone().into_owned()))); + } + }; - if response.id == id { - Ok(response.result) - } else { - Err(Error::InvalidRequestId) + if response.id == id { + Ok(response.result) + } else { + Err(Error::InvalidRequestId) + } } + .instrument(trace.into_span()) + .await } async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> @@ -227,46 +232,47 @@ impl ClientT for HttpClient { let guard = self.id_manager.next_request_ids(batch.len())?; let ids: Vec = guard.inner(); let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - let mut batch_request = Vec::with_capacity(batch.len()); - // NOTE(niklasad1): `ID` is not necessarily monotonically increasing. - let mut ordered_requests = Vec::with_capacity(batch.len()); - let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default()); + async { + let mut batch_request = Vec::with_capacity(batch.len()); + // NOTE(niklasad1): `ID` is not necessarily monotonically increasing. + let mut ordered_requests = Vec::with_capacity(batch.len()); + let mut request_set = FxHashMap::with_capacity_and_hasher(batch.len(), Default::default()); - for (pos, (method, params)) in batch.into_iter().enumerate() { - batch_request.push(RequestSer::new(&ids[pos], method, params)); - ordered_requests.push(&ids[pos]); - request_set.insert(&ids[pos], pos); - } + for (pos, (method, params)) in batch.into_iter().enumerate() { + batch_request.push(RequestSer::new(&ids[pos], method, params)); + ordered_requests.push(&ids[pos]); + request_set.insert(&ids[pos], pos); + } - let fut = self - .transport - .send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?) - .in_current_span(); + let fut = + self.transport.send_and_read_body(serde_json::to_string(&batch_request).map_err(Error::ParseError)?); - let body = match tokio::time::timeout(self.request_timeout, fut).await { - Ok(Ok(body)) => body, - Err(_e) => return Err(Error::RequestTimeout), - Ok(Err(e)) => return Err(Error::Transport(e.into())), - }; + let body = match tokio::time::timeout(self.request_timeout, fut).await { + Ok(Ok(body)) => body, + Err(_e) => return Err(Error::RequestTimeout), + Ok(Err(e)) => return Err(Error::Transport(e.into())), + }; - let rps: Vec> = - serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::(&body) { - Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())), - Err(e) => Error::ParseError(e), - })?; + let rps: Vec> = + serde_json::from_slice(&body).map_err(|_| match serde_json::from_slice::(&body) { + Ok(e) => Error::Call(CallError::Custom(e.error_object().clone().into_owned())), + Err(e) => Error::ParseError(e), + })?; - // NOTE: `R::default` is placeholder and will be replaced in loop below. - let mut responses = vec![R::default(); ordered_requests.len()]; - for rp in rps { - let pos = match request_set.get(&rp.id) { - Some(pos) => *pos, - None => return Err(Error::InvalidRequestId), - }; - responses[pos] = rp.result + // NOTE: `R::default` is placeholder and will be replaced in loop below. + let mut responses = vec![R::default(); ordered_requests.len()]; + for rp in rps { + let pos = match request_set.get(&rp.id) { + Some(pos) => *pos, + None => return Err(Error::InvalidRequestId), + }; + responses[pos] = rp.result + } + Ok(responses) } - Ok(responses) + .instrument(trace.into_span()) + .await } } diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 43dfb9089f..cdf07f6a27 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -243,102 +243,104 @@ impl Drop for Client { #[async_trait] impl ClientT for Client { async fn notification<'a>(&self, method: &'a str, params: Option>) -> Result<(), Error> { - // NOTE: we use this to guard against max number of concurrent requests. - let _req_id = self.id_manager.next_request_id()?; - let notif = NotificationSer::new(method, params); - let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - - let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; - tx_log_from_str(&raw, self.max_log_length); - - let mut sender = self.to_back.clone(); - let fut = sender.send(FrontToBack::Notification(raw)).in_current_span(); - - match future::select(fut, Delay::new(self.request_timeout)).await { - Either::Left((Ok(()), _)) => Ok(()), - Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), - Either::Right((_, _)) => Err(Error::RequestTimeout), - } - } + // NOTE: we use this to guard against max number of concurrent requests. + let _req_id = self.id_manager.next_request_id()?; + let notif = NotificationSer::new(method, params); + let trace = RpcTracing::batch(); + + async { + let raw = serde_json::to_string(¬if).map_err(Error::ParseError)?; + tx_log_from_str(&raw, self.max_log_length); + + let mut sender = self.to_back.clone(); + let fut = sender.send(FrontToBack::Notification(raw)); + + match future::select(fut, Delay::new(self.request_timeout)).await { + Either::Left((Ok(()), _)) => Ok(()), + Either::Left((Err(_), _)) => Err(self.read_error_from_backend().await), + Either::Right((_, _)) => Err(Error::RequestTimeout), + } + }.instrument(trace.into_span()).await + } async fn request<'a, R>(&self, method: &'a str, params: Option>) -> Result where R: DeserializeOwned, - { - let (send_back_tx, send_back_rx) = oneshot::channel(); - let guard = self.id_manager.next_request_id()?; - let id = guard.inner(); - let trace = RpcTracing::method_call(method); - let _enter = trace.span().enter(); - - let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; - tx_log_from_str(&raw, self.max_log_length); - - if self - .to_back - .clone() - .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; - let json_value = match res { - Ok(Ok(v)) => v, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); - - serde_json::from_value(json_value).map_err(Error::ParseError) - } + { + let (send_back_tx, send_back_rx) = oneshot::channel(); + let guard = self.id_manager.next_request_id()?; + let id = guard.inner(); + let trace = RpcTracing::method_call(method); + + async { + let raw = serde_json::to_string(&RequestSer::new(&id, method, params)).map_err(Error::ParseError)?; + tx_log_from_str(&raw, self.max_log_length); + + if self + .to_back + .clone() + .send(FrontToBack::Request(RequestMessage { raw, id: id.clone(), send_back: Some(send_back_tx) })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let json_value = match res { + Ok(Ok(v)) => v, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&Response::new(&json_value, id), self.max_log_length); + + serde_json::from_value(json_value).map_err(Error::ParseError) + }.instrument(trace.into_span()).await + } async fn batch_request<'a, R>(&self, batch: Vec<(&'a str, Option>)>) -> Result, Error> where R: DeserializeOwned + Default + Clone, - { - let guard = self.id_manager.next_request_ids(batch.len())?; - let batch_ids: Vec = guard.inner(); - let mut batches = Vec::with_capacity(batch.len()); - let log = RpcTracing::batch(); - let _enter = log.span().enter(); - - for (idx, (method, params)) in batch.into_iter().enumerate() { - batches.push(RequestSer::new(&batch_ids[idx], method, params)); - } - - let (send_back_tx, send_back_rx) = oneshot::channel(); - - let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?; - - tx_log_from_str(&raw, self.max_log_length); - - if self - .to_back - .clone() - .send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; - let json_values = match res { - Ok(Ok(v)) => v, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&json_values, self.max_log_length); - - let values: Result<_, _> = - json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); - Ok(values?) + { + let trace = RpcTracing::batch(); + async { + let guard = self.id_manager.next_request_ids(batch.len())?; + let batch_ids: Vec = guard.inner(); + let mut batches = Vec::with_capacity(batch.len()); + for (idx, (method, params)) in batch.into_iter().enumerate() { + batches.push(RequestSer::new(&batch_ids[idx], method, params)); + } + + let (send_back_tx, send_back_rx) = oneshot::channel(); + + let raw = serde_json::to_string(&batches).map_err(Error::ParseError)?; + + tx_log_from_str(&raw, self.max_log_length); + + if self + .to_back + .clone() + .send(FrontToBack::Batch(BatchMessage { raw, ids: batch_ids, send_back: send_back_tx })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = call_with_timeout(self.request_timeout, send_back_rx).await; + let json_values = match res { + Ok(Ok(v)) => v, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&json_values, self.max_log_length); + + let values: Result<_, _> = + json_values.into_iter().map(|val| serde_json::from_value(val).map_err(Error::ParseError)).collect(); + Ok(values?) + }.instrument(trace.into_span()).await } } @@ -356,51 +358,52 @@ impl SubscriptionClientT for Client { ) -> Result, Error> where N: DeserializeOwned, - { - if subscribe_method == unsubscribe_method { - return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned())); - } - - let guard = self.id_manager.next_request_ids(2)?; - let mut ids: Vec = guard.inner(); - let trace = RpcTracing::method_call(subscribe_method); - let _enter = trace.span().enter(); - - let id = ids[0].clone(); - - let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; - - tx_log_from_str(&raw, self.max_log_length); - - let (send_back_tx, send_back_rx) = oneshot::channel(); - if self - .to_back - .clone() - .send(FrontToBack::Subscribe(SubscriptionMessage { - raw, - subscribe_id: ids.swap_remove(0), - unsubscribe_id: ids.swap_remove(0), - unsubscribe_method: unsubscribe_method.to_owned(), - send_back: send_back_tx, - })) - .await - .is_err() - { - return Err(self.read_error_from_backend().await); - } - - let res = call_with_timeout(self.request_timeout, send_back_rx).in_current_span().await; - - let (notifs_rx, sub_id) = match res { - Ok(Ok(val)) => val, - Ok(Err(err)) => return Err(err), - Err(_) => return Err(self.read_error_from_backend().await), - }; - - rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); - - Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) - } + { + if subscribe_method == unsubscribe_method { + return Err(Error::SubscriptionNameConflict(unsubscribe_method.to_owned())); + } + + let guard = self.id_manager.next_request_ids(2)?; + let mut ids: Vec = guard.inner(); + let trace = RpcTracing::method_call(subscribe_method); + + async { + let id = ids[0].clone(); + + let raw = serde_json::to_string(&RequestSer::new(&id, subscribe_method, params)).map_err(Error::ParseError)?; + + tx_log_from_str(&raw, self.max_log_length); + + let (send_back_tx, send_back_rx) = oneshot::channel(); + if self + .to_back + .clone() + .send(FrontToBack::Subscribe(SubscriptionMessage { + raw, + subscribe_id: ids.swap_remove(0), + unsubscribe_id: ids.swap_remove(0), + unsubscribe_method: unsubscribe_method.to_owned(), + send_back: send_back_tx, + })) + .await + .is_err() + { + return Err(self.read_error_from_backend().await); + } + + let res = call_with_timeout(self.request_timeout, send_back_rx).await; + + let (notifs_rx, sub_id) = match res { + Ok(Ok(val)) => val, + Ok(Err(err)) => return Err(err), + Err(_) => return Err(self.read_error_from_backend().await), + }; + + rx_log_from_json(&Response::new(&sub_id, id), self.max_log_length); + + Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id))) + }.instrument(trace.into_span()).await + } /// Subscribe to a specific method. async fn subscribe_to_method<'a, N>(&self, method: &'a str) -> Result, Error> diff --git a/core/src/tracing.rs b/core/src/tracing.rs index eb491418a8..6f9312a4ae 100644 --- a/core/src/tracing.rs +++ b/core/src/tracing.rs @@ -28,8 +28,8 @@ impl RpcTracing { } /// Get the inner span. - pub fn span(&self) -> &tracing::Span { - &self.0 + pub fn into_span(self) -> tracing::Span { + self.0 } } diff --git a/http-server/src/server.rs b/http-server/src/server.rs index 40943e4808..0f57b302fe 100644 --- a/http-server/src/server.rs +++ b/http-server/src/server.rs @@ -686,43 +686,43 @@ async fn process_health_request( max_log_length: u32, ) -> Result, HyperError> { let trace = RpcTracing::method_call(&health_api.method); - let _enter = trace.span().enter(); - - tx_log_from_str("HTTP health API", max_log_length); + async { + tx_log_from_str("HTTP health API", max_log_length); + let response = match methods.method_with_name(&health_api.method) { + None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), + Some((_name, method_callback)) => match method_callback.inner() { + MethodKind::Sync(callback) => { + (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize) + } + MethodKind::Async(callback) => { + (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None).await + } + MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { + MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) + } + }, + }; - let response = match methods.method_with_name(&health_api.method) { - None => MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::MethodNotFound)), - Some((_name, method_callback)) => match method_callback.inner() { - MethodKind::Sync(callback) => (callback)(Id::Number(0), Params::new(None), max_response_body_size as usize), - MethodKind::Async(callback) => { - (callback)(Id::Number(0), Params::new(None), 0, max_response_body_size as usize, None) - .in_current_span() - .await - } + rx_log_from_str(&response.result, max_log_length); + middleware.on_result(&health_api.method, response.success, request_start); + middleware.on_response(&response.result, request_start); - MethodKind::Subscription(_) | MethodKind::Unsubscription(_) => { - MethodResponse::error(Id::Null, ErrorObject::from(ErrorCode::InternalError)) + if response.success { + #[derive(serde::Deserialize)] + struct RpcPayload<'a> { + #[serde(borrow)] + result: &'a serde_json::value::RawValue, } - }, - }; - rx_log_from_str(&response.result, max_log_length); - middleware.on_result(&health_api.method, response.success, request_start); - middleware.on_response(&response.result, request_start); - - if response.success { - #[derive(serde::Deserialize)] - struct RpcPayload<'a> { - #[serde(borrow)] - result: &'a serde_json::value::RawValue, + let payload: RpcPayload = serde_json::from_str(&response.result) + .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); + Ok(response::ok_response(payload.result.to_string())) + } else { + Ok(response::internal_error()) } - - let payload: RpcPayload = serde_json::from_str(&response.result) - .expect("valid JSON-RPC response must have a result field and be valid JSON; qed"); - Ok(response::ok_response(payload.result.to_string())) - } else { - Ok(response::internal_error()) } + .instrument(trace.into_span()) + .await } #[derive(Debug, Clone)] @@ -766,26 +766,25 @@ where let batch_stream = futures_util::stream::iter(batch); let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); - - let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; - - batch_response.append(&response) - }, - ) - .in_current_span() - .await; - - return match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - }; + return async { + let batch_response = batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let params = Params::new(req.params.map(|params| params.get())); + let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; + batch_response.append(&response) + }, + ) + .await; + + match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, + } + } + .instrument(trace.into_span()) + .await; } if let Ok(batch) = serde_json::from_slice::>(&data) { @@ -806,19 +805,19 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResponse { if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::method_call(&req.method); - let _enter = trace.span().enter(); - - rx_log_from_json(&req, call.max_log_length); - - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; - - execute_call(Call { name, params, id, call }).in_current_span().await + async { + rx_log_from_json(&req, call.max_log_length); + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; + execute_call(Call { name, params, id, call }).await + } + .instrument(trace.into_span()) + .await } else if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::notification(&req.method); - let _enter = trace.span().enter(); - + let span = trace.into_span(); + let _enter = span.enter(); rx_log_from_json(&req, call.max_log_length); MethodResponse { result: String::new(), success: true } diff --git a/ws-server/src/server.rs b/ws-server/src/server.rs index e27d0306d9..74e261919e 100644 --- a/ws-server/src/server.rs +++ b/ws-server/src/server.rs @@ -861,27 +861,28 @@ where let batch_stream = futures_util::stream::iter(batch); let trace = RpcTracing::batch(); - let _enter = trace.span().enter(); - let max_response_size = call.max_response_body_size; - let batch_response = batch_stream - .try_fold( - BatchResponseBuilder::new_with_limit(max_response_size as usize), - |batch_response, (req, call)| async move { - let params = Params::new(req.params.map(|params| params.get())); + return async { + let max_response_size = call.max_response_body_size; - let response = - execute_call(Call { name: &req.method, params, id: req.id, call }).in_current_span().await; - - batch_response.append(response.as_inner()) - }, - ) - .await; + let batch_response = batch_stream + .try_fold( + BatchResponseBuilder::new_with_limit(max_response_size as usize), + |batch_response, (req, call)| async move { + let params = Params::new(req.params.map(|params| params.get())); + let response = execute_call(Call { name: &req.method, params, id: req.id, call }).await; + batch_response.append(response.as_inner()) + }, + ) + .await; - return match batch_response { - Ok(batch) => batch.finish(), - Err(batch_err) => batch_err, - }; + match batch_response { + Ok(batch) => batch.finish(), + Err(batch_err) => batch_err, + } + } + .instrument(trace.into_span()) + .await; } else { BatchResponse::error(Id::Null, ErrorObject::from(ErrorCode::InvalidRequest)) }; @@ -894,15 +895,18 @@ where async fn process_single_request(data: Vec, call: CallData<'_, M>) -> MethodResult { if let Ok(req) = serde_json::from_slice::(&data) { let trace = RpcTracing::method_call(&req.method); - let _enter = trace.span().enter(); - rx_log_from_json(&req, call.max_log_length); + async { + rx_log_from_json(&req, call.max_log_length); - let params = Params::new(req.params.map(|params| params.get())); - let name = &req.method; - let id = req.id; + let params = Params::new(req.params.map(|params| params.get())); + let name = &req.method; + let id = req.id; - execute_call(Call { name, params, id, call }).in_current_span().await + execute_call(Call { name, params, id, call }).await + } + .instrument(trace.into_span()) + .await } else { let (id, code) = prepare_error(&data); MethodResult::SendAndMiddleware(MethodResponse::error(id, ErrorObject::from(code)))