Skip to content

Commit

Permalink
Refuse relative path for remote in coordinator (#538)
Browse files Browse the repository at this point in the history
* refuse to pass relative paths to the remote daemon

* add local ip checker in dora start

* delete some additional packages on CI to make enough space available

* add coordinator_is_remote argument
  • Loading branch information
XxChang authored Jun 12, 2024
1 parent 4f39c50 commit 799a3a6
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 10 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ jobs:
with:
# this might remove tools that are actually needed,
# if set to "true" but frees about 6 GB
tool-cache: false
tool-cache: true

# all of these default to true, but feel free to set to
# "false" if necessary for your workflow
Expand All @@ -96,7 +96,7 @@ jobs:
haskell: true
large-packages: false
docker-images: true
swap-storage: false
swap-storage: true
- name: Free disk Space (Windows)
if: runner.os == 'Windows'
run: |
Expand Down
10 changes: 7 additions & 3 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,13 @@ fn run() -> eyre::Result<()> {
.parent()
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
if !coordinator_addr.is_loopback() {
dataflow_descriptor.check_in_daemon(&working_dir, &[], true)?;
} else {
dataflow_descriptor
.check(&working_dir)
.wrap_err("Could not validate yaml")?;
}

let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
Expand Down
12 changes: 11 additions & 1 deletion binaries/coordinator/src/run/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,17 @@ pub(super) async fn spawn_dataflow(
daemon_connections: &mut HashMap<String, DaemonConnection>,
clock: &HLC,
) -> eyre::Result<SpawnedDataflow> {
dataflow.check(&working_dir)?;
let remote_machine_id: Vec<_> = daemon_connections
.iter()
.filter_map(|(id, c)| {
if !c.listen_socket.ip().is_loopback() {
Some(id.as_str())
} else {
None
}
})
.collect();
dataflow.check_in_daemon(&working_dir, &remote_machine_id, false)?;

let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let uuid = Uuid::new_v7(Timestamp::now(NoContext));
Expand Down
18 changes: 17 additions & 1 deletion libraries/core/src/descriptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,23 @@ impl Descriptor {
}

/// Validate this dataflow descriptor for a purely local setup.
///
/// Thin wrapper around `validate::check_dataflow` with no remote
/// daemons (`None`) and a local coordinator (`false`), so every node
/// source path is resolved against `working_dir`.
///
/// # Errors
/// Returns an error (with the context "Dataflow could not be
/// validated.") when validation fails.
pub fn check(&self, working_dir: &Path) -> eyre::Result<()> {
    // Local-only: no remote machine ids, coordinator not remote.
    validate::check_dataflow(self, working_dir, None, false)
        .wrap_err("Dataflow could not be validated.")
}

/// Validate this dataflow descriptor when one or more daemons (or the
/// coordinator itself) may run on a remote machine.
///
/// `remote_machine_id` lists machine ids whose daemons are non-local.
/// For nodes deployed on such machines — or for all nodes when
/// `coordinator_is_remote` is `true` — the validator rejects relative
/// source paths instead of resolving them against `working_dir`
/// (see `validate::check_dataflow`).
///
/// # Errors
/// Returns an error (with the context "Dataflow could not be
/// validated.") when validation fails.
pub fn check_in_daemon(
&self,
working_dir: &Path,
remote_machine_id: &[&str],
coordinator_is_remote: bool,
) -> eyre::Result<()> {
validate::check_dataflow(
self,
working_dir,
Some(remote_machine_id),
coordinator_is_remote,
)
.wrap_err("Dataflow could not be validated.")
}
}

Expand Down
33 changes: 30 additions & 3 deletions libraries/core/src/descriptor/validate.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::{
adjust_shared_library_path,
config::{DataId, Input, InputMapping, OperatorId, UserInputMapping},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource},
descriptor::{self, source_is_url, CoreNodeKind, OperatorSource, EXE_EXTENSION},
get_python_path,
};

Expand All @@ -12,18 +12,45 @@ use tracing::info;
use super::{resolve_path, Descriptor, SHELL_SOURCE};
const VERSION: &str = env!("CARGO_PKG_VERSION");

pub fn check_dataflow(dataflow: &Descriptor, working_dir: &Path) -> eyre::Result<()> {
pub fn check_dataflow(
dataflow: &Descriptor,
working_dir: &Path,
remote_daemon_id: Option<&[&str]>,
coordinator_is_remote: bool,
) -> eyre::Result<()> {
let nodes = dataflow.resolve_aliases_and_set_defaults()?;
let mut has_python_operator = false;

// check that nodes and operators exist
for node in &nodes {
match &node.kind {
descriptor::CoreNodeKind::Custom(node) => match node.source.as_str() {
descriptor::CoreNodeKind::Custom(custom) => match custom.source.as_str() {
SHELL_SOURCE => (),
source => {
if source_is_url(source) {
info!("{source} is a URL."); // TODO: Implement url check.
} else if let Some(remote_daemon_id) = remote_daemon_id {
if remote_daemon_id.contains(&node.deploy.machine.as_str())
|| coordinator_is_remote
{
let path = Path::new(&source);
let path = if path.extension().is_none() {
path.with_extension(EXE_EXTENSION)
} else {
path.to_owned()
};
if path.is_relative() {
eyre::bail!(
"paths of remote nodes must be absolute (node `{}`)",
node.id
);
}
info!("skipping path check for remote node `{}`", node.id);
} else {
resolve_path(source, working_dir).wrap_err_with(|| {
format!("Could not find source path `{}`", source)
})?;
}
} else {
resolve_path(source, working_dir)
.wrap_err_with(|| format!("Could not find source path `{}`", source))?;
Expand Down

0 comments on commit 799a3a6

Please sign in to comment.