[core][experimental] Add an actionable log when detecting a deadlock caused by using NCCL between DAG nodes on the same actor (#46673)

NCCL is a blocking operation: the writer and the reader must write and read at the same time. However, an actor can execute only one Ray task at a time, so when the writer and the reader are tasks on the same actor they can never run simultaneously, and the NCCL operation blocks forever. If both the reader and the writer are on the same actor, an IntraProcessChannel should be used instead; however, if users specify TorchTensorType with NCCL transport, an NCCL channel is created.

This PR adds an actionable log when detecting a deadlock caused by using NCCL between DAG nodes on the same actor.
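For illustration, a minimal sketch of the DAG shape this check rejects and the recommended fix. The actor and method names (`MyActor`, `produce`, `consume`) are hypothetical; the NCCL variant is only constructed, not compiled, since compiling it now raises the error described above (and an actual NCCL transfer would also require GPU actors and torch tensors):

```python
import ray
from ray.dag import InputNode
from ray.experimental.channel.torch_tensor_type import TorchTensorType

ray.init()


# Hypothetical actor used only to illustrate the DAG shape.
@ray.remote
class MyActor:
    def produce(self, x):
        return x

    def consume(self, x):
        return x


actor = MyActor.remote()

# Problematic pattern: both DAG nodes run on the same actor, but the type hint
# requests an NCCL channel between them, so the writer and the reader can never
# be active at the same time.
with InputNode() as inp:
    dag = actor.produce.bind(inp)
    dag.with_type_hint(TorchTensorType(transport="nccl"))
    dag = actor.consume.bind(dag)
# dag.experimental_compile()  # would raise ValueError and log the actionable message

# Fix: drop the NCCL type hint so data between nodes on the same actor goes
# through an IntraProcessChannel instead.
with InputNode() as inp:
    dag = actor.produce.bind(inp)
    dag = actor.consume.bind(dag)
compiled_dag = dag.experimental_compile()
```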
kevin85421 authored Jul 19, 2024
1 parent 7b3c90a commit dd3c04b
Showing 2 changed files with 89 additions and 2 deletions.
44 changes: 42 additions & 2 deletions python/ray/dag/compiled_dag_node.py
@@ -900,7 +900,7 @@ def _get_or_compile(

        from ray.dag.constants import RAY_ADAG_ENABLE_DETECT_DEADLOCK

        if RAY_ADAG_ENABLE_DETECT_DEADLOCK and not self._detect_deadlock():
        if RAY_ADAG_ENABLE_DETECT_DEADLOCK and self._detect_deadlock():
            raise ValueError(
                "This DAG cannot be compiled because it will deadlock on NCCL "
                "calls. If you believe this is a false positive, please disable "
@@ -1068,6 +1068,9 @@ def _detect_deadlock(self) -> bool:
        If you are interested in the detailed explanation, please refer to
        https://github.com/ray-project/ray/pull/45960.

        Returns:
            True if deadlock is detected, otherwise False.
        """
        assert self.idx_to_task
        assert self.actor_to_tasks
@@ -1101,6 +1104,26 @@ def _add_edge(
            graph[from_idx].out_edges.add(to_idx)
            graph[to_idx].in_edges.add(from_idx)

        def _is_same_actor(idx1: int, idx2: int) -> bool:
            """
            Args:
                idx1: A key in the idx_to_task dictionary.
                idx2: A key in the idx_to_task dictionary.

            Returns:
                True if both DAG nodes are on the same actor;
                otherwise, False.
            """
            task1 = self.idx_to_task[idx1]
            task2 = self.idx_to_task[idx2]
            if not isinstance(task1.dag_node, ClassMethodNode):
                return False
            if not isinstance(task2.dag_node, ClassMethodNode):
                return False
            actor_id_1 = task1.dag_node._get_actor_handle()._actor_id
            actor_id_2 = task2.dag_node._get_actor_handle()._actor_id
            return actor_id_1 == actor_id_2

        graph = defaultdict(GraphNode)
        for idx, task in self.idx_to_task.items():
            # Add an edge from task_{bind_index} to task_{bind_index+1}
@@ -1111,6 +1134,23 @@
                # Add an edge from the writer to the reader.
                _add_edge(graph, idx, downstream_idx)
                if task.dag_node.type_hint.requires_nccl():
                    if _is_same_actor(idx, downstream_idx):
                        actor_handle = self.idx_to_task[
                            idx
                        ].dag_node._get_actor_handle()
                        method = self.idx_to_task[idx].dag_node.get_method_name()
                        downstream_method = self.idx_to_task[
                            downstream_idx
                        ].dag_node.get_method_name()
                        logger.error(
                            "Detected a deadlock caused by using NCCL channels to "
                            f"transfer data between the task `{method}` and "
                            f"its downstream method `{downstream_method}` on the same "
                            f"actor {actor_handle}. Please remove "
                            '`TorchTensorType(transport="nccl")` between '
                            "DAG nodes on the same actor."
                        )
                        return True
                    # Add an edge from the reader of an NCCL channel to the node
                    # that has the next bind index on the same actor as the writer.
                    _add_edge(graph, downstream_idx, next_task_idx)
@@ -1148,7 +1188,7 @@ def _add_edge(
"https://github.com/ray-project/ray/issues/new/."
)

return topological_order_exists
return not topological_order_exists

def _monitor_failures(self):
outer = self
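As background for the `return not topological_order_exists` change above: the detector models execution and data dependencies as a directed graph and reports a deadlock exactly when that graph admits no topological order, i.e. when it contains a cycle. Below is a minimal, self-contained sketch of that idea using Kahn's algorithm; it is not the PR's implementation (see `_detect_deadlock` and PR #45960 for the real logic):

```python
from collections import defaultdict, deque
from typing import Dict, List


def has_topological_order(edges: Dict[str, List[str]]) -> bool:
    """Return True if the directed graph has a topological order (no cycle)."""
    in_degree: Dict[str, int] = defaultdict(int)
    nodes = set(edges)
    for src, dsts in edges.items():
        for dst in dsts:
            in_degree[dst] += 1
            nodes.add(dst)

    # Kahn's algorithm: repeatedly peel off nodes with no incoming edges.
    ready = deque(node for node in nodes if in_degree[node] == 0)
    visited = 0
    while ready:
        node = ready.popleft()
        visited += 1
        for dst in edges.get(node, []):
            in_degree[dst] -= 1
            if in_degree[dst] == 0:
                ready.append(dst)

    # Nodes that were never peeled off lie on a cycle.
    return visited == len(nodes)


# An NCCL transfer between two tasks on the same actor means the writer blocks
# until the reader receives, while the reader task cannot start until the
# writer task finishes -- a cycle, so no topological order exists.
assert not has_topological_order({"writer": ["reader"], "reader": ["writer"]})
assert has_topological_order({"writer": ["reader"], "reader": []})
```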
47 changes: 47 additions & 0 deletions python/ray/dag/tests/experimental/test_detect_deadlock_dag.py
@@ -71,6 +71,53 @@ def test_invalid_graph_1_actor(ray_start_regular, tensor_transport):
        dag.experimental_compile()


@pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 1}], indirect=True)
def test_invalid_graph_1_actor_log(ray_start_regular):
"""
This test is similar to test_invalid_graph_1_actor, but it checks if the error
message is correctly logged.
"""
import logging

class LogCaptureHandler(logging.Handler):
def __init__(self):
super().__init__()
self.records = []

def emit(self, record):
self.records.append(record)

logger = logging.getLogger("ray.dag.compiled_dag_node")
handler = LogCaptureHandler()
logger.addHandler(handler)

a = MockedWorker.remote()

ray.get(a.start_mock.remote())

with InputNode() as inp:
dag = a.no_op.bind(inp)
dag.with_type_hint(TorchTensorType(transport=TorchTensorType.NCCL))
dag = a.no_op.bind(dag)

with pytest.raises(ValueError, match=INVALID_GRAPH):
dag.experimental_compile()

error_msg = (
"Detected a deadlock caused by using NCCL channels to transfer "
f"data between the task `no_op` and its downstream method `no_op` on "
f"the same actor {str(a)}. Please remove "
'`TorchTensorType(transport="nccl")` between DAG '
"nodes on the same actor."
)
error_msg_exist = False
for record in handler.records:
if error_msg in record.getMessage():
error_msg_exist = True
assert error_msg_exist, "Error message not found in log."
logger.removeHandler(handler)


@pytest.mark.parametrize("ray_start_regular", [{"num_gpus": 2}], indirect=True)
@pytest.mark.parametrize(
"tensor_transport", [TorchTensorType.AUTO, TorchTensorType.NCCL]
