Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: jsonrpsee as service and low-level API for more fine-grained API to disconnect peers etc #1224

Merged
merged 61 commits into from
Nov 16, 2023
Merged
Show file tree
Hide file tree
Changes from 28 commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
a2858ff
refactor new jsonrpc middleware
niklasad1 Sep 11, 2023
0bdbfd9
add jsonrpsee specific service trait
niklasad1 Oct 13, 2023
679543c
use tower::ServiceBuilder for composable middleware
niklasad1 Oct 13, 2023
1c2a8a6
revert changelog
niklasad1 Oct 13, 2023
264944c
fix nits
niklasad1 Oct 13, 2023
ed2ea57
types: impl Clone
niklasad1 Oct 16, 2023
461fed9
clarify examples
niklasad1 Oct 16, 2023
013f045
Update server/src/middleware/mod.rs
niklasad1 Oct 16, 2023
a8eb284
Update server/src/transport/http.rs
niklasad1 Oct 16, 2023
5c003a0
remove some boiler plate
niklasad1 Oct 16, 2023
7432768
add back logging
niklasad1 Oct 16, 2023
cb627ae
remove needless Arc
niklasad1 Oct 17, 2023
0fc11a3
remove clone bounds for Middleware
niklasad1 Oct 17, 2023
5b94aa8
add wrapper for tower::ServiceBuilder
niklasad1 Oct 17, 2023
6e9e308
fix docs
niklasad1 Oct 17, 2023
1ac49a1
add modify request example
niklasad1 Oct 17, 2023
d1dbce3
add rate limit example
niklasad1 Oct 17, 2023
2b10662
fix some nits in rate limiting middleware example
niklasad1 Oct 18, 2023
be0103d
Meta -> Context
niklasad1 Oct 18, 2023
4376602
restruct middleware module
niklasad1 Oct 19, 2023
9462220
Merge remote-tracking branch 'origin/master' into na-new-jsonrpc-midd…
niklasad1 Oct 19, 2023
c182c1f
fix broken links
niklasad1 Oct 20, 2023
cad5ffc
Merge remote-tracking branch 'origin/master' into na-new-jsonrpc-midd…
niklasad1 Oct 20, 2023
9b34959
jsonrpsee service PoC
niklasad1 Oct 23, 2023
cc45bcf
add example how to disconnect misbehaving peers
niklasad1 Oct 23, 2023
f3137de
cleanup example
niklasad1 Oct 24, 2023
77caa9c
cleanup
niklasad1 Oct 24, 2023
cc16285
complete example with http
niklasad1 Oct 31, 2023
c01d763
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-service
niklasad1 Nov 3, 2023
a64767a
rename some stuff
niklasad1 Nov 3, 2023
75cda8e
remove needless deps
niklasad1 Nov 3, 2023
9213acc
Update server/src/server.rs
niklasad1 Nov 3, 2023
7a76b5b
revert unintentional change
niklasad1 Nov 3, 2023
0a9f393
Merge remote-tracking branch 'origin/na-jsonrpsee-service' into na-js…
niklasad1 Nov 3, 2023
6e93dea
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-service
niklasad1 Nov 7, 2023
b50851b
add hyper low-level example
niklasad1 Nov 7, 2023
a8dafa0
address grumbles
niklasad1 Nov 7, 2023
3c95d2e
remove useless example
niklasad1 Nov 7, 2023
853f81d
improve jsonrpsee as service example
niklasad1 Nov 8, 2023
6499e1b
address grumbles
niklasad1 Nov 8, 2023
7c03ed3
Update examples/examples/jsonrpsee_as_service.rs
niklasad1 Nov 8, 2023
5d1dc3f
fix bad docs
niklasad1 Nov 8, 2023
e617563
Merge remote-tracking branch 'origin/na-jsonrpsee-service' into na-js…
niklasad1 Nov 8, 2023
e4a796c
fix grumbles: no leaky `Settings`
niklasad1 Nov 8, 2023
dfb288b
cleanup
niklasad1 Nov 8, 2023
a1d6ffb
unify server_cfg
niklasad1 Nov 8, 2023
3a124c3
grumbles: revert pub items
niklasad1 Nov 8, 2023
e69a310
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-service
niklasad1 Nov 9, 2023
960142d
cleanup
niklasad1 Nov 9, 2023
439a89a
Update server/src/server.rs
niklasad1 Nov 9, 2023
0e27104
simplify doc links
niklasad1 Nov 9, 2023
4092202
fix `RpcServerBuilder::option_layer`
niklasad1 Nov 9, 2023
b16a917
Merge remote-tracking branch 'origin/master' into na-jsonrpsee-service
niklasad1 Nov 10, 2023
8381243
remove TransportProtocol from RpcServiceT
niklasad1 Nov 13, 2023
b242a61
fix test build
niklasad1 Nov 13, 2023
5573a3c
Update examples/examples/jsonrpsee_as_service.rs
niklasad1 Nov 13, 2023
cc3dc82
grumbles: remove a few clones in examples
niklasad1 Nov 14, 2023
ecbf1e6
Update examples/examples/jsonrpsee_as_service.rs
niklasad1 Nov 14, 2023
714f761
remove clone bounds for TowerServiceBuilder
niklasad1 Nov 14, 2023
d33abc4
remove missing clone impl
niklasad1 Nov 14, 2023
1b96fd6
remove more needless clone
niklasad1 Nov 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client/http-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ impl<L> HttpClientBuilder<L> {
}

