Delay dropping of DoraNode in Python until all event data is freed #601

Merged · 2 commits · Jul 23, 2024
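
This PR makes the Python `Node` keep its underlying `DoraNode` and `EventStream` alive until every event object that still references their data has been freed. Both are wrapped in a new `DelayedCleanup` type backed by an `Arc<Mutex<_>>`, and every `PyEvent` carries a cloned `NodeCleanupHandle`, so dropping the Python node no longer invalidates data that an event still points to. The sketch below shows the keep-alive idea in isolation; `Resource` is a hypothetical stand-in for `DoraNode`/`EventStream`, and the types are simplified from the diff rather than the actual dora API.

```rust
use std::sync::{Arc, Mutex};

// Stand-in for the node/event-stream resources; prints when it is finally dropped.
struct Resource;

impl Drop for Resource {
    fn drop(&mut self) {
        println!("resource dropped");
    }
}

/// Owning wrapper: the inner value is freed only when the owner *and* all handles are gone.
struct DelayedCleanup<T>(Arc<Mutex<T>>);

/// Cheap, clonable keep-alive handle (what each event stores as `_cleanup` in the diff).
struct CleanupHandle<T>(Arc<Mutex<T>>);

impl<T> Clone for CleanupHandle<T> {
    fn clone(&self) -> Self {
        CleanupHandle(self.0.clone())
    }
}

impl<T> DelayedCleanup<T> {
    fn new(value: T) -> Self {
        Self(Arc::new(Mutex::new(value)))
    }

    fn handle(&self) -> CleanupHandle<T> {
        CleanupHandle(self.0.clone())
    }
}

fn main() {
    let owner = DelayedCleanup::new(Resource);
    let event_a = owner.handle(); // handle cloned into an event
    let event_b = event_a.clone(); // another event referencing the same node
    drop(owner); // Python drops the Node object...
    println!("owner dropped, resource still alive");
    drop(event_a);
    drop(event_b); // ...the resource is freed only after the last event is gone
}
```

Each handle clone bumps the `Arc` reference count, so the inner value's `Drop` only runs once the owner and the last outstanding event handle have been released.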
2 changes: 2 additions & 0 deletions Cargo.lock

Generated file; diff not rendered by default.

80 changes: 56 additions & 24 deletions apis/python/node/src/lib.rs
@@ -1,12 +1,14 @@
#![allow(clippy::borrow_deref_ref)] // clippy warns about code generated by #[pymethods]

use std::sync::Arc;
use std::time::Duration;

use arrow::pyarrow::{FromPyArrow, ToPyArrow};
use dora_node_api::dora_core::config::NodeId;
use dora_node_api::dora_core::daemon_messages::DataflowId;
use dora_node_api::merged::{MergeExternalSend, MergedEvent};
use dora_node_api::{DoraNode, EventStream};
use dora_operator_api_python::{pydict_to_metadata, PyEvent};
use dora_operator_api_python::{pydict_to_metadata, DelayedCleanup, NodeCleanupHandle, PyEvent};
use dora_ros2_bridge_python::Ros2Subscription;
use eyre::Context;
use futures::{Stream, StreamExt};
@@ -30,8 +32,10 @@ use pyo3_special_method_derive::{Dict, Dir, Repr, Str};
#[derive(Dir, Dict, Str, Repr)]
pub struct Node {
events: Events,
#[pyo3_smd(skip)]
pub node: DoraNode,
node: DelayedCleanup<DoraNode>,

dataflow_id: DataflowId,
node_id: NodeId,
}

#[pymethods]
@@ -45,8 +49,20 @@ impl Node {
DoraNode::init_from_env().context("Couldn not initiate node from environment variable. For dynamic node, please add a node id in the initialization function.")?
};

let dataflow_id = *node.dataflow_id();
let node_id = node.id().clone();
let node = DelayedCleanup::new(node);
let events = DelayedCleanup::new(events);
let cleanup_handle = NodeCleanupHandle {
_handles: Arc::new((node.handle(), events.handle())),
};
Ok(Node {
events: Events::Dora(events),
events: Events {
inner: EventsInner::Dora(events),
cleanup_handle,
},
dataflow_id,
node_id,
node,
})
}
@@ -148,10 +164,11 @@ impl Node {
if let Ok(py_bytes) = data.downcast_bound::<PyBytes>(py) {
let data = py_bytes.as_bytes();
self.node
.get_mut()
.send_output_bytes(output_id.into(), parameters, data.len(), data)
.wrap_err("failed to send output")?;
} else if let Ok(arrow_array) = arrow::array::ArrayData::from_pyarrow_bound(data.bind(py)) {
self.node.send_output(
self.node.get_mut().send_output(
output_id.into(),
parameters,
arrow::array::make_array(arrow_array),
@@ -168,15 +185,18 @@ impl Node {
/// This method returns the parsed dataflow YAML file.
///
/// :rtype: dict
pub fn dataflow_descriptor(&self, py: Python) -> pythonize::Result<PyObject> {
pythonize::pythonize(py, self.node.dataflow_descriptor())
pub fn dataflow_descriptor(&mut self, py: Python) -> eyre::Result<PyObject> {
Ok(pythonize::pythonize(
py,
self.node.get_mut().dataflow_descriptor(),
)?)
}

/// Returns the dataflow id.
///
/// :rtype: str
pub fn dataflow_id(&self) -> String {
self.node.dataflow_id().to_string()
self.dataflow_id.to_string()
}

/// Merge an external event stream with dora main loop.
@@ -207,43 +227,55 @@ impl Node {

// take out the event stream and temporarily replace it with a dummy
let events = std::mem::replace(
&mut self.events,
Events::Merged(Box::new(futures::stream::empty())),
&mut self.events.inner,
EventsInner::Merged(Box::new(futures::stream::empty())),
);
// update self.events with the merged stream
self.events = Events::Merged(events.merge_external_send(Box::pin(stream)));
self.events.inner = EventsInner::Merged(events.merge_external_send(Box::pin(stream)));

Ok(())
}
}

enum Events {
Dora(EventStream),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
struct Events {
inner: EventsInner,
cleanup_handle: NodeCleanupHandle,
}

impl Events {
fn recv(&mut self, timeout: Option<Duration>) -> Option<PyEvent> {
match self {
Events::Dora(events) => match timeout {
Some(timeout) => events.recv_timeout(timeout).map(PyEvent::from),
None => events.recv().map(PyEvent::from),
let event = match &mut self.inner {
EventsInner::Dora(events) => match timeout {
Some(timeout) => events
.get_mut()
.recv_timeout(timeout)
.map(MergedEvent::Dora),
None => events.get_mut().recv().map(MergedEvent::Dora),
},
Events::Merged(events) => futures::executor::block_on(events.next()).map(PyEvent::from),
}
EventsInner::Merged(events) => futures::executor::block_on(events.next()),
};
event.map(|event| PyEvent {
event,
_cleanup: Some(self.cleanup_handle.clone()),
})
}
}

impl<'a> MergeExternalSend<'a, PyObject> for Events {
enum EventsInner {
Dora(DelayedCleanup<EventStream>),
Merged(Box<dyn Stream<Item = MergedEvent<PyObject>> + Unpin + Send>),
}

impl<'a> MergeExternalSend<'a, PyObject> for EventsInner {
type Item = MergedEvent<PyObject>;

fn merge_external_send(
self,
external_events: impl Stream<Item = PyObject> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> {
match self {
Events::Dora(events) => events.merge_external_send(external_events),
Events::Merged(events) => {
EventsInner::Dora(events) => events.merge_external_send(external_events),
EventsInner::Merged(events) => {
let merged = events.merge_external_send(external_events);
Box::new(merged.map(|event| match event {
MergedEvent::Dora(e) => MergedEvent::Dora(e),
@@ -256,7 +288,7 @@ impl<'a> MergeExternalSend<'a, PyObject> for Events {

impl Node {
pub fn id(&self) -> String {
self.node.id().to_string()
self.node_id.to_string()
}
}

2 changes: 2 additions & 0 deletions apis/python/operator/Cargo.toml
@@ -18,3 +18,5 @@ flume = "0.10.14"
arrow = { workspace = true, features = ["pyarrow"] }
arrow-schema = { workspace = true }
aligned-vec = "0.5.0"
futures = "0.3.28"
futures-concurrency = "7.3.0"
86 changes: 70 additions & 16 deletions apis/python/operator/src/lib.rs
@@ -1,20 +1,82 @@
use std::collections::HashMap;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};

use arrow::pyarrow::ToPyArrow;
use dora_node_api::{merged::MergedEvent, Event, Metadata, MetadataParameters};
use dora_node_api::{
merged::{MergeExternalSend, MergedEvent},
DoraNode, Event, EventStream, Metadata, MetadataParameters,
};
use eyre::{Context, Result};
use futures::{Stream, StreamExt};
use futures_concurrency::stream::Merge as _;
use pyo3::{
prelude::*,
pybacked::PyBackedStr,
types::{IntoPyDict, PyDict},
};

/// Dora Event
#[derive(Debug)]
pub struct PyEvent {
event: MergedEvent<PyObject>,
pub event: MergedEvent<PyObject>,
pub _cleanup: Option<NodeCleanupHandle>,
}

/// Keeps the dora node alive until all event objects have been dropped.
#[derive(Clone)]
#[pyclass]
pub struct NodeCleanupHandle {
pub _handles: Arc<(CleanupHandle<DoraNode>, CleanupHandle<EventStream>)>,
}

/// Owned type with delayed cleanup (using `handle` method).
pub struct DelayedCleanup<T>(Arc<Mutex<T>>);

impl<T> DelayedCleanup<T> {
pub fn new(value: T) -> Self {
Self(Arc::new(Mutex::new(value)))
}

pub fn handle(&self) -> CleanupHandle<T> {
CleanupHandle(self.0.clone())
}

pub fn get_mut(&mut self) -> std::sync::MutexGuard<T> {
self.0.try_lock().expect("failed to lock DelayedCleanup")
}
}

impl Stream for DelayedCleanup<EventStream> {
type Item = Event;

fn poll_next(
self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
let mut inner: std::sync::MutexGuard<'_, EventStream> = self.get_mut().get_mut();
inner.poll_next_unpin(cx)
}
}

impl<'a, E> MergeExternalSend<'a, E> for DelayedCleanup<EventStream>
where
E: 'static,
{
type Item = MergedEvent<E>;

fn merge_external_send(
self,
external_events: impl Stream<Item = E> + Unpin + Send + 'a,
) -> Box<dyn Stream<Item = Self::Item> + Unpin + Send + 'a> {
let dora = self.map(MergedEvent::Dora);
let external = external_events.map(MergedEvent::External);
Box::new((dora, external).merge())
}
}

pub struct CleanupHandle<T>(Arc<Mutex<T>>);

impl PyEvent {
pub fn to_py_dict(self, py: Python<'_>) -> PyResult<Py<PyDict>> {
let mut pydict = HashMap::new();
@@ -44,6 +106,10 @@ impl PyEvent {
}
}

if let Some(cleanup) = self._cleanup.clone() {
pydict.insert("_cleanup", cleanup.into_py(py));
}

Ok(pydict.into_py_dict_bound(py).unbind())
}

@@ -92,18 +158,6 @@ impl PyEvent {
}
}

impl From<Event> for PyEvent {
fn from(event: Event) -> Self {
Self::from(MergedEvent::Dora(event))
}
}

impl From<MergedEvent<PyObject>> for PyEvent {
fn from(event: MergedEvent<PyObject>) -> Self {
Self { event }
}
}

pub fn pydict_to_metadata(dict: Option<Bound<'_, PyDict>>) -> Result<MetadataParameters> {
let mut default_metadata = MetadataParameters::default();
if let Some(metadata) = dict {
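The `Stream` and `MergeExternalSend` impls above forward polling through the `Arc<Mutex<_>>` wrapper by locking it on every poll. A self-contained sketch of that forwarding pattern, assuming the `futures` 0.3 crate and a hypothetical `Wrapper` type in place of `DelayedCleanup<EventStream>`:

```rust
use std::{
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll},
};

use futures::{executor, stream, Stream, StreamExt};

// Stand-in for DelayedCleanup<EventStream>: polling locks the inner stream.
struct Wrapper<S>(Arc<Mutex<S>>);

impl<S: Stream + Unpin> Stream for Wrapper<S> {
    type Item = S::Item;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // try_lock mirrors DelayedCleanup::get_mut: the wrapper is only polled from
        // one place at a time, so a contended lock indicates a logic error.
        let mut inner = self.0.try_lock().expect("wrapper polled concurrently");
        inner.poll_next_unpin(cx)
    }
}

fn main() {
    let wrapped = Wrapper(Arc::new(Mutex::new(stream::iter(0..3))));
    let items: Vec<_> = executor::block_on(wrapped.collect());
    assert_eq!(items, vec![0, 1, 2]);
}
```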
4 changes: 2 additions & 2 deletions apis/rust/node/src/event_stream/thread.rs
@@ -220,14 +220,14 @@ fn report_remaining_drop_tokens(

let mut still_pending = Vec::new();
for (token, rx, since, _) in pending_drop_tokens.drain(..) {
match rx.recv_timeout(Duration::from_millis(50)) {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_millis(200);
let duration = Duration::from_secs(30);
if since.elapsed() > duration {
tracing::warn!(
"timeout: node finished, but token {token:?} was still not \
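The timeout changes in this file, and the matching ones in `node/mod.rs` and the daemon below, account for event data now legitimately staying alive on the Python side for much longer: the ACK channel is polled in 100 ms steps, but a still-pending drop token only triggers a warning after 30 s instead of 200 ms. A rough sketch of that waiting pattern, using `flume` as the node API does (the function and values here are illustrative, not the actual dora internals):

```rust
use std::time::{Duration, Instant};

// Returns true once the drop token is resolved (sender dropped => event data freed).
fn check_drop_token(rx: &flume::Receiver<()>, since: Instant) -> bool {
    match rx.recv_timeout(Duration::from_millis(100)) {
        // Disconnected means the value holding the sender was dropped.
        Err(flume::RecvTimeoutError::Disconnected) => true,
        Err(flume::RecvTimeoutError::Timeout) => {
            if since.elapsed() > Duration::from_secs(30) {
                eprintln!("warning: drop token still pending after 30s");
            }
            false
        }
        Ok(()) => unreachable!("nothing is ever sent on the ACK channel"),
    }
}

fn main() {
    let (tx, rx) = flume::bounded::<()>(0);
    let started = Instant::now();
    assert!(!check_drop_token(&rx, started)); // sender still alive -> keep waiting
    drop(tx); // event data freed
    assert!(check_drop_token(&rx, started));
}
```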
2 changes: 1 addition & 1 deletion apis/rust/node/src/node/mod.rs
@@ -65,7 +65,7 @@
serde_yaml::from_str(&raw).context("failed to deserialize operator config")?
};
#[cfg(feature = "tracing")]
set_up_tracing(&node_config.node_id.to_string())

Check warning on line 68 in apis/rust/node/src/node/mod.rs

View workflow job for this annotation

GitHub Actions / Clippy

unnecessary use of `to_string`
.context("failed to set up tracing subscriber")?;
Self::init(node_config)
}
@@ -401,7 +401,7 @@
);
}

match self.drop_stream.recv_timeout(Duration::from_millis(500)) {
match self.drop_stream.recv_timeout(Duration::from_secs(10)) {
Ok(token) => {
self.sent_out_shared_memory.remove(&token);
}
2 changes: 1 addition & 1 deletion binaries/daemon/src/lib.rs
@@ -1600,7 +1600,7 @@ impl RunningDataflow {
let running_nodes = self.running_nodes.clone();
let grace_duration_kills = self.grace_duration_kills.clone();
tokio::spawn(async move {
let duration = grace_duration.unwrap_or(Duration::from_millis(2000));
let duration = grace_duration.unwrap_or(Duration::from_millis(15000));
tokio::time::sleep(duration).await;
let mut system = sysinfo::System::new();
system.refresh_processes();
11 changes: 7 additions & 4 deletions binaries/runtime/src/operator/python.rs
@@ -6,7 +6,7 @@ use dora_core::{
descriptor::{source_is_url, Descriptor, PythonSource},
};
use dora_download::download_file;
use dora_node_api::Event;
use dora_node_api::{merged::MergedEvent, Event};
use dora_operator_api_python::PyEvent;
use dora_operator_api_types::DoraStatus;
use eyre::{bail, eyre, Context, Result};
@@ -208,9 +208,12 @@ pub fn run(
metadata.parameters.open_telemetry_context = string_cx;
}

let py_event = PyEvent::from(event)
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;
let py_event = PyEvent {
event: MergedEvent::Dora(event),
_cleanup: None,
}
.to_py_dict(py)
.context("Could not convert event to pydict bound")?;

let status_enum = operator
.call_method1(py, "on_event", (py_event, send_output.clone()))