-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
- Loading branch information
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
use hydroflow::hydroflow_syntax; | ||
use hydroflow::util::{UdpSink, UdpStream}; | ||
|
||
use crate::helpers::parse_command; | ||
use crate::protocol::ServerResp; | ||
use crate::Opts; | ||
|
||
pub(crate) async fn run_client(outbound: UdpSink, inbound: UdpStream, opts: Opts) { | ||
println!("Client live!"); | ||
|
||
let server_addr = opts.server_addr.unwrap(); | ||
let mut hf = hydroflow_syntax! { | ||
// set up channels | ||
outbound_chan = dest_sink_serde(outbound); | ||
inbound_chan = source_stream_serde(inbound) -> map(Result::unwrap); | ||
|
||
// read in commands from stdin and forward to server | ||
source_stdin() | ||
-> filter_map(|line| parse_command(line.unwrap())) | ||
-> map(|msg| { (msg, server_addr) }) | ||
-> outbound_chan; | ||
|
||
// print inbound msgs | ||
inbound_chan -> for_each(|(response, addr): (ServerResp, _)| println!("Got a Response: {:?} from: {:?}", response, addr)); | ||
}; | ||
|
||
if let Some(graph) = opts.graph { | ||
let serde_graph = hf | ||
.meta_graph() | ||
.expect("No graph found, maybe failed to parse."); | ||
serde_graph.open_graph(graph, opts.write_config).unwrap(); | ||
} | ||
|
||
hf.run_async().await.unwrap(); | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,25 @@ | ||
use regex::Regex; | ||
|
||
use crate::protocol::ServerReq; | ||
|
||
pub fn parse_command(line: String) -> Option<ServerReq> { | ||
let re = Regex::new(r"([A-z]+)\s+(.+)").unwrap(); | ||
let caps = re.captures(line.as_str())?; | ||
|
||
let binding = caps.get(1).unwrap().as_str().to_uppercase(); | ||
let cmdstr = binding.as_str(); | ||
let args = caps.get(2).unwrap().as_str(); | ||
match cmdstr { | ||
"PUT" => { | ||
let kv = args.split_once(',')?; | ||
Some(ServerReq::ClientPut { | ||
key: kv.0.trim().to_string(), | ||
value: kv.1.trim().to_string(), | ||
}) | ||
} | ||
"GET" => Some(ServerReq::ClientGet { | ||
key: args.trim().to_string(), | ||
}), | ||
_ => None, | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,16 @@ | ||
lhs = mod[0] -> tee(); | ||
rhs = mod[1] -> tee(); | ||
|
||
lhs -> [0]joined; | ||
rhs -> [1]joined; | ||
|
||
joined = join_multiset() -> map(|(k, (lhs, rhs))| (k, (lhs, Some(rhs)))) -> combined; | ||
|
||
lhs -> [pos]missed; | ||
rhs -> map(|(k, _v)| k) -> [neg]missed; | ||
|
||
missed = anti_join() | ||
-> map(|(k, v)| (k, (v, None))) | ||
-> combined; | ||
|
||
combined = union() -> mod; |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,223 @@ | ||
use std::net::SocketAddr; | ||
use std::pin::Pin; | ||
Check failure on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Lints (pinned-nightly)
Check warning on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
Check warning on line 2 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
|
||
|
||
use bytes::{Bytes, BytesMut}; | ||
Check failure on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Lints (pinned-nightly)
Check warning on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
Check warning on line 4 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
|
||
use clap::{Parser, ValueEnum}; | ||
use client::run_client; | ||
use futures::stream::{SplitSink, SplitStream}; | ||
use futures::task::noop_waker; | ||
Check failure on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Lints (pinned-nightly)
Check warning on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
Check warning on line 8 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
|
||
use futures::SinkExt; | ||
use hydroflow::lang::graph::{WriteConfig, WriteGraphType}; | ||
use hydroflow::tokio; | ||
use hydroflow::util::{bind_udp_bytes, ipv4_resolve, TcpFramedStream}; | ||
Check failure on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Lints (pinned-nightly)
Check warning on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
Check warning on line 12 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
|
||
use multiplatform_test::multiplatform_test; | ||
Check failure on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Lints (pinned-nightly)
Check warning on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
Check warning on line 13 in hydroflow/examples/kvs_replicated_v2/main.rs GitHub Actions / Check (ubuntu-latest, pinned-nightly)
|
||
use server::run_server; | ||
use tokio_util::codec::LengthDelimitedCodec; | ||
use tokio_util::udp::UdpFramed; | ||
|
||
use crate::protocol::{ServerReq, ServerResp}; | ||
|
||
mod client; | ||
mod helpers; | ||
mod protocol; | ||
mod server; | ||
|
||
#[derive(Clone, ValueEnum, Debug)] | ||
enum Role { | ||
Client, | ||
Server, | ||
} | ||
|
||
#[derive(Parser, Debug)] | ||
struct Opts { | ||
#[clap(value_enum, long)] | ||
role: Role, | ||
#[clap(long, value_parser = ipv4_resolve)] | ||
addr: SocketAddr, | ||
#[clap(long, value_parser = ipv4_resolve)] | ||
server_addr: Option<SocketAddr>, | ||
#[clap(long)] | ||
graph: Option<WriteGraphType>, | ||
#[clap(flatten)] | ||
write_config: Option<WriteConfig>, | ||
} | ||
|
||
#[hydroflow::main] | ||
async fn main() { | ||
let opts = Opts::parse(); | ||
let addr = opts.addr; | ||
|
||
match opts.role { | ||
Role::Client => { | ||
let (outbound, inbound, _) = bind_udp_bytes(addr).await; | ||
println!("Client is bound to {:?}", addr); | ||
println!("Attempting to connect to server at {:?}", opts.server_addr); | ||
run_client(outbound, inbound, opts).await; | ||
} | ||
Role::Server => { | ||
run_server(opts.addr).await; | ||
} | ||
} | ||
} | ||
|
||
async fn send( | ||
outbound: &mut SplitSink<UdpFramed<LengthDelimitedCodec>, (Bytes, SocketAddr)>, | ||
x: ServerReq, | ||
addr: SocketAddr, | ||
) { | ||
outbound | ||
.send((hydroflow::util::serialize_to_bytes(x), addr)) | ||
.await | ||
.unwrap(); | ||
} | ||
|
||
async fn read(inbound: &mut SplitStream<UdpFramed<LengthDelimitedCodec>>) -> ServerResp { | ||
use futures::StreamExt; | ||
|
||
let Some(Ok((bytes, src))) = inbound.next().await else { | ||
panic!() | ||
}; | ||
|
||
hydroflow::util::deserialize_from_bytes(bytes).unwrap() | ||
} | ||
|
||
// #[multiplatform_test(hydroflow, env_tracing)] | ||
#[hydroflow::test] | ||
async fn test_server() { | ||
let server_addr_1 = "127.0.0.1:2098".parse().unwrap(); | ||
let server_addr_2: SocketAddr = "127.0.0.1:2096".parse().unwrap(); | ||
|
||
tokio::task::spawn_local(run_server(server_addr_1)); | ||
// tokio::task::spawn_local(run_server(server_addr_2)); | ||
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
let (mut outbound, mut inbound, _) = bind_udp_bytes("127.0.0.1:0".parse().unwrap()).await; | ||
|
||
send( | ||
&mut outbound, | ||
ServerReq::AddNode { | ||
node_id: server_addr_1, | ||
}, | ||
server_addr_1, | ||
) | ||
.await; | ||
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
send( | ||
&mut outbound, | ||
ServerReq::AddNode { | ||
node_id: server_addr_2, | ||
}, | ||
server_addr_1, | ||
) | ||
.await; | ||
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
// send( | ||
// &mut outbound, | ||
// ServerReq::RemoveNode { | ||
// node_id: server_addr_2, | ||
// }, | ||
// server_addr_1, | ||
// ) | ||
// .await; | ||
// tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
send( | ||
&mut outbound, | ||
ServerReq::ClientPut { | ||
key: "mykey".to_owned(), | ||
value: "myval".to_owned(), | ||
}, | ||
server_addr_1, | ||
) | ||
.await; | ||
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
// send( | ||
// &mut outbound, | ||
// ServerReq::ClientPut { | ||
// key: "mykey".to_owned(), | ||
// value: "myval2".to_owned(), | ||
// }, | ||
// server_addr_1, | ||
// ) | ||
// .await; | ||
// tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
send( | ||
&mut outbound, | ||
ServerReq::ClientGet { | ||
key: "mykey".to_owned(), | ||
}, | ||
server_addr_1, | ||
) | ||
.await; | ||
tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
println!("received from srv: {:?}", read(&mut inbound).await); | ||
|
||
// send( | ||
// &mut outbound, | ||
// ServerReq::ClientGet { | ||
// key: "mykey2".to_owned(), | ||
// }, | ||
// server_addr_1, | ||
// ) | ||
// .await; | ||
// tokio::time::sleep(std::time::Duration::from_millis(100)).await; | ||
|
||
// Need this sleep otherwise the last sent messages won't get processed before the whole process terminates. | ||
tokio::time::sleep(std::time::Duration::from_millis(1000)).await; | ||
} | ||
|
||
#[test] | ||
fn test() { | ||
use std::io::Write; | ||
|
||
use hydroflow::util::{run_cargo_example, wait_for_process_output}; | ||
|
||
let (_server_1, _, mut server_1_stdout) = | ||
run_cargo_example("kvs_replicated_v2", "--role server --addr 127.0.0.1:2051"); | ||
|
||
let mut server_1_output = String::new(); | ||
wait_for_process_output(&mut server_1_output, &mut server_1_stdout, "Server live!"); | ||
|
||
let (_client_1, mut client_1_stdin, mut client_1_stdout) = run_cargo_example( | ||
"kvs_replicated_v2", | ||
"--role client --addr 127.0.0.1:2052 --server-addr 127.0.0.1:2051", | ||
); | ||
|
||
let mut client_1_output = String::new(); | ||
wait_for_process_output(&mut client_1_output, &mut client_1_stdout, "Client live!"); | ||
|
||
client_1_stdin.write_all(b"PUT a,7\n").unwrap(); | ||
|
||
// let (_server_2, _, mut server_2_stdout) = run_cargo_example( | ||
// "kvs_replicated_v2", | ||
// "--role server --addr 127.0.0.1:2053 --server-addr 127.0.0.1:2051", | ||
// ); | ||
|
||
// let (_client_2, mut client_2_stdin, mut client_2_stdout) = run_cargo_example( | ||
// "kvs_replicated_v2", | ||
// "--role client --addr 127.0.0.1:2054 --server-addr 127.0.0.1:2053", | ||
// ); | ||
|
||
// let mut server_2_output = String::new(); | ||
// wait_for_process_output(&mut server_2_output, &mut server_2_stdout, "Server live!"); | ||
// wait_for_process_output( | ||
// &mut server_2_output, | ||
// &mut server_2_stdout, | ||
// r#"Message received PeerGossip \{ key: "a", value: "7" \} from 127\.0\.0\.1:2051"#, | ||
// ); | ||
|
||
// let mut client_2_output = String::new(); | ||
// wait_for_process_output(&mut client_2_output, &mut client_2_stdout, "Client live!"); | ||
|
||
// client_2_stdin.write_all(b"GET a\n").unwrap(); | ||
// wait_for_process_output( | ||
// &mut client_2_output, | ||
// &mut client_2_stdout, | ||
// r#"Got a Response: ServerResponse \{ key: "a", value: "7" \}"#, | ||
// ); | ||
} |