Skip to content

Commit

Permalink
Merge pull request #601 from dora-rs/delay-node-drop
Browse files Browse the repository at this point in the history
Delay dropping of `DoraNode` in Python until all event data is freed
  • Loading branch information
haixuanTao authored Jul 23, 2024
2 parents 3239f9a + 328fd7e commit 16ef29a
Show file tree
Hide file tree
Showing 8 changed files with 141 additions and 48 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

80 changes: 56 additions & 24 deletions apis/python/node/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::daemon_messages::DataflowId;
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
use dora_operator_api_python::{pydict_to_metadata, PyEvent};
use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent};
use dora_ros2_bridge_python::Ros2Subscription;
use eyre::Context;
use futures::{Stream, StreamExt};
Expand All @@ -30,8 +32,10 @@ use pyo3_special_method_derive::{Dict, Dir, Repr, Str};
#[derive(Dir, Dict, Str, Repr)]
pub struct Node {
events: Events,
#[pyo3_smd(skip)]
pub node: DoraNode,
node: DelayedCleanup<DoraNode>,

dataflow_id: DataflowId,
node_id: NodeId,
}

#[pymethods]
Expand All @@ -45,8 +49,20 @@ impl Node {
DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")?
};

let dataflow_id = *node.dataflow_id();
let node_id = node.id().clone();
let node = DelayedCleanup::new(node);
let events = DelayedCleanup::new(events);
let cleanup_handle = NodeCleanupHandle {
_handles: Arc::new((node.handle(), events.handle())),
};
Ok(Node {
events: Events::Dora(events),
events: Events {
inner: EventsInner::Dora(events),
cleanup_handle,
},
dataflow_id,
node_id,
node,
})
}
Expand Down Expand Up @@ -148,10 +164,11 @@ impl Node {
if let Ok(py_bytes) = data.downcast_bound::<PyBytes>(py) {
let data = py_bytes.as_bytes();
self.node
.get_mut()
.send_output_bytes(output_id.into(), parameters, data.len(), data)
.wrap_err("failed to send output")?;
} else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow_bound(data.bind(py)) {
self.node.send_output(
self.node.get_mut().send_output(
output_id.into(),
parameters,
arrow::array::make_array(arrow_array),
Expand All @@ -168,15 +185,18 @@ impl Node {
/// This method returns the parsed dataflow YAML file.
///
/// :rtype: dict
pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> {
pythonize::pythonize(py, self.node.dataflow_descriptor())
pub fn dataflow_descriptor(&mut self, py: Python) -> eyre::Result<PyObject> {
Ok(pythonize::pythonize(
py,
self.node.get_mut().dataflow_descriptor(),
)?)
}

/// Returns the dataflow id.
///
/// :rtype: str
pub fn dataflow_id(&self) -> String {
self.node.dataflow_id().to_string()
self.dataflow_id.to_string()
}

/// Merge an external event stream with dora main loop.
Expand Down Expand Up @@ -207,43 +227,55 @@ impl Node {

// take out the event stream and temporarily replace it with a dummy
let events = std::mem::replace(
&mut self.events,
Events::Merged(Box::new(futures::stream::empty())),
&mut self.events.inner,
EventsInner::Merged(Box::new(futures::stream::empty())),
);
// update self.events with the merged stream
self.events = Events::Merged(events.merge_external_send(Box::pin(stream)));
self.events.inner = EventsInner::Merged(events.merge_external_send(Box::pin(stream)));

Ok(())
}
}

enum Events {
Dora(EventStream),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
struct Events {
inner: EventsInner,
cleanup_handle: NodeCleanupHandle,
}

impl Events {
fn recv(&mut self, timeout: Option<Duration>) -> Option<PyEvent> {
match self {
Events::Dora(events) => match timeout {
Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from),
None => events.recv().map(PyEvent::from),
let event = match &mut self.inner {
EventsInner::Dora(events) => match timeout {
Some(timeout) => events
.get_mut()
.recv_timeout(timeout)
.map(MergedEvent::Dora),
None => events.get_mut().recv().map(MergedEvent::Dora),
},
Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from),
}
EventsInner::Merged(events) => futures::executor::block_on(events.next()),
};
event.map(|event| PyEvent {
event,
_cleanup: Some(self.cleanup_handle.clone()),
})
}
}

impl<'a> MergeExternalSend<'a, PyObject> for Events {
enum EventsInner {
Dora(DelayedCleanup<EventStream>),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
}

impl<'a> MergeExternalSend<'a, PyObject> for EventsInner {
type Item = MergedEvent<PyObject>;

fn merge_external_send(
self,
external_events: impl Stream<Item = PyObject> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> {
match self {
Events::Dora(events) => events.merge_external_send(external_events),
Events::Merged(events) => {
EventsInner::Dora(events) => events.merge_external_send(external_events),
EventsInner::Merged(events) => {
let merged = events.merge_external_send(external_events);
Box::new(merged.map(|event| match event {
MergedEvent::Dora(e) => MergedEvent::Dora(e),
Expand All @@ -256,7 +288,7 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events {

impl Node {
pub fn id(&self) -> String {
self.node.id().to_string()
self.node_id.to_string()
}
}

Expand Down
2 changes: 2 additions & 0 deletions apis/python/operator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,5 @@ flume = "0.10.14"
arrow = { workspace = true, features = ["pyarrow"] }
arrow-schema = { workspace = true }
aligned-vec = "0.5.0"
futures = "0.3.28"
futures-concurrency = "7.3.0"
86 changes: 70 additions & 16 deletions apis/python/operator/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,82 @@
use std::collections::HashMap;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use dora_node_api::{
merged::{MergeExternalSend, MergedEvent},
DoraNode, Event, EventStream, Metadata, MetadataParameters,
};
use eyre::{Context, Result};
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge as _;
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
};

/// Dora Event
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
pub event: MergedEvent<PyObject>,
pub _cleanup: Option<NodeCleanupHandle>,
}

/// Keeps the dora node alive until all event objects have been dropped.
#[derive(Clone)]
#[pyclass]
pub struct NodeCleanupHandle {
pub _handles: Arc<(CleanupHandle<DoraNode>, CleanupHandle<EventStream>)>,
}

/// Owned type with delayed cleanup (using `handle` method).
pub struct DelayedCleanup<T>(Arc<Mutex<T>>);

impl<T> DelayedCleanup<T> {
pub fn new(value: T) -> Self {
Self(Arc::new(Mutex::new(value)))
}

pub fn handle(&self) -> CleanupHandle<T> {
CleanupHandle(self.0.clone())
}

pub fn get_mut(&mut self) -> std::sync::MutexGuard<T> {
self.0.try_lock().expect("failed to lock DelayedCleanup")
}
}

impl Stream for DelayedCleanup<EventStream> {
type Item = Event;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut inner: std::sync::MutexGuard<'_, EventStream> = self.get_mut().get_mut();
inner.poll_next_unpin(cx)
}
}

impl<'a, E> MergeExternalSend<'a, E> for DelayedCleanup<EventStream>
where
E: 'static,
{
type Item = MergedEvent<E>;

fn merge_external_send(
self,
external_events: impl Stream<Item = E> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> {
let dora = self.map(MergedEvent::Dora);
let external = external_events.map(MergedEvent::External);
Box::new((dora, external).merge())
}
}

pub struct CleanupHandle<T>(Arc<Mutex<T>>);

impl PyEvent {
pub fn to_py_dict(self, py: Python<'_>) -> PyResult<Py<PyDict>> {
let mut pydict = HashMap::new();
Expand Down Expand Up @@ -44,6 +106,10 @@ impl PyEvent {
}
}

if let Some(cleanup) = self._cleanup.clone() {
pydict.insert("_cleanup", cleanup.into_py(py));
}

Ok(pydict.into_py_dict_bound(py).unbind())
}

Expand Down Expand Up @@ -92,18 +158,6 @@ impl PyEvent {
}
}

impl From<Event> for PyEvent {
fn from(event: Event) -> Self {
Self::from(MergedEvent::Dora(event))
}
}

impl From<MergedEvent<PyObject>> for PyEvent {
fn from(event: MergedEvent<PyObject>) -> Self {
Self { event }
}
}

pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataParameters> {
let mut default_metadata = MetadataParameters::default();
if let Some(metadata) = dict {
Expand Down
4 changes: 2 additions & 2 deletions apis/rust/node/src/event_stream/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,14 @@ fn report_remaining_drop_tokens(

let mut still_pending = Vec::new();
for (token, rx, since, _) in pending_drop_tokens.drain(..) {
match rx.recv_timeout(Duration::from_millis(50)) {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_millis(200);
let duration = Duration::from_secs(30);
if since.elapsed() > duration {
tracing::warn!(
"timeout: node finished, but token {token:?} was still not \
Expand Down
2 changes: 1 addition & 1 deletion apis/rust/node/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ impl Drop for DoraNode {
);
}

match self.drop_stream.recv_timeout(Duration::from_millis(500)) {
match self.drop_stream.recv_timeout(Duration::from_secs(10)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
Expand Down
2 changes: 1 addition & 1 deletion binaries/daemon/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1600,7 +1600,7 @@ impl RunningDataflow {
let running_nodes = self.running_nodes.clone();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(2000));
let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();
Expand Down
11 changes: 7 additions & 4 deletions binaries/runtime/src/operator/python.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use dora_core::{
descriptor::{source_is_url, Descriptor, PythonSource},
};
use dora_download::download_file;
use dora_node_api::Event;
use dora_node_api::{merged::MergedEvent, Event};
use dora_operator_api_python::PyEvent;
use dora_operator_api_types::DoraStatus;
use eyre::{bail, eyre, Context, Result};
Expand Down Expand Up @@ -208,9 +208,12 @@ pub fn run(
metadata.parameters.open_telemetry_context = string_cx;
}

let py_event = PyEvent::from(event)
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;
let py_event = PyEvent {
event: MergedEvent::Dora(event),
_cleanup: None,
}
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;

let status_enum = operator
.call_method1(py, "on_event", (py_event, send_output.clone()))
Expand Down

0 comments on commit 16ef29a

Please sign in to comment.