Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(hydro_cli): added basic wrapper for hydro deploy Maelstrom integration #936

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ members = [
"benches",
"hydro_cli",
"hydro_cli_examples",
"hydro_cli_maelstrom",
"hydroflow",
"hydroflow_cli_integration",
"hydroflow_datalog",
Expand Down
11 changes: 10 additions & 1 deletion hydro_cli/src/core/custom_service.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::any::Any;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::{Arc, Weak};

Expand Down Expand Up @@ -65,11 +66,19 @@
Ok(())
}

async fn start(&mut self) {}
async fn start(&mut self, names: &HashMap<usize, String>) {}

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused variable: `names`

Check failure on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

unused variable: `names`

Check warning on line 69 in hydro_cli/src/core/custom_service.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

unused variable: `names`

async fn stop(&mut self) -> Result<()> {
Ok(())
}

fn name(&self) -> String {
self._id.to_string()
}

fn id(&self) -> usize {
self._id
}
}

pub struct CustomClientPort {
Expand Down
22 changes: 20 additions & 2 deletions hydro_cli/src/core/deployment.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
use std::collections::HashMap;
use std::sync::{Arc, Weak};

use anyhow::Result;
use futures::future::join_all;
use tokio::sync::RwLock;

use super::{progress, Host, ResourcePool, ResourceResult, Service};
Expand Down Expand Up @@ -57,7 +59,7 @@
.map(|host: &mut Arc<RwLock<dyn Host>>| async {
host.write().await.provision(&result).await;
});
futures::future::join_all(hosts_provisioned)

Check warning on line 62 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unnecessary qualification

Check failure on line 62 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unnecessary qualification

Check warning on line 62 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unnecessary qualification

Check warning on line 62 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

unnecessary qualification

Check warning on line 62 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

unnecessary qualification

Check warning on line 62 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

unnecessary qualification
})
.await;

Expand All @@ -75,7 +77,7 @@
.await;
});

futures::future::join_all(services_future)

Check warning on line 80 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unnecessary qualification

Check failure on line 80 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Lints (pinned-nightly)

unnecessary qualification

Check warning on line 80 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Check (pinned-nightly)

unnecessary qualification

Check warning on line 80 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--lib --bins, pinned-nightly)

unnecessary qualification

Check warning on line 80 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--benches, pinned-nightly)

unnecessary qualification

Check warning on line 80 in hydro_cli/src/core/deployment.rs

View workflow job for this annotation

GitHub Actions / Test Suite (--tests, pinned-nightly)

unnecessary qualification
})
.await;

Expand Down Expand Up @@ -106,14 +108,30 @@
.collect::<Vec<_>>();
self.services = active_services;

let node_names: HashMap<usize, String> =
join_all(self.services.iter().map(|service| async {
let service = service.upgrade().unwrap();
let service = service.read().await;
(service.id(), service.name())
}))
.await
.into_iter()
.collect();

let all_services_start =
self.services
.iter()
.map(|service: &Weak<RwLock<dyn Service>>| async {
service.upgrade().unwrap().write().await.start().await;
service
.upgrade()
.unwrap()
.write()
.await
.start(&node_names)
.await;
});

futures::future::join_all(all_services_start).await;
join_all(all_services_start).await;
}

pub fn add_host<T: Host + 'static, F: FnOnce(usize) -> T>(
Expand Down
18 changes: 15 additions & 3 deletions hydro_cli/src/core/hydroflow_crate/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ impl Service for HydroflowCrate {
.await
}

async fn start(&mut self) {
async fn start(&mut self, names: &HashMap<usize, String>) {
if self.started {
return;
}
Expand All @@ -296,7 +296,9 @@ impl Service for HydroflowCrate {
sink_ports.insert(port_name.clone(), outgoing.load_instantiated(&|p| p).await);
}

let formatted_defns = serde_json::to_string(&sink_ports).unwrap();
let payload = (&sink_ports, self.id, names);

let formatted_start = serde_json::to_string(&payload).unwrap();

self.launched_binary
.as_mut()
Expand All @@ -305,7 +307,7 @@ impl Service for HydroflowCrate {
.await
.stdin()
.await
.send(format!("start: {formatted_defns}\n"))
.send(format!("start: {formatted_start}\n"))
.await
.unwrap();

Expand Down Expand Up @@ -333,4 +335,14 @@ impl Service for HydroflowCrate {

Ok(())
}

fn name(&self) -> String {
self.display_id
.clone()
.unwrap_or_else(|| format!("service/{}", self.id))
}

fn id(&self) -> usize {
self.id
}
}
8 changes: 7 additions & 1 deletion hydro_cli/src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,8 +186,14 @@ pub trait Service: Send + Sync {
async fn ready(&mut self) -> Result<()>;

/// Starts the service by having it connect to other services and start computations.
async fn start(&mut self);
/// Takes in a map from service id to service name for all services.
async fn start(&mut self, names: &HashMap<usize, String>);

/// Stops the service by having it disconnect from other services and stop computations.
async fn stop(&mut self) -> Result<()>;

/// Returns the id of the service
fn id(&self) -> usize;
/// Returns the display name of the service
fn name(&self) -> String;
}
6 changes: 6 additions & 0 deletions hydro_cli_examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ name = "pn_counter_delta"
[[example]]
name = "ws_chat_server"

[[example]]
name = "maelstrom_unique_id"

[[example]]
name = "maelstrom_broadcast"

[dev-dependencies]
hydroflow = { path = "../hydroflow", features = [ "cli_integration" ] }
hydroflow_datalog = { path = "../hydroflow_datalog" }
Expand Down
71 changes: 71 additions & 0 deletions hydro_cli_examples/examples/echo/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
use hydroflow::hydroflow_syntax;
use hydroflow::util::cli::{ConnectedDirect, ConnectedSink, ConnectedSource};
use hydroflow::util::serialize_to_bytes;
use serde::{Deserialize, Serialize};
use serde_json::Value;

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EchoMsg {
pub msg_id: Value,
pub echo: String,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EchoOkMsg {
pub echo: String,
pub in_reply_to: Value,
}

impl EchoMsg {
/// Generate EchoOkMsg response to this EchoMsg
fn response(
EchoMsg {
echo,
msg_id: source_msg_id,
}: Self,
) -> EchoOkMsg {
EchoOkMsg {
echo,
in_reply_to: source_msg_id,
}
}
}

#[hydroflow::main]
async fn main() {
let mut ports = hydroflow::util::cli::init().await;

// TODO: use ConnectedDemux?
let echo_in = ports
.port("echo_in")
.connect::<ConnectedDirect>()
.await
.into_source();
let echo_out = ports
.port("echo_out")
.connect::<ConnectedDirect>()
.await
.into_sink();

let df = hydroflow_syntax! {
input = source_stream(echo_in)
-> map(Result::unwrap)
-> map(|x| x.to_vec())
-> map(String::from_utf8)
-> map(Result::unwrap);

output = map(|x| serde_json::to_string(&x))
-> map(Result::unwrap)
-> map(serialize_to_bytes)
-> dest_sink(echo_out);


input
-> map(|x| serde_json::from_str::<EchoMsg>(&x).unwrap())
//-> map(|x| EchoMsg {msg_id: x.msg_id, echo: x.echo + "hi"})
-> map(EchoMsg::response)
-> output;
};

hydroflow::util::cli::launch_flow(df).await;
}
Loading
Loading