Skip to content

Commit

Permalink
feat(hydro_cli): added basic wrapper for hydro deploy Maelstrom integ…
Browse files Browse the repository at this point in the history
…ration
  • Loading branch information
Ryan Alameddine committed Oct 19, 2023
1 parent 3136e0f commit 43c5713
Show file tree
Hide file tree
Showing 9 changed files with 547 additions and 0 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"benches",
"hydro_cli",
"hydro_cli_examples",
"hydro_cli_maelstrom",
"hydroflow",
"hydroflow_cli_integration",
"hydroflow_datalog",
Expand Down
9 changes: 9 additions & 0 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,15 @@ impl Service for HydroflowCrate {
)
.await?;

// send the id over
binary
.write()
.await
.stdin()
.await
.send(format!("id: {}\n", self.id))
.await?;

let mut bind_config = HashMap::new();
for (port_name, bind_type) in self.port_to_bind.iter() {
bind_config.insert(port_name.clone(), launched_host.server_config(bind_type));
Expand Down
3 changes: 3 additions & 0 deletions hydro_cli_examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,9 @@ name = "pn_counter_delta"
[[example]]
name = "ws_chat_server"

[[example]]
name = "maelstrom_unique_id"

[dev-dependencies]
hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] }
hydroflow_datalog = { path = "../hydroflow_datalog" }
Expand Down
71 changes: 71 additions & 0 deletions hydro_cli_examples/examples/echo/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use hydroflow::hydroflow_syntax;
use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource};
use hydroflow::util::serialize_to_bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EchoMsg {
pub msg_id: Value,
pub echo: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EchoOkMsg {
pub echo: String,
pub in_reply_to: Value,
}

impl EchoMsg {
/// Generate EchoOkMsg response to this EchoMsg
fn response(
EchoMsg {
echo,
msg_id: source_msg_id,
}: Self,
) -> EchoOkMsg {
EchoOkMsg {
echo,
in_reply_to: source_msg_id,
}
}
}

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;

// TODO: use ConnectedDemux?
let echo_in = ports
.port("echo_in")
.connect::<ConnectedDirect>()
.await
.into_source();
let echo_out = ports
.port("echo_out")
.connect::<ConnectedDirect>()
.await
.into_sink();

let df = hydroflow_syntax! {
input = source_stream(echo_in)
-> map(Result::unwrap)
-> map(|x| x.to_vec())
-> map(String::from_utf8)
-> map(Result::unwrap);

output = map(|x| serde_json::to_string(&x))
-> map(Result::unwrap)
-> map(serialize_to_bytes)
-> dest_sink(echo_out);


input
-> map(|x| serde_json::from_str::<EchoMsg>(&x).unwrap())
//-> map(|x| EchoMsg {msg_id: x.msg_id, echo: x.echo + "hi"})
-> map(EchoMsg::response)
-> output;
};

hydroflow::util::cli::launch_flow(df).await;
}
68 changes: 68 additions & 0 deletions hydro_cli_examples/examples/maelstrom_unique_id/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
use hydroflow::hydroflow_syntax;
use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource};
use hydroflow::util::serialize_to_bytes;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Generate {
pub msg_id: Value,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GenerateOk {
pub id: Value,
pub in_reply_to: Value,
}

impl Generate {
/// Generate GenerateOk response to this Generate message
pub fn respond(self, i: usize, node_id: &str) -> GenerateOk {
let id = json!([i, node_id]);

GenerateOk {
id,
in_reply_to: self.msg_id,
}
}
}

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;
let node_id = ports.node_id.clone();

// TODO: use ConnectedDemux?
let gen_in = ports
.port("gen_in")
.connect::<ConnectedDirect>()
.await
.into_source();
let ok_out = ports
.port("ok_out")
.connect::<ConnectedDirect>()
.await
.into_sink();

let df = hydroflow_syntax! {
input = source_stream(gen_in)
-> map(Result::unwrap)
-> map(|x| x.to_vec())
-> map(String::from_utf8)
-> map(Result::unwrap);

output = map(|x| serde_json::to_string(&x))
-> map(Result::unwrap)
-> map(serialize_to_bytes)
-> dest_sink(ok_out);


input
-> map(|x| serde_json::from_str::<Generate>(&x).unwrap())
-> enumerate::<'static>() //-> enumerate() will fail!
-> map(|(i, x)| x.respond(i, &node_id))
-> output;
};

hydroflow::util::cli::launch_flow(df).await;
}
21 changes: 21 additions & 0 deletions hydro_cli_maelstrom/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
[package]
name = "hydro_cli_maelstrom"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
serde_json = "1"
serde = { version = "1", features = [ "derive" ] }
futures = { version = "0.3" }
bytes = "1.1.0"
bincode = "1.3"

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { version = "1.16", features = [ "full" ] }
tokio-util = { version = "0.7.4", features = [ "net", "codec" ] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { version = "1.16", features = [ "rt" , "sync", "macros", "io-util", "time" ] }
tokio-util = { version = "0.7.4", features = [ "codec" ] }
Loading

0 comments on commit 43c5713

Please sign in to comment.