Max/client pool (#5188)
* tcp conn tracker

* make default decay const

* first pass connpool

* err handling conpool start

* added notes for next features

* first version working

* first pass spin out client_pool

* cancel token

* logging change

* bump default decay time

* bugfix: make sure to apply gateway score filtering when choosing initial node

* add duplicate packets received to troubleshooting

* client_pool.rs mod

* client pool example

* clippy

* client pool example done

* added disconnect to client pool

* update mod file

* add cancel token disconnect fn

* comments

* comments

* add clone

* added disconnect thread

* update example files tcpproxy

* client pool docs

* remove comments for future ffi push + lower default pool size from 4 to 2

* comment on ffi

* update command help

* clone impl

* remove clone

* fix clippy

* fix clippy again

* fix test

* tweaked text grammar

* updated comment in example

* future is now

* cherry

* cherry

* fix borked rebase

* fix fmt

* wasm fix

---------

Co-authored-by: Jędrzej Stuczyński <[email protected]>
mfahampshire and jstuczyn authored Jan 14, 2025
1 parent 0a47d5d commit e454d71
Showing 35 changed files with 923 additions and 230 deletions.
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions documentation/docs/pages/developers/rust/_meta.json
@@ -3,6 +3,7 @@
"development-status": "Development Status",
"mixnet": "Mixnet Module",
"tcpproxy": "TcpProxy Module",
"client-pool": "Client Pool",
"ffi": "FFI",
"tutorials": "Tutorials (Coming Soon)"
}
7 changes: 7 additions & 0 deletions documentation/docs/pages/developers/rust/client-pool.mdx
@@ -0,0 +1,7 @@
# Client Pool

We have a configurable-size Client Pool for processes that require multiple clients in quick succession (it is used by default by the [`TcpProxyClient`](./tcpproxy), for instance).

This will be useful for developers looking to build connection logic, or who are simply using raw SDK clients in a situation with multiple connections and a lot of churn.

> You can find this code [here](https://github.com/nymtech/nym/blob/develop/sdk/rust/nym-sdk/examples/client_pool.rs)
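
A minimal usage sketch, condensed from the full example on the Example page (the pool size and sleep duration are illustrative, and it assumes your network env is already configured, e.g. via `setup_env`):

```rust
use nym_sdk::client_pool::ClientPool;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Keep 2 Clients in reserve, with the pool running as a background task
    let pool = ClientPool::new(2);
    let pool_handle = pool.clone();
    tokio::spawn(async move { pool_handle.start().await });

    // Give the background task a moment to fill the pool (illustrative delay)
    tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

    // Grab a pre-warmed Client if one is available, use it, then disconnect it
    if let Some(client) = pool.get_mixnet_client().await {
        println!("Got client {}", client.nym_address());
        client.disconnect().await;
    }

    // Gracefully shut down the pool itself
    pool.disconnect_pool().await;
    Ok(())
}
```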
4 changes: 4 additions & 0 deletions documentation/docs/pages/developers/rust/client-pool/_meta.json
@@ -0,0 +1,4 @@
{
"architecture": "Architecture",
"example": "Example"
}
19 changes: 19 additions & 0 deletions documentation/docs/pages/developers/rust/client-pool/architecture.md
@@ -0,0 +1,19 @@
# Client Pool Architecture

## Motivations
In situations where multiple connections are expected, and the number of connections can vary greatly, the Client Pool reduces the time your code spends blocked waiting for a Mixnet Client to be created before it can send traffic through the Mixnet. Instead, a configurable number of Clients are generated and run in the background, ready to be very quickly grabbed, used, and disconnected.

The Pool can simply be run as a background process for the runtime of your program.

## Clients & Lifetimes
The Client Pool creates **ephemeral Mixnet Clients** which are used and then disconnected. Using the [`TcpProxy`](../tcpproxy) as an example, Clients are used for the lifetime of a single incoming TCP connection; after the TCP connection is closed, the Mixnet Client is disconnected.

Clients are popped from the pool as they are used, and new Clients are created to take their place. If connections are coming in faster than Clients can be replenished, you can instead generate an ephemeral Client on the fly, or wait; this is up to the developer to decide. You can see an example of this logic on the next page.

## Runtime Loop
Aside from a few helper / getter functions and a graceful `disconnect_pool()`, the Client Pool is mostly made up of a very simple loop around some conditional logic in `start()`:
- if the number of Clients in the pool is `< client_pool_reserve_number` (set on `new()`) then create more,
- if the number of Clients in the pool `== client_pool_reserve_number` (set on `new()`) then `sleep`,
- if `client_pool_reserve_number == 0` just `sleep`.

`disconnect_pool()` will cause this loop to `break` via a cancellation token.
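
As a rough sketch of that loop (the types and names below are illustrative stand-ins, not the Client Pool's actual internals):

```rust
use std::{sync::Arc, time::Duration};
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;

// Stand-in for a pooled Client; the real pool stores connected Mixnet Clients.
struct PooledClient;

// `reserve` plays the role of client_pool_reserve_number (set on new()).
async fn pool_loop(
    clients: Arc<Mutex<Vec<PooledClient>>>,
    reserve: usize,
    cancel: CancellationToken,
) {
    loop {
        if cancel.is_cancelled() {
            break; // disconnect_pool() cancels the token, breaking the loop
        }
        let current = clients.lock().await.len();
        if reserve > 0 && current < reserve {
            // fewer Clients than the reserve number: create another
            // (the real pool builds and connects an ephemeral Mixnet Client here)
            clients.lock().await.push(PooledClient);
        } else {
            // pool is full, or the reserve number is 0: just sleep and re-check
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    }
}
```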
100 changes: 100 additions & 0 deletions documentation/docs/pages/developers/rust/client-pool/example.md
@@ -0,0 +1,100 @@
# Client Pool Example

> You can find this code [here](https://github.com/nymtech/nym/blob/develop/sdk/rust/nym-sdk/examples/client_pool.rs)
```rust
use anyhow::Result;
use nym_network_defaults::setup_env;
use nym_sdk::client_pool::ClientPool;
use nym_sdk::mixnet::{MixnetClientBuilder, NymNetworkDetails};
use tokio::signal::ctrl_c;

// This client pool is used internally by the TcpProxyClient but can also be used with the Mixnet module, in case you're quickly swapping clients in and out but don't want to use the TcpProxy module.
//
// Run with: cargo run --example client_pool -- ../../../envs/<NETWORK>.env
#[tokio::main]
async fn main() -> Result<()> {
nym_bin_common::logging::setup_logging();
setup_env(std::env::args().nth(1));

let conn_pool = ClientPool::new(2); // Start the Client Pool with 2 Clients always being kept in reserve
let client_maker = conn_pool.clone();
tokio::spawn(async move {
client_maker.start().await?;
Ok::<(), anyhow::Error>(())
});

println!("\n\nWaiting a few seconds to fill pool\n\n");
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await;

let pool_clone_one = conn_pool.clone();
let pool_clone_two = conn_pool.clone();

tokio::spawn(async move {
let client_one = match pool_clone_one.get_mixnet_client().await {
Some(client) => {
println!("Grabbed client {} from pool", client.nym_address());
client
}
None => {
println!("Not enough clients in pool, creating ephemeral client");
let net = NymNetworkDetails::new_from_env();
let client = MixnetClientBuilder::new_ephemeral()
.network_details(net)
.build()?
.connect_to_mixnet()
.await?;
println!(
"Using {} for the moment, created outside of the connection pool",
client.nym_address()
);
client
}
};
let our_address = client_one.nym_address();
println!("\n\nClient 1: {our_address}\n\n");
client_one.disconnect().await;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something
return Ok::<(), anyhow::Error>(());
});

tokio::spawn(async move {
let client_two = match pool_clone_two.get_mixnet_client().await {
Some(client) => {
println!("Grabbed client {} from pool", client.nym_address());
client
}
None => {
println!("Not enough clients in pool, creating ephemeral client");
let net = NymNetworkDetails::new_from_env();
let client = MixnetClientBuilder::new_ephemeral()
.network_details(net)
.build()?
.connect_to_mixnet()
.await?;
println!(
"Using {} for the moment, created outside of the connection pool",
client.nym_address()
);
client
}
};
let our_address = *client_two.nym_address();
println!("\n\nClient 2: {our_address}\n\n");
client_two.disconnect().await;
tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; // Emulate doing something
return Ok::<(), anyhow::Error>(());
});

wait_for_ctrl_c(conn_pool).await?;
Ok(())
}

async fn wait_for_ctrl_c(pool: ClientPool) -> Result<()> {
println!("\n\nPress CTRL_C to disconnect pool\n\n");
ctrl_c().await?;
println!("CTRL_C received. Killing client pool");
pool.disconnect_pool().await;
Ok(())
}
```
@@ -7,9 +7,12 @@ In the future the SDK will be made up of several modules, each of which will all
|-----------|---------------------------------------------------------------------------------------|----------|
| Mixnet | Create / load clients & keypairs, subscribe to Mixnet events, send & receive messages | ✔️ |
| TcpProxy | Utilise the TcpProxyClient and TcpProxyServer abstractions for streaming | ✔️ |
| ClientPool | Create a pool of quickly usable Mixnet clients | ✔️ |
| Ecash | Create & verify Ecash credentials ||
| Validator | Sign & broadcast Nyx blockchain transactions, query the blockchain ||

The `Mixnet` module currently exposes the logic of two clients: the [websocket client](../clients/websocket), and the [socks client](../clients/socks5).

The `TcpProxy` module exposes functionality to set up client/server instances that expose a localhost TcpSocket to read/write to.

The `ClientPool` is a configurable pool of ephemeral clients which runs as a background process and from which clients can be quickly grabbed.
9 changes: 6 additions & 3 deletions documentation/docs/pages/developers/rust/ffi.mdx
@@ -22,7 +22,7 @@ The main functionality of exposed functions will be imported from `sdk/ffi/share

Furthermore, the `shared/` code makes sure that client access is thread-safe, and that client actions happen in blocking threads on the Rust side of the FFI boundary.

### Mixnet Module
## Mixnet Module
This is the basic mixnet component of the SDK, exposing client functionality with which people can build custom interfaces with the Mixnet. These functions are exposed to both Go and C/C++ via the `sdk/ffi/shared/` crate.

| `shared/lib.rs` function | Rust Function |
@@ -36,13 +36,13 @@ This is the basic mixnet component of the SDK, exposing client functionality wit

> We have also implemented `listen_for_incoming_internal()` which is a wrapper around the Mixnet client's `wait_for_messages()`. This is a helper method for listening out for and handling incoming messages.
#### Currently Unsupported Functionality
### Currently Unsupported Functionality
At the time of writing the following functionality is not exposed to the shared FFI library:
- `split_sender()`: the ability to [split a client into sender and receiver](./mixnet/examples/split-send) for concurrent send/receive.
- The use of [custom network topologies](./mixnet/examples/custom-topology).
- `Socks5::new()`: creation and use of the [socks5/4a/4 proxy client](./mixnet/examples/socks).

### TcpProxy Module
## TcpProxy Module
A connection abstraction which exposes a local TCP socket that developers can interact with as expected, reading and writing to/from a bytestream, without having to take into account the workings of the Mixnet, Sphinx, or the [message-based](../concepts/messages) format of the underlying client.

<Callout type="info" emoji="ℹ️">
@@ -58,3 +58,6 @@ A connection abstraction which exposes a local TCP socket which developers are a
| `proxy_server_new_internal(upstream_address: &str, config_dir: &str, env: Option<String>)` | `NymProxyServer::new(upstream_address, config_dir, env)` |
| `proxy_server_run_internal()` | `NymProxyServer.run_with_shutdown()` |
| `proxy_server_address_internal()` | `NymProxyServer.nym_address()` |

## Client Pool
There are currently no FFI bindings for the Client Pool; these will come in the future. The bindings for the TcpProxy have been updated to use the Client Pool under the hood, but the standalone Pool is not yet exposed to FFI.
@@ -112,3 +112,6 @@ Whether the `data` of a SURB request being empty is a feature or a bug is to be
You can find a few helper functions [here](./message-helpers.md) to help deal with this issue in the meantime.

> If you can think of a more succinct or different way of handling this do reach out - we're happy to hear other opinions
## Lots of `duplicate fragment received` messages
Depending on the log level you're using, you might see a lot of `WARN`-level logs about duplicate fragments. This occurs when a packet is retransmitted somewhere in the Mixnet but the original also makes it to the destination client. It is nothing to do with your client logic, but rather the state of the Mixnet.
3 changes: 2 additions & 1 deletion documentation/docs/pages/developers/rust/tcpproxy/_meta.json
@@ -1,4 +1,5 @@
{
"architecture": "Architecture",
"examples": "Examples"
"examples": "Examples",
"troubleshooting": "Troubleshooting"
}
@@ -13,7 +13,7 @@ The motivation behind the creation of the `TcpProxy` module is to allow develope

## Clients
Each of the sub-modules exposed by the `TcpProxy` deal with Nym clients in a different way.
- the `NymProxyClient` creates an ephemeral client per new TCP connection, which is closed according to the configurable timeout: we map one ephemeral client per TCP connection. This is to deal with multiple simultaneous streams. In the future, this will be superceded by a connection pool in order to speed up new connections.
- the `NymProxyClient` relies on the [`Client Pool`](../client-pool) to create clients and keep a certain number of them in reserve. If the number of incoming TCP connections rises more quickly than the Client Pool can create clients, or you have the pool size set to `0`, the `NymProxyClient` creates an ephemeral client per new TCP connection, which is closed according to the configurable timeout: we map one ephemeral client per TCP connection to deal with multiple simultaneous streams. A sketch of this fallback follows this list.
- the `NymProxyServer` has a single Nym client with a persistent identity.
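
As an illustration of that fallback, here is a hypothetical helper based on the Client Pool example code (not the proxy's actual internals):

```rust
use anyhow::Result;
use nym_sdk::client_pool::ClientPool;
use nym_sdk::mixnet::{MixnetClient, MixnetClientBuilder, NymNetworkDetails};

// Sketch: obtain a Client for a new TCP connection, falling back to an
// ephemeral Client when the pool is empty (or its size is set to 0).
async fn client_for_connection(pool: &ClientPool) -> Result<MixnetClient> {
    match pool.get_mixnet_client().await {
        Some(client) => Ok(client), // fast path: pre-warmed Client from the pool
        None => {
            // pool drained: build and connect a Client on the fly
            let client = MixnetClientBuilder::new_ephemeral()
                .network_details(NymNetworkDetails::new_from_env())
                .build()?
                .connect_to_mixnet()
                .await?;
            Ok(client)
        }
    }
}
```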

## Framing
@@ -18,6 +18,8 @@ use tokio::net::TcpStream;
use tokio::signal;
use tokio_stream::StreamExt;
use tokio_util::codec;
use tokio_util::sync::CancellationToken;
use tracing_subscriber::{fmt, prelude::*, EnvFilter};

#[derive(Serialize, Deserialize, Debug)]
struct ExampleMessage {
@@ -26,6 +28,8 @@ struct ExampleMessage {
tcp_conn: i8,
}

// This example just starts off a bunch of TCP connections in a loop to a remote endpoint: in this case, the TcpListener behind the NymProxyServer instance on the echo server found in `nym/tools/echo-server/`. It pipes a few messages to it, logs the replies, and keeps track of the number of replies received per connection.
//
// To run:
// - run the echo server with `cargo run`
// - run this example with `cargo run --example tcp_proxy_multistream -- <ECHO_SERVER_NYM_ADDRESS> <ENV_FILE_PATH> <CLIENT_PORT>` e.g.
@@ -40,32 +44,56 @@ async fn main() -> anyhow::Result<()> {
// Nym client logging is very informative but quite verbose.
// The Message Decay related logging gives you an idea of the internals of the proxy message ordering: you need to switch
// to DEBUG to see the contents of the msg buffer, sphinx packet chunking, etc.
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
tracing_subscriber::registry()
.with(fmt::layer())
.with(
EnvFilter::new("info")
.add_directive("nym_sdk::client_pool=info".parse().unwrap())
.add_directive("nym_sdk::tcp_proxy_client=debug".parse().unwrap()),
)
.init();

let env_path = env::args().nth(2).expect("Env file not specified");
let env = env_path.to_string();

let listen_port = env::args().nth(3).expect("Port not specified");

// Within the TcpProxyClient, individual client shutdown is triggered by the timeout.
// Within the TcpProxyClient, individual client shutdown is triggered by the timeout. The final argument is how many clients to keep in reserve in the client pool when running the TcpProxy.
let proxy_client =
tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env)).await?;
tcp_proxy::NymProxyClient::new(server, "127.0.0.1", &listen_port, 45, Some(env), 2).await?;

// For our disconnect() logic below
let proxy_clone = proxy_client.clone();

tokio::spawn(async move {
proxy_client.run().await?;
Ok::<(), anyhow::Error>(())
});

let example_cancel_token = CancellationToken::new();
let client_cancel_token = example_cancel_token.clone();
let watcher_cancel_token = example_cancel_token.clone();

// Cancel listener thread
tokio::spawn(async move {
signal::ctrl_c().await?;
println!(":: CTRL_C received, shutting down + cleanup up proxy server config files");
watcher_cancel_token.cancel();
proxy_clone.disconnect().await;
Ok::<(), anyhow::Error>(())
});

println!("waiting for everything to be set up..");
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
println!("done. sending bytes");

// In the info traces you will see the different session IDs being set up, one for each TcpStream.
for i in 0..4 {
for i in 0..8 {
let client_cancel_inner_token = client_cancel_token.clone();
if client_cancel_token.is_cancelled() {
break;
}
let conn_id = i;
println!("Starting TCP connection {}", conn_id);
let local_tcp_addr = format!("127.0.0.1:{}", listen_port.clone());
tokio::spawn(async move {
// Now the client and server proxies are running we can create and pipe traffic to/from
@@ -81,7 +109,10 @@ async fn main() -> anyhow::Result<()> {

// Let's just send a bunch of messages to the server with variable delays between them, with message and TCP connection ids to keep track of ordering on the server side (for illustrative purposes **only**; keeping track of anonymous replies is handled by the proxy under the hood with Single Use Reply Blocks (SURBs); for this illustration we want some kind of app-level message id, but in reality you'll probably be matching on e.g. the incoming response type instead)
tokio::spawn(async move {
for i in 0..4 {
for i in 0..8 {
if client_cancel_inner_token.is_cancelled() {
break;
}
let mut rng = SmallRng::from_entropy();
let delay: f64 = rng.gen_range(2.5..5.0);
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
@@ -96,12 +127,7 @@ async fn main() -> anyhow::Result<()> {
.write_all(&serialised)
.await
.expect("couldn't write to stream");
println!(
">> client sent {}: {} bytes on conn {}",
&i,
msg.message_bytes.len(),
&conn_id
);
println!(">> client sent msg {} on conn {}", &i, &conn_id);
}
Ok::<(), anyhow::Error>(())
});
@@ -113,17 +139,8 @@ async fn main() -> anyhow::Result<()> {
while let Some(Ok(bytes)) = framed_read.next().await {
match bincode::deserialize::<ExampleMessage>(&bytes) {
Ok(msg) => {
println!(
"<< client received {}: {} bytes on conn {}",
msg.message_id,
msg.message_bytes.len(),
msg.tcp_conn
);
reply_counter += 1;
println!(
"tcp connection {} replies received {}/4",
msg.tcp_conn, reply_counter
);
println!("<< conn {} received {}/8", msg.tcp_conn, reply_counter);
}
Err(e) => {
println!("<< client received something that wasn't an example message of {} bytes. error: {}", bytes.len(), e);
@@ -138,15 +155,12 @@ async fn main() -> anyhow::Result<()> {
tokio::time::sleep(tokio::time::Duration::from_secs_f64(delay)).await;
}

// Once timeout is passed, you can either wait for graceful shutdown or just hard stop it.
signal::ctrl_c().await?;
println!("CTRL+C received, shutting down");
Ok(())
}

// emulate a series of small messages followed by a closing larger one
fn gen_bytes_fixed(i: usize) -> Vec<u8> {
let amounts = [10, 15, 50, 1000];
let amounts = [10, 15, 50, 1000, 10, 15, 500, 2000];
let len = amounts[i];
let mut rng = rand::thread_rng();
(0..len).map(|_| rng.gen::<u8>()).collect()