/// Set custom tower middleware.
pub fn set_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> HttpClientBuilder<T> {
pub fn set_http_middleware<T>(self, service_builder: tower::ServiceBuilder<T>) -> HttpClientBuilder<T> {
HttpClientBuilder {
certificate_store: self.certificate_store,
id_kind: self.id_kind,
Expand Down
42 changes: 35 additions & 7 deletions core/src/server/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,8 @@ pub struct MethodResponse {
pub result: String,
/// Indicates whether the call was successful or not.
pub success_or_error: MethodResponseResult,
/// Indicates whether the call was a subscription response.
pub is_subscription: bool,
}

impl MethodResponse {
Expand All @@ -192,6 +194,11 @@ impl MethodResponse {
pub fn is_error(&self) -> bool {
self.success_or_error.is_success()
}

/// Returns whether the call is a subscription.
pub fn is_subscription(&self) -> bool {
self.is_subscription
}
}

/// Represent the outcome of a method call success or failed.
Expand Down Expand Up @@ -226,6 +233,16 @@ impl MethodResponseResult {
}

impl MethodResponse {
/// Create a subscription response.
pub fn subscription_response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
where
T: Serialize + Clone,
{
let mut rp = Self::response(id, result, max_response_size);
rp.is_subscription = true;
rp
}

/// Send a JSON-RPC response to the client. If the serialization of `result` exceeds `max_response_size`,
/// an error will be sent instead.
pub fn response<T>(id: Id, result: ResponsePayload<T>, max_response_size: usize) -> Self
Expand All @@ -245,7 +262,7 @@ impl MethodResponse {
// Safety - serde_json does not emit invalid UTF-8.
let result = unsafe { String::from_utf8_unchecked(writer.into_bytes()) };

Self { result, success_or_error }
Self { result, success_or_error, is_subscription: false }
}
Err(err) => {
tracing::error!("Error serializing response: {:?}", err);
Expand All @@ -262,24 +279,35 @@ impl MethodResponse {
let result =
serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");

Self { result, success_or_error: MethodResponseResult::Failed(err_code) }
Self { result, success_or_error: MethodResponseResult::Failed(err_code), is_subscription: false }
} else {
let err_code = ErrorCode::InternalError;
let result = serde_json::to_string(&Response::new(err_code.into(), id))
.expect("JSON serialization infallible; qed");
Self { result, success_or_error: MethodResponseResult::Failed(err_code.code()) }
Self {
result,
success_or_error: MethodResponseResult::Failed(err_code.code()),
is_subscription: false,
}
}
}
}
}

/// Create a subscription response error.
pub fn subscription_error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let mut rp = Self::error(id, err);
rp.is_subscription = true;
rp
}

/// Create a `MethodResponse` from an error.
pub fn error<'a>(id: Id, err: impl Into<ErrorObject<'a>>) -> Self {
let err: ErrorObject = err.into();
let err_code = err.code();
let err = ResponsePayload::error_borrowed(err);
let result = serde_json::to_string(&Response::new(err, id)).expect("JSON serialization infallible; qed");
Self { result, success_or_error: MethodResponseResult::Failed(err_code) }
Self { result, success_or_error: MethodResponseResult::Failed(err_code), is_subscription: false }
}
}

