Add support for distributed deployments with multiple daemons #256

Merged: 33 commits from the `multiple-daemons` branch, Apr 28, 2023

Commits (33)

- `6ce8b7a` Allow configuring deploy machine (phil-opp, Apr 13, 2023)
- `d81de13` Fill in default machine (phil-opp, Apr 13, 2023)
- `ccf0ee7` Create `spawn_dataflow_on_machine` function (phil-opp, Apr 13, 2023)
- `1b755fb` Only start nodes of own machine in daemon (phil-opp, Apr 13, 2023)
- `7b8de70` Set deploy defaults when resolving aliases (phil-opp, Apr 13, 2023)
- `b1d9b58` Create new `ResolvedDeploy` type with non-`Option` fields (phil-opp, Apr 13, 2023)
- `a59e633` Add spawn logging in coordinator (phil-opp, Apr 13, 2023)
- `20d954a` Add TODOs in daemon for notifying remote nodes (phil-opp, Apr 13, 2023)
- `89cc6e4` Keep `dora-coordinator` connection open (phil-opp, Apr 13, 2023)
- `7397bcc` Forward outputs through coordinator to target machine and nodes (phil-opp, Apr 13, 2023)
- `c515651` Start working on a example that uses multiple daemons (phil-opp, Apr 17, 2023)
- `4f649e8` Only set ctrl-c handler in deamon when it's run as a binary (phil-opp, Apr 18, 2023)
- `852fc2c` Synchronize node startup with coordinator (phil-opp, Apr 18, 2023)
- `188813c` Make coordinator usable in examples (phil-opp, Apr 18, 2023)
- `3f5f412` Add machine ID to daemon trace messages (phil-opp, Apr 18, 2023)
- `9f76239` Run distributed dataflow in `multiple-daemons` example (phil-opp, Apr 18, 2023)
- `c523b3b` Report closed inputs to remote daemons (phil-opp, Apr 18, 2023)
- `17c52e3` Use control messages in `multiple-daemons` example instead of sleeping (phil-opp, Apr 18, 2023)
- `cff3ebd` Run `multiple-daemons` example on CI (phil-opp, Apr 18, 2023)
- `647d5e2` Add daemon arguments for setting machine ID and coordinator addr (phil-opp, Apr 18, 2023)
- `62fd4f1` Merge branch 'main' into multiple-daemons (phil-opp, Apr 18, 2023)
- `d6571bf` Rework communication config (phil-opp, Apr 19, 2023)
- `72a57ce` Remove deprecated `communication` options from dataflow examples, tem… (phil-opp, Apr 19, 2023)
- `aac5e65` Integrate `dora-runtime` into `dora-daemon` (phil-opp, Apr 19, 2023)
- `18cadab` Merge pull request #257 from dora-rs/remove-runtime-binary (phil-opp, Apr 19, 2023)
- `d535dfe` Prefix new YAML keys with `_unstable_` (phil-opp, Apr 27, 2023)
- `c28427d` Pass parsed dataflow descriptor instead of path (phil-opp, Apr 27, 2023)
- `a72f17b` Deny unknown fields in operator and deploy config (phil-opp, Apr 27, 2023)
- `bf12a7b` Canoncialize dataflow path to determine working dir (phil-opp, Apr 27, 2023)
- `504e98e` Fix: `deny_unknown_fields` is not supported in combination with `flat… (phil-opp, Apr 27, 2023)
- `f621920` Fix `multiple-daemons` example: Use new `_unstable_` prefix for deplo… (phil-opp, Apr 27, 2023)
- `8bf124a` Merge branch 'main' into multiple-daemons (phil-opp, Apr 27, 2023)
- `e13415d` Send cross-machine outputs directly to target daemon without involvin… (phil-opp, Apr 27, 2023)
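
Read together, the commits describe the new deployment model: a dataflow descriptor can assign each node to a named machine, every machine runs its own `dora-daemon` registered with a shared coordinator, and outputs that cross machine boundaries are forwarded between daemons (directly, per the last commit, rather than through the coordinator). The sketch below illustrates the idea only; the `_unstable_deploy` and `machine` key names, node IDs, and paths are assumptions inferred from the commit messages ("Allow configuring deploy machine", "Prefix new YAML keys with `_unstable_`"), not copied from this PR's diff.

```yaml
# Hypothetical two-machine dataflow; key names and values are illustrative.
nodes:
  - id: producer
    custom:
      source: ./target/release/producer
      outputs:
        - data
    _unstable_deploy:
      machine: machine-a   # run by the daemon registered as "machine-a"

  - id: consumer
    custom:
      source: ./target/release/consumer
      inputs:
        data: producer/data   # crosses machines; forwarded daemon-to-daemon
    _unstable_deploy:
      machine: machine-b
```

Each daemon would then be started with its own machine ID and the coordinator address (per the "Add daemon arguments for setting machine ID and coordinator addr" commit); the exact flag names are not shown in this excerpt.
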
Files changed
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
@@ -52,6 +52,10 @@ jobs:
timeout-minutes: 30
run: cargo run --example rust-dataflow

- name: "Multiple Daemons example"
timeout-minutes: 30
run: cargo run --example multiple-daemons

- name: "Benchmark example"
timeout-minutes: 30
run: cargo run --example benchmark --release
@@ -82,7 +86,6 @@ jobs:
run: |
cargo install --path binaries/coordinator
cargo install --path binaries/daemon
cargo install --path binaries/runtime
cargo install --path binaries/cli

- name: "Test CLI"
9 changes: 2 additions & 7 deletions .github/workflows/release.yml
@@ -26,9 +26,8 @@ jobs:

- name: "Build binaries"
timeout-minutes: 60
run: "cargo build --release -p dora-runtime
-p dora-coordinator -p dora-cli -p dora-daemon"

run: "cargo build --release -p dora-coordinator -p dora-cli -p dora-daemon"

- name: "Publish packages on `crates.io`"
if: runner.os == 'Linux'
run: |
@@ -65,14 +64,11 @@
cargo publish -p dora-cli --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-coordinator --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-daemon --token ${{ secrets.CARGO_REGISTRY_TOKEN }}
cargo publish -p dora-runtime --token ${{ secrets.CARGO_REGISTRY_TOKEN }}


- name: "Create Archive (Unix)"
if: runner.os == 'Linux' || runner.os == 'macOS'
run: |
mkdir archive
cp target/release/dora-runtime archive
cp target/release/dora-coordinator archive
cp target/release/dora-daemon archive
cp target/release/dora-cli archive/dora
@@ -85,7 +81,6 @@
shell: powershell
run: |
New-Item -Path archive -ItemType Directory
Copy-Item target/release/dora-runtime.exe -Destination archive
Copy-Item target/release/dora-coordinator.exe -Destination archive
Copy-Item target/release/dora-daemon.exe -Destination archive
Copy-Item target/release/dora-cli.exe -Destination archive/dora.exe
32 changes: 32 additions & 0 deletions Cargo.lock

(Generated file; diff not rendered.)

10 changes: 10 additions & 0 deletions Cargo.toml
@@ -13,6 +13,7 @@ members = [
"binaries/runtime",
"examples/rust-dataflow/*",
"examples/benchmark/*",
"examples/multiple-daemons/*",
"libraries/communication-layer/*",
"libraries/core",
"libraries/message",
@@ -46,6 +47,7 @@ communication-layer-request-reply = { version = "0.2.2", path = "libraries/commu
dora-message = { version = "0.2.2", path = "libraries/message" }
dora-runtime = { version = "0.2.2", path = "binaries/runtime" }
dora-daemon = { version = "0.2.2", path = "binaries/daemon" }
dora-coordinator = { version = "0.2.2", path = "binaries/coordinator" }

[package]
name = "dora-examples"
@@ -58,12 +60,16 @@ license = "Apache-2.0"
eyre = "0.6.8"
tokio = "1.24.2"
dora-daemon = { workspace = true }
dora-coordinator = { workspace = true }
dora-core = { workspace = true }
dunce = "1.0.2"
serde_yaml = "0.8.23"
uuid = { version = "1.2.1", features = ["v4", "serde"] }
tracing = "0.1.36"
tracing-subscriber = "0.3.15"
futures = "0.3.25"
tokio-stream = "0.1.11"
clap = { version = "4.0.3", features = ["derive"] }

[[example]]
name = "c-dataflow"
@@ -92,3 +98,7 @@ path = "examples/python-operator-dataflow/run.rs"
[[example]]
name = "benchmark"
path = "examples/benchmark/run.rs"

[[example]]
name = "multiple-daemons"
path = "examples/multiple-daemons/run.rs"
4 changes: 0 additions & 4 deletions README.md
@@ -94,10 +94,6 @@ You need to add the created operators/nodes to your dataflow YAML file.
8. You can also download already implemented operators by putting links in the dataflow. This example will launch a webcam plot stream.

```yaml
communication:
zenoh:
prefix: abc_project

nodes:
- id: op_1
operator:
2 changes: 1 addition & 1 deletion binaries/cli/src/attach.rs
@@ -23,7 +23,7 @@ pub fn attach_dataflow(
// Generate path hashmap
let mut node_path_lookup = HashMap::new();

let nodes = dataflow.resolve_aliases();
let nodes = dataflow.resolve_aliases_and_set_defaults();

let working_dir = dataflow_path
.canonicalize()
45 changes: 28 additions & 17 deletions binaries/cli/src/main.rs
@@ -33,8 +33,6 @@ enum Command {
Check {
#[clap(long)]
dataflow: Option<PathBuf>,
#[clap(long)]
runtime_path: Option<PathBuf>,
},
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.
Graph {
@@ -132,12 +130,15 @@ fn run() -> eyre::Result<()> {
let args = Args::parse();

match args.command {
Command::Check {
dataflow,
runtime_path,
} => match dataflow {
Command::Check { dataflow } => match dataflow {
Some(dataflow) => {
Descriptor::blocking_read(&dataflow)?.check(&dataflow, runtime_path)?;
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment()?
}
None => check::check_environment()?,
@@ -171,18 +172,29 @@ fn run() -> eyre::Result<()> {
attach,
hot_reload,
} => {
let dataflow_description =
let dataflow_descriptor =
Descriptor::blocking_read(&dataflow).wrap_err("Failed to read yaml dataflow")?;
dataflow_description
.check(&dataflow, None)
let working_dir = dataflow
.canonicalize()
.context("failed to canonicalize dataflow path")?
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
let mut session =
connect_to_coordinator().wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(dataflow.clone(), name, &mut *session)?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
name,
working_dir,
&mut *session,
)?;

if attach {
attach_dataflow(
dataflow_description,
dataflow_descriptor,
dataflow,
dataflow_id,
&mut *session,
@@ -212,18 +224,17 @@
}

fn start_dataflow(
dataflow: PathBuf,
dataflow: Descriptor,
name: Option<String>,
local_working_dir: PathBuf,
session: &mut TcpRequestReplyConnection,
) -> Result<Uuid, eyre::ErrReport> {
let canonicalized = dataflow
.canonicalize()
.wrap_err("given dataflow file does not exist")?;
let reply_raw = session
.request(
&serde_json::to_vec(&ControlRequest::Start {
dataflow_path: canonicalized,
dataflow,
name,
local_working_dir,
})
.unwrap(),
)
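
The same canonicalize-then-parent sequence appears twice in the diff above (once for `Check`, once for `Start`). Factored out, the shared pattern looks roughly like the sketch below; the helper name is ours, not something this PR introduces.

```rust
use std::path::{Path, PathBuf};

use eyre::{eyre, WrapErr};

/// Derive the dataflow's working directory the same way the CLI diff does:
/// resolve the YAML path to an absolute path, then use its parent directory.
fn dataflow_working_dir(dataflow: &Path) -> eyre::Result<PathBuf> {
    let canonical = dataflow
        .canonicalize()
        .wrap_err("failed to canonicalize dataflow path")?;
    let working_dir = canonical
        .parent()
        .ok_or_else(|| eyre!("dataflow path has no parent dir"))?
        .to_owned();
    Ok(working_dir)
}
```
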
4 changes: 0 additions & 4 deletions binaries/cli/src/template/c/dataflow-template.yml
@@ -1,7 +1,3 @@
communication:
zenoh:
prefix: ___name___

nodes:
- id: op_1
operator:
4 changes: 0 additions & 4 deletions binaries/cli/src/template/cxx/dataflow-template.yml
@@ -1,7 +1,3 @@
communication:
zenoh:
prefix: ___name___

nodes:
- id: runtime-node_1
operators:
4 changes: 0 additions & 4 deletions binaries/cli/src/template/python/dataflow-template.yml
@@ -1,7 +1,3 @@
communication:
zenoh:
prefix: ___name___

nodes:
- id: op_1
operator:
28 changes: 12 additions & 16 deletions binaries/cli/src/template/rust/dataflow-template.yml
@@ -1,24 +1,20 @@
communication:
zenoh:
prefix: ___name___

nodes:
- id: op_1
operator:
build: cargo build -p op_1
shared-library: target/debug/op_1
inputs:
tick: dora/timer/millis/100
outputs:
- some-output
build: cargo build -p op_1
shared-library: target/debug/op_1
inputs:
tick: dora/timer/millis/100
outputs:
- some-output
- id: op_2
operator:
build: cargo build -p op_2
shared-library: target/debug/op_2
inputs:
tick: dora/timer/secs/2
outputs:
- some-output
build: cargo build -p op_2
shared-library: target/debug/op_2
inputs:
tick: dora/timer/secs/2
outputs:
- some-output

- id: custom-node_1
custom:
1 change: 1 addition & 0 deletions binaries/coordinator/Cargo.toml
@@ -30,3 +30,4 @@ serde_json = "1.0.86"
which = "4.3.0"
thiserror = "1.0.37"
ctrlc = "3.2.5"
clap = { version = "3.1.8", features = ["derive"] }
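
The new `clap` dependency (with the `derive` feature) indicates the coordinator now parses command-line options the same way the daemon does after the "Add daemon arguments for setting machine ID and coordinator addr" commit. A minimal sketch of that derive pattern follows; the struct and flag names are illustrative assumptions, not the flags this PR actually defines.

```rust
use clap::Parser;

/// Illustrative argument struct in the clap-derive style added here;
/// the field names are assumptions, not the PR's real flags.
#[derive(Debug, Parser)]
struct Args {
    /// Identifier of the machine this process serves.
    #[clap(long)]
    machine_id: Option<String>,
    /// Address of the coordinator to connect to.
    #[clap(long)]
    coordinator_addr: Option<String>,
}

fn main() {
    let args = Args::parse();
    println!("parsed arguments: {args:?}");
}
```
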