Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Allow awaiting on server handles #550

Merged
merged 2 commits into from
Nov 8, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions benches/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpc_ws_se

/// Run jsonrpsee HTTP server for benchmarks.
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::http_server::HttpStopHandle) {
pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::http_server::HttpServerHandle) {
use jsonrpsee::http_server::{HttpServerBuilder, RpcModule};

let server = HttpServerBuilder::default()
Expand All @@ -88,7 +88,7 @@ pub async fn http_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::

/// Run jsonrpsee WebSocket server for benchmarks.
#[cfg(not(feature = "jsonrpc-crate"))]
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws_server::WsStopHandle) {
pub async fn ws_server(handle: tokio::runtime::Handle) -> (String, jsonrpsee::ws_server::WsServerHandle) {
use jsonrpsee::ws_server::{RpcModule, WsServerBuilder};

let server = WsServerBuilder::default()
Expand Down
8 changes: 4 additions & 4 deletions examples/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@

use jsonrpsee::{
http_client::HttpClientBuilder,
http_server::{HttpServerBuilder, HttpStopHandle, RpcModule},
http_server::{HttpServerBuilder, HttpServerHandle, RpcModule},
rpc_params,
types::traits::Client,
};
Expand All @@ -49,12 +49,12 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

async fn run_server() -> anyhow::Result<(SocketAddr, HttpStopHandle)> {
async fn run_server() -> anyhow::Result<(SocketAddr, HttpServerHandle)> {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse()?)?;
let mut module = RpcModule::new(());
module.register_method("say_hello", |_, _| Ok("lo"))?;

let addr = server.local_addr()?;
let stop_handle = server.start(module)?;
Ok((addr, stop_handle))
let server_handle = server.start(module)?;
Ok((addr, server_handle))
}
4 changes: 2 additions & 2 deletions examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use jsonrpsee::{
proc_macros::rpc,
types::{async_trait, error::Error, Subscription},
ws_client::WsClientBuilder,
ws_server::{SubscriptionSink, WsServerBuilder, WsStopHandle},
ws_server::{SubscriptionSink, WsServerBuilder, WsServerHandle},
};
use std::net::SocketAddr;

Expand Down Expand Up @@ -89,7 +89,7 @@ async fn main() -> anyhow::Result<()> {
Ok(())
}

async fn run_server() -> anyhow::Result<(SocketAddr, WsStopHandle)> {
async fn run_server() -> anyhow::Result<(SocketAddr, WsServerHandle)> {
let server = WsServerBuilder::default().build("127.0.0.1:0").await?;

let addr = server.local_addr()?;
Expand Down
2 changes: 1 addition & 1 deletion http-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ pub use access_control::{
};
pub use jsonrpsee_types as types;
pub use jsonrpsee_utils::server::rpc_module::RpcModule;
pub use server::{Builder as HttpServerBuilder, Server as HttpServer, StopHandle as HttpStopHandle};
pub use server::{Builder as HttpServerBuilder, Server as HttpServer, ServerHandle as HttpServerHandle};

#[cfg(test)]
mod tests;
33 changes: 24 additions & 9 deletions http-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,7 @@

use crate::{response, AccessControl};
use futures_channel::mpsc;
use futures_util::future::join_all;
use futures_util::stream::StreamExt;
use futures_util::{future::join_all, stream::StreamExt, FutureExt};
use hyper::{
server::{conn::AddrIncoming, Builder as HyperBuilder},
service::{make_service_fn, service_fn},
Expand All @@ -49,7 +48,10 @@ use serde_json::value::RawValue;
use socket2::{Domain, Socket, Type};
use std::{
cmp,
future::Future,
net::{SocketAddr, TcpListener},
pin::Pin,
task::{Context, Poll},
};

/// Builder to create JSON-RPC HTTP server.
Expand Down Expand Up @@ -143,24 +145,37 @@ impl Default for Builder {
}
}

/// Handle used to stop the running server.
/// Handle used to run or stop the server.
#[derive(Debug)]
pub struct StopHandle {
pub struct ServerHandle {
stop_sender: mpsc::Sender<()>,
stop_handle: Option<tokio::task::JoinHandle<()>>,
pub(crate) handle: Option<tokio::task::JoinHandle<()>>,
Copy link
Member

@niklasad1 niklasad1 Nov 4, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is pub(crate) only needed for tests or why?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's used for tests

}

impl StopHandle {
impl ServerHandle {
/// Requests server to stop. Returns an error if server was already stopped.
pub fn stop(mut self) -> Result<tokio::task::JoinHandle<()>, Error> {
let stop = self.stop_sender.try_send(()).map(|_| self.stop_handle.take());
let stop = self.stop_sender.try_send(()).map(|_| self.handle.take());
match stop {
Ok(Some(handle)) => Ok(handle),
_ => Err(Error::AlreadyStopped),
}
}
}

impl Future for ServerHandle {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let handle = match &mut self.handle {
Some(handle) => handle,
None => return Poll::Ready(()),
};

handle.poll_unpin(cx).map(|_| ())
}
}

/// An HTTP JSON RPC server.
#[derive(Debug)]
pub struct Server {
Expand All @@ -185,7 +200,7 @@ impl Server {
}

/// Start the server.
pub fn start(mut self, methods: impl Into<Methods>) -> Result<StopHandle, Error> {
pub fn start(mut self, methods: impl Into<Methods>) -> Result<ServerHandle, Error> {
let max_request_body_size = self.max_request_body_size;
let access_control = self.access_control;
let (tx, mut rx) = mpsc::channel(1);
Expand Down Expand Up @@ -298,7 +313,7 @@ impl Server {
let _ = server.with_graceful_shutdown(async move { rx.next().await.map_or((), |_| ()) }).await;
});

Ok(StopHandle { stop_handle: Some(handle), stop_sender: tx })
Ok(ServerHandle { handle: Some(handle), stop_sender: tx })
}
}

Expand Down
34 changes: 28 additions & 6 deletions http-server/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@
#![cfg(test)]

use std::net::SocketAddr;
use std::time::Duration;

use crate::types::error::{CallError, Error};
use crate::{server::StopHandle, HttpServerBuilder, RpcModule};
use crate::{server::ServerHandle, HttpServerBuilder, RpcModule};

use jsonrpsee_test_utils::helpers::*;
use jsonrpsee_test_utils::mocks::{Id, StatusCode, TestContext};
use jsonrpsee_test_utils::TimeoutFutureExt;
use serde_json::Value as JsonValue;

async fn server() -> (SocketAddr, StopHandle) {
async fn server() -> (SocketAddr, ServerHandle) {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let ctx = TestContext;
let mut module = RpcModule::new(ctx);
Expand Down Expand Up @@ -78,8 +79,8 @@ async fn server() -> (SocketAddr, StopHandle) {
})
.unwrap();

let stop_handle = server.start(module).unwrap();
(addr, stop_handle)
let server_handle = server.start(module).unwrap();
(addr, server_handle)
}

#[tokio::test]
Expand Down Expand Up @@ -380,6 +381,27 @@ async fn can_register_modules() {
#[tokio::test]
async fn stop_works() {
let _ = env_logger::try_init();
let (_addr, stop_handle) = server().with_default_timeout().await.unwrap();
assert!(matches!(stop_handle.stop().unwrap().await, Ok(_)));
let (_addr, server_handle) = server().with_default_timeout().await.unwrap();
assert!(matches!(server_handle.stop().unwrap().await, Ok(_)));
}

#[tokio::test]
async fn run_forever() {
const TIMEOUT: Duration = Duration::from_millis(200);

let _ = env_logger::try_init();
let (_addr, server_handle) = server().with_default_timeout().await.unwrap();
// Timed out.
server_handle.with_timeout(TIMEOUT).await.unwrap_err();
slumber marked this conversation as resolved.
Show resolved Hide resolved

let (_addr, server_handle) = server().await;
server_handle.handle.as_ref().unwrap().abort();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok, you did this instead of stop because it moves the JoinHandle?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Stop is a little bit different, here we want to emulate the actual server behavior -- if a task is cancelled or panics, the future handle resolves too.


// Cancelled task is still considered to be finished without errors.
// A subject to change.
server_handle.with_timeout(TIMEOUT).await.unwrap();

let (_addr, mut server_handle) = server().with_default_timeout().await.unwrap();
server_handle.handle.take();
server_handle.with_timeout(TIMEOUT).await.unwrap();
}
12 changes: 6 additions & 6 deletions tests/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,15 @@
// DEALINGS IN THE SOFTWARE.

use jsonrpsee::{
http_server::{HttpServerBuilder, HttpStopHandle},
http_server::{HttpServerBuilder, HttpServerHandle},
types::Error,
ws_server::{WsServerBuilder, WsStopHandle},
ws_server::{WsServerBuilder, WsServerHandle},
RpcModule,
};
use std::net::SocketAddr;
use std::time::Duration;

pub async fn websocket_server_with_subscription() -> (SocketAddr, WsStopHandle) {
pub async fn websocket_server_with_subscription() -> (SocketAddr, WsServerHandle) {
let server = WsServerBuilder::default().build("127.0.0.1:0").await.unwrap();

let mut module = RpcModule::new(());
Expand Down Expand Up @@ -88,9 +88,9 @@ pub async fn websocket_server_with_subscription() -> (SocketAddr, WsStopHandle)
.unwrap();

let addr = server.local_addr().unwrap();
let stop_handle = server.start(module).unwrap();
let server_handle = server.start(module).unwrap();

(addr, stop_handle)
(addr, server_handle)
}

pub async fn websocket_server() -> SocketAddr {
Expand All @@ -112,7 +112,7 @@ pub async fn websocket_server() -> SocketAddr {
addr
}

pub async fn http_server() -> (SocketAddr, HttpStopHandle) {
pub async fn http_server() -> (SocketAddr, HttpServerHandle) {
let server = HttpServerBuilder::default().build("127.0.0.1:0".parse().unwrap()).unwrap();
let mut module = RpcModule::new(());
let addr = server.local_addr().unwrap();
Expand Down
32 changes: 16 additions & 16 deletions tests/tests/resource_limiting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@

use jsonrpsee::{
http_client::HttpClientBuilder,
http_server::{HttpServerBuilder, HttpStopHandle},
http_server::{HttpServerBuilder, HttpServerHandle},
proc_macros::rpc,
types::{traits::Client, Error},
ws_client::WsClientBuilder,
ws_server::{WsServerBuilder, WsStopHandle},
ws_server::{WsServerBuilder, WsServerHandle},
RpcModule,
};
use tokio::time::sleep;
Expand Down Expand Up @@ -91,7 +91,7 @@ fn module_macro() -> RpcModule<()> {
().into_rpc()
}

async fn websocket_server(module: RpcModule<()>) -> Result<(SocketAddr, WsStopHandle), Error> {
async fn websocket_server(module: RpcModule<()>) -> Result<(SocketAddr, WsServerHandle), Error> {
let server = WsServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
Expand All @@ -104,7 +104,7 @@ async fn websocket_server(module: RpcModule<()>) -> Result<(SocketAddr, WsStopHa
Ok((addr, handle))
}

async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpStopHandle), Error> {
async fn http_server(module: RpcModule<()>) -> Result<(SocketAddr, HttpServerHandle), Error> {
let server = HttpServerBuilder::default()
.register_resource("CPU", 6, 2)?
.register_resource("MEM", 10, 1)?
Expand All @@ -128,7 +128,7 @@ fn assert_server_busy(fail: Result<String, Error>) {
}
}

async fn run_tests_on_ws_server(server_addr: SocketAddr, stop_handle: WsStopHandle) {
async fn run_tests_on_ws_server(server_addr: SocketAddr, server_handle: WsServerHandle) {
let server_url = format!("ws://{}", server_addr);
let client = WsClientBuilder::default().build(&server_url).await.unwrap();

Expand Down Expand Up @@ -162,10 +162,10 @@ async fn run_tests_on_ws_server(server_addr: SocketAddr, stop_handle: WsStopHand

// Client being active prevents the server from shutting down?!
drop(client);
stop_handle.stop().unwrap().await;
server_handle.stop().unwrap().await;
}

async fn run_tests_on_http_server(server_addr: SocketAddr, stop_handle: HttpStopHandle) {
async fn run_tests_on_http_server(server_addr: SocketAddr, server_handle: HttpServerHandle) {
let server_url = format!("http://{}", server_addr);
let client = HttpClientBuilder::default().build(&server_url).unwrap();

Expand All @@ -190,33 +190,33 @@ async fn run_tests_on_http_server(server_addr: SocketAddr, stop_handle: HttpStop

assert_eq!(passes, 3);

stop_handle.stop().unwrap().await.unwrap();
server_handle.stop().unwrap().await.unwrap();
}

#[tokio::test]
async fn ws_server_with_manual_module() {
let (server_addr, stop_handle) = websocket_server(module_manual().unwrap()).await.unwrap();
let (server_addr, server_handle) = websocket_server(module_manual().unwrap()).await.unwrap();

run_tests_on_ws_server(server_addr, stop_handle).await;
run_tests_on_ws_server(server_addr, server_handle).await;
}

#[tokio::test]
async fn ws_server_with_macro_module() {
let (server_addr, stop_handle) = websocket_server(module_macro()).await.unwrap();
let (server_addr, server_handle) = websocket_server(module_macro()).await.unwrap();

run_tests_on_ws_server(server_addr, stop_handle).await;
run_tests_on_ws_server(server_addr, server_handle).await;
}

#[tokio::test]
async fn http_server_with_manual_module() {
let (server_addr, stop_handle) = http_server(module_manual().unwrap()).await.unwrap();
let (server_addr, server_handle) = http_server(module_manual().unwrap()).await.unwrap();

run_tests_on_http_server(server_addr, stop_handle).await;
run_tests_on_http_server(server_addr, server_handle).await;
}

#[tokio::test]
async fn http_server_with_macro_module() {
let (server_addr, stop_handle) = http_server(module_macro()).await.unwrap();
let (server_addr, server_handle) = http_server(module_macro()).await.unwrap();

run_tests_on_http_server(server_addr, stop_handle).await;
run_tests_on_http_server(server_addr, server_handle).await;
}
21 changes: 16 additions & 5 deletions ws-server/src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,17 @@ impl StopMonitor {
self.0.shutdown_requested.load(Ordering::Relaxed)
}

pub(crate) fn handle(&self) -> StopHandle {
StopHandle(Arc::downgrade(&self.0))
pub(crate) fn handle(&self) -> ServerHandle {
ServerHandle(Arc::downgrade(&self.0))
}
}

/// Handle that is able to stop the running server.
/// Handle that is able to stop the running server or wait for it to finish
/// its execution.
#[derive(Debug, Clone)]
pub struct StopHandle(Weak<MonitorInner>);
pub struct ServerHandle(Weak<MonitorInner>);

impl StopHandle {
impl ServerHandle {
/// Requests server to stop. Returns an error if server was already stopped.
///
/// Returns a future that can be awaited for when the server shuts down.
Expand All @@ -190,6 +191,16 @@ impl StopHandle {
}
}

impl Future for ServerHandle {
type Output = ();

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut shutdown_waiter = ShutdownWaiter(self.0.clone());

shutdown_waiter.poll_unpin(cx)
}
}

/// A `Future` that resolves once the server has stopped.
#[derive(Debug)]
pub struct ShutdownWaiter(Weak<MonitorInner>);
Expand Down
2 changes: 1 addition & 1 deletion ws-server/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod server;
#[cfg(test)]
mod tests;

pub use future::{ShutdownWaiter as WsShutdownWaiter, StopHandle as WsStopHandle};
pub use future::{ServerHandle as WsServerHandle, ShutdownWaiter as WsShutdownWaiter};
pub use jsonrpsee_types as types;
pub use jsonrpsee_utils::server::rpc_module::{RpcModule, SubscriptionSink};
pub use server::{Builder as WsServerBuilder, Server as WsServer};
Loading