Expand All @@ -305,13 +333,13 @@ impl BatchResponseBuilder {
///
/// Fails if the max limit is exceeded and returns to error response to
/// return early in order to not process method call responses which are thrown away anyway.
pub fn append(&mut self, response: &MethodResponse) -> Result<(), String> {
pub fn append(&mut self, response: &MethodResponse) -> Result<(), MethodResponse> {
// `,` will occupy one extra byte for each entry
// on the last item the `,` is replaced by `]`.
let len = response.result.len() + self.result.len() + 1;

if len > self.max_response_size {
Err(batch_response_error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
Err(MethodResponse::error(Id::Null, reject_too_big_batch_response(self.max_response_size)))
} else {
self.result.push_str(&response.result);
self.result.push(',');
Expand Down Expand Up @@ -409,6 +437,6 @@ mod tests {
let batch = BatchResponseBuilder::new_with_limit(63).append(&method).unwrap_err();

let exp_err = r#"{"jsonrpc":"2.0","error":{"code":-32011,"message":"The batch response was too large","data":"Exceeded max limit of 63"},"id":null}"#;
assert_eq!(batch, exp_err);
assert_eq!(batch.result, exp_err);
}
}
38 changes: 37 additions & 1 deletion core/src/server/rpc_module.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,32 @@ pub enum MethodCallback {
Unsubscription(UnsubscriptionMethod),
}

/// The kind of the JSON-RPC method call, it can be a subscription, method call or unknown.
#[derive(Debug, Copy, Clone)]
pub enum MethodKind {
/// Subscription Call.
Subscription,
/// Unsubscription Call.
Unsubscription,
/// Method call.
MethodCall,
/// Unknown method.
Unknown,
}

impl std::fmt::Display for MethodKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let s = match self {
Self::Subscription => "subscription",
Self::MethodCall => "method call",
Self::Unknown => "unknown",
Self::Unsubscription => "unsubscription",
};

write!(f, "{s}")
}
}

/// Result of a method, either direct value or a future of one.
pub enum MethodResult<T> {
/// Result by value
Expand Down Expand Up @@ -219,6 +245,16 @@ impl Methods {
self.callbacks.get_key_value(method_name).map(|(k, v)| (*k, v))
}

/// Returns the kind of the method callback,
pub fn method_kind(&self, method_name: &str) -> MethodKind {
match self.method(method_name) {
None => MethodKind::Unknown,
Some(MethodCallback::Async(_)) | Some(MethodCallback::Sync(_)) => MethodKind::MethodCall,
Some(MethodCallback::Subscription(_)) => MethodKind::Subscription,
Some(MethodCallback::Unsubscription(_)) => MethodKind::Unsubscription,
}
}

/// Helper to call a method on the `RPC module` without having to spin up a server.
///
/// The params must be serializable as JSON array, see [`ToRpcParams`] for further documentation.
Expand Down Expand Up @@ -307,7 +343,7 @@ impl Methods {
) -> RawRpcResponse {
let (tx, mut rx) = mpsc::channel(buf_size);
let id = req.id.clone();
let params = Params::new(req.params.map(|params| params.get()));
let params = Params::new(req.params.as_ref().map(|params| params.as_ref().get()));

let response = match self.method(&req.method) {
None => MethodResponse::error(req.id, ErrorObject::from(ErrorCode::MethodNotFound)),
Expand Down
7 changes: 5 additions & 2 deletions core/src/server/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ impl PendingSubscriptionSink {
/// the return value is simply ignored because no further notification are propagated
/// once reject has been called.
pub async fn reject(self, err: impl Into<ErrorObjectOwned>) {
let err = MethodResponse::error(self.id, err.into());
let err = MethodResponse::subscription_error(self.id, err.into());
_ = self.inner.send(err.result.clone()).await;
_ = self.subscribe.send(err);
}
Expand All @@ -269,7 +269,7 @@ impl PendingSubscriptionSink {
///
/// Panics if the subscription response exceeded the `max_response_size`.
pub async fn accept(self) -> Result<SubscriptionSink, PendingSubscriptionAcceptError> {
let response = MethodResponse::response(
let response = MethodResponse::subscription_response(
self.id,
ResponsePayload::result_borrowed(&self.uniq_sub.sub_id),
self.inner.max_response_size() as usize,
Expand Down Expand Up @@ -438,6 +438,9 @@ impl Subscription {
let raw = self.rx.recv().await?;

tracing::debug!("[Subscription::next]: rx {}", raw);

// clippy complains about this but it doesn't compile without the extra res binding.
#[allow(clippy::let_and_return)]
let res = match serde_json::from_str::<SubscriptionResponse<T>>(&raw) {
Ok(r) => Some(Ok((r.params.result, r.params.subscription.into_owned()))),
Err(e) => match serde_json::from_str::<SubscriptionError<serde_json::Value>>(&raw) {
Expand Down
7 changes: 7 additions & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,3 +20,10 @@ tower-http = { version = "0.4.0", features = ["full"] }
tower = { version = "0.4.13", features = ["full"] }
hyper = "0.14.20"
console-subscriber = "0.2.0"

[features]
server = []

[dependencies]
soketto = { version = "0.7.1", features = ["http"] }
tokio-util = "0.7.9"
2 changes: 1 addition & 1 deletion examples/examples/cors_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
// modifying requests / responses. These features are independent of one another
// and can also be used separately.
// In this example, we use both features.
let server = Server::builder().set_middleware(middleware).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server = Server::builder().set_http_middleware(middleware).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| {
Expand Down
5 changes: 3 additions & 2 deletions examples/examples/host_filter_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ use std::net::SocketAddr;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;
use jsonrpsee::server::middleware::HostFilterLayer;
use jsonrpsee::server::middleware::http::HostFilterLayer;
use jsonrpsee::server::{RpcModule, Server};

#[tokio::main]
Expand Down Expand Up @@ -65,7 +65,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
// `HostFilerLayer::new` only fails on invalid URIs..
.layer(HostFilterLayer::new(["example.com"]).unwrap());

let server = Server::builder().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server =
Server::builder().set_http_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

Expand Down
2 changes: 1 addition & 1 deletion examples/examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ async fn main() -> anyhow::Result<()> {
.on_response(DefaultOnResponse::new().include_headers(true).latency_unit(LatencyUnit::Micros)),
);

let client = HttpClientBuilder::default().set_middleware(middleware).build(url)?;
let client = HttpClientBuilder::default().set_http_middleware(middleware).build(url)?;
let params = rpc_params![1_u64, 2, 3];
let response: Result<String, _> = client.request("say_hello", params).await;
tracing::info!("r: {:?}", response);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
// IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

//! This example sets a custom tower service middleware to the RPC implementation.
//! jsonrpsee supports two kinds of middlewares `http_middleware` and `rpc_middleware`.
//!
//! This example demonstrates how to use the `http_middleware` which applies for each
//! HTTP request.
//!
//! A typical use-case for this to apply a specific CORS policy which applies both
//! for HTTP and WebSocket.
//!
//! It works with both `WebSocket` and `HTTP` which is done in the example.

use hyper::body::Bytes;
use hyper::http::HeaderValue;
Expand Down Expand Up @@ -104,7 +109,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
.layer(cors)
.timeout(Duration::from_secs(2));

let server = Server::builder().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server =
Server::builder().set_http_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

Expand Down
5 changes: 3 additions & 2 deletions examples/examples/http_proxy_middleware.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ use std::time::Duration;
use jsonrpsee::core::client::ClientT;
use jsonrpsee::http_client::HttpClientBuilder;
use jsonrpsee::rpc_params;
use jsonrpsee::server::middleware::ProxyGetRequestLayer;
use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
use jsonrpsee::server::{RpcModule, Server};

#[tokio::main]
Expand Down Expand Up @@ -87,7 +87,8 @@ async fn run_server() -> anyhow::Result<SocketAddr> {
.layer(ProxyGetRequestLayer::new("/health", "system_health")?)
.timeout(Duration::from_secs(2));

let server = Server::builder().set_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;
let server =
Server::builder().set_http_middleware(service_builder).build("127.0.0.1:0".parse::<SocketAddr>()?).await?;

let addr = server.local_addr()?;

Expand Down
Loading