Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor python typing #369

Merged
merged 5 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion binaries/cli/src/graph/mermaid-template.html
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
____insert____
</div>
<script src="https://cdn.jsdelivr.net/npm/mermaid/dist/mermaid.min.js"></script>
<script>mermaid.initialize({ startOnLoad: true, securityLevel: 'loose' });
<script>mermaid.initialize({ startOnLoad: true, securityLevel: 'loose', theme: 'base' });
</script>
</body>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,10 @@ def on_input(

Args:
dora_input (dict): Input dict containing an `id`, `data` and `metadata`.
send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]:
send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
Function for sending output to the dataflow:
- First argument is the `output_id`
- Second argument is the data as either bytes or `pa.UInt8Array`
- Second argument is the data as either bytes or `pa.Array`
- Third argument is dora metadata dict
e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`

Expand All @@ -44,9 +44,7 @@ def on_input(
STOP means that the operator stop listening for inputs.

"""
print(
f"Received input {dora_input['id']}, with data: {dora_input['value']}"
)
print(f"Received input {dora_input['id']}, with data: {dora_input['value']}")

return DoraStatus.CONTINUE

Expand Down
2 changes: 1 addition & 1 deletion binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ mod callback_impl {

/// Send an output from the operator:
/// - the first argument is the `output_id` as defined in your dataflow.
/// - the second argument is the data as either bytes or pyarrow.UInt8Array for zero copy.
/// - the second argument is the data as either bytes or pyarrow.Array for zero copy.
/// - the third argument is dora metadata if you want ot link the tracing from one input into an output.
/// `e.g.: send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
#[pymethods]
Expand Down
10 changes: 5 additions & 5 deletions examples/python-operator-dataflow/object_detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def __init__(self):
def on_event(
self,
dora_event: dict,
send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
) -> DoraStatus:
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
Expand All @@ -36,15 +36,15 @@ def on_event(
def on_input(
self,
dora_input: dict,
send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
) -> DoraStatus:
"""Handle image
Args:
dora_input (dict): Dict containing the "id", "data", and "metadata"
send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]:
dora_input (dict): Dict containing the "id", value, and "metadata"
send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
Function for sending output to the dataflow:
- First argument is the `output_id`
- Second argument is the data as either bytes or `pa.UInt8Array`
- Second argument is the data as either bytes or `pa.Array`
- Third argument is dora metadata dict
e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
"""
Expand Down
8 changes: 4 additions & 4 deletions examples/python-operator-dataflow/plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ def __init__(self):
def on_event(
self,
dora_event: dict,
send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
) -> DoraStatus:
if dora_event["type"] == "INPUT":
return self.on_input(dora_event, send_output)
Expand All @@ -43,18 +43,18 @@ def on_event(
def on_input(
self,
dora_input: dict,
send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
) -> DoraStatus:
"""
Put image and bounding box on cv2 window.

Args:
dora_input["id"] (str): Id of the dora_input declared in the yaml configuration
dora_input["value"] (arrow array): message of the dora_input
send_output Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None]:
send_output Callable[[str, bytes | pa.Array, Optional[dict]], None]:
Function for sending output to the dataflow:
- First argument is the `output_id`
- Second argument is the data as either bytes or `pa.UInt8Array`
- Second argument is the data as either bytes or `pa.Array`
- Third argument is dora metadata dict
e.g.: `send_output("bbox", pa.array([100], type=pa.uint8()), dora_event["metadata"])`
"""
Expand Down
2 changes: 1 addition & 1 deletion examples/python-operator-dataflow/webcam.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def __init__(self):
def on_event(
self,
dora_event: str,
send_output: Callable[[str, bytes | pa.UInt8Array, Optional[dict]], None],
send_output: Callable[[str, bytes | pa.Array, Optional[dict]], None],
) -> DoraStatus:
match dora_event["type"]:
case "INPUT":
Expand Down
12 changes: 9 additions & 3 deletions examples/python-ros2-dataflow/random_turtle.py
Original file line number Diff line number Diff line change
@@ -1,36 +1,40 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import time
import random
import dora
from dora import Node
import pyarrow as pa

CHECK_TICK = 50

# Create a ROS2 Context
ros2_context = dora.experimental.ros2_bridge.Ros2Context()
ros2_node = ros2_context.new_node(
"turtle_teleop",
"/ros2_demo",
dora.experimental.ros2_bridge.Ros2NodeOptions(rosout=True),
)

# Define a ROS2 QOS
topic_qos = dora.experimental.ros2_bridge.Ros2QosPolicies(
reliable=True, max_blocking_time=0.1
)

# Create a publisher to cmd_vel topic
turtle_twist_topic = ros2_node.create_topic(
"/turtle1/cmd_vel", "geometry_msgs::Twist", topic_qos
)
twist_writer = ros2_node.create_publisher(turtle_twist_topic)

# Create a listener to pose topic
turtle_pose_topic = ros2_node.create_topic(
"/turtle1/pose", "turtlesim::Pose", topic_qos
)
pose_reader = ros2_node.create_subscription(turtle_pose_topic)

# Create a dora node
dora_node = Node()

# Listen for both stream on the same loop as Python does not handle well multiprocessing
dora_node.merge_external_events(pose_reader)

print("looping", flush=True)
Expand All @@ -40,13 +44,15 @@
if event is None:
break
match event["kind"]:
# Dora event
case "dora":
match event["type"]:
case "INPUT":
match event["id"]:
case "direction":
twist_writer.publish(event["value"])

# ROS2 Event
case "external":
pose = event.inner()[0].as_py()
if i == CHECK_TICK:
Expand Down
36 changes: 25 additions & 11 deletions libraries/core/src/descriptor/visualize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,22 +93,36 @@ fn visualize_runtime_node(
operators: &[OperatorDefinition],
flowchart: &mut String,
) {
writeln!(flowchart, "subgraph {node_id}").unwrap();
for operator in operators {
let operator_id = &operator.id;
if operators.len() == 1 && operators[0].id.to_string() == "op" {
let operator = &operators[0];
// single operator node
if operator.config.inputs.is_empty() {
// source operator
writeln!(flowchart, " {node_id}/{operator_id}[\\{operator_id}/]").unwrap();
// source node
writeln!(flowchart, " {node_id}/op[\\{node_id}/]").unwrap();
} else if operator.config.outputs.is_empty() {
// sink operator
writeln!(flowchart, " {node_id}/{operator_id}[/{operator_id}\\]").unwrap();
// sink node
writeln!(flowchart, " {node_id}/op[/{node_id}\\]").unwrap();
} else {
// normal operator
writeln!(flowchart, " {node_id}/{operator_id}[{operator_id}]").unwrap();
// normal node
writeln!(flowchart, " {node_id}/op[{node_id}]").unwrap();
}
} else {
writeln!(flowchart, "subgraph {node_id}").unwrap();
for operator in operators {
let operator_id = &operator.id;
if operator.config.inputs.is_empty() {
// source operator
writeln!(flowchart, " {node_id}/{operator_id}[\\{operator_id}/]").unwrap();
} else if operator.config.outputs.is_empty() {
// sink operator
writeln!(flowchart, " {node_id}/{operator_id}[/{operator_id}\\]").unwrap();
} else {
// normal operator
writeln!(flowchart, " {node_id}/{operator_id}[{operator_id}]").unwrap();
}
}
flowchart.push_str("end\n");
}

flowchart.push_str("end\n");
}

fn visualize_node_inputs(
Expand Down