Skip to content

Commit

Permalink
Report when shared memory region is mapped to allow faster cleanup
Browse files Browse the repository at this point in the history
The shared memory region can be safely removed by the sender once it's mapped in the receiver. The OS will just delete the file handle associated with the shared memory region, but keep the data alive until it has been unmapped from all address spaces.

By notifying the sender that a message has been mapped to the address space we enable faster cleanup on exit. The sender can safely close all of its shared memory regions once all of its sent messages are at least mapped. So it does not need to wait until all messages have been _dropped_ anymore, which can take considerably longer, especially if the Python GC is involved.

This commit modifies the message format, so we need to bump the version of the `dora-message` crate to `0.5.0`.
  • Loading branch information
phil-opp committed Dec 11, 2024
1 parent d2eb777 commit b73b9ee
Show file tree
Hide file tree
Showing 10 changed files with 213 additions and 125 deletions.
53 changes: 3 additions & 50 deletions apis/rust/node/src/event_stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
};

use dora_message::{
daemon_to_node::{DaemonCommunication, DaemonReply, DataMessage, NodeEvent},
daemon_to_node::{DaemonCommunication, DaemonReply},
id::DataId,
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
Expand All @@ -18,10 +18,7 @@ use futures::{
use futures_timer::Delay;
use scheduler::{Scheduler, NON_INPUT_EVENT};

use self::{
event::SharedMemoryData,
thread::{EventItem, EventStreamThreadHandle},
};
use self::thread::{EventItem, EventStreamThreadHandle};
use crate::daemon_connection::DaemonChannel;
use dora_core::{
config::{Input, NodeId},
Expand Down Expand Up @@ -198,51 +195,7 @@ impl EventStream {

fn convert_event_item(item: EventItem) -> Event {
match item {
EventItem::NodeEvent { event, ack_channel } => match event {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token: _, // handled in `event_stream_loop`
}) => unsafe {
MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: ack_channel,
}))
})
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
let err = eyre!(
"received `AllInputsClosed` event, which should be handled by background task"
);
tracing::error!("{err:?}");
Event::Error(err.wrap_err("internal error").to_string())
}
},

EventItem::NodeEvent { event } => event,
EventItem::FatalError(err) => {
Event::Error(format!("fatal event stream error: {err:?}"))
}
Expand Down
10 changes: 2 additions & 8 deletions apis/rust/node/src/event_stream/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use std::collections::{HashMap, VecDeque};

use dora_message::{daemon_to_node::NodeEvent, id::DataId};

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Clippy

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, x86_64-unknown-linux-gnu)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, armv7-unknown-linux-musleabihf)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, aarch64-unknown-linux-musl)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, i686-unknown-linux-gnu)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (ubuntu-20.04, aarch64-unknown-linux-gnu)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / ROS2 Bridge Examples

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (ubuntu-22.04)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (ubuntu-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Bench (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / Examples (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (macos-13, x86_64-apple-darwin)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (windows-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / CLI Test (macos-latest)

unused import: `daemon_to_node::NodeEvent`

Check warning on line 3 in apis/rust/node/src/event_stream/scheduler.rs

View workflow job for this annotation

GitHub Actions / cross-check (macos-13, aarch64-apple-darwin)

unused import: `daemon_to_node::NodeEvent`

use super::thread::EventItem;
use super::{thread::EventItem, Event};
pub const NON_INPUT_EVENT: &str = "dora/non_input_event";

// This scheduler will make sure that there is fairness between
Expand Down Expand Up @@ -40,13 +40,7 @@ impl Scheduler {
pub fn add_event(&mut self, event: EventItem) {
let event_id = match &event {
EventItem::NodeEvent {
event:
NodeEvent::Input {
id,
metadata: _,
data: _,
},
ack_channel: _,
event: Event::Input { id, .. },
} => id,
_ => &DataId::from(NON_INPUT_EVENT.to_string()),
};
Expand Down
85 changes: 61 additions & 24 deletions apis/rust/node/src/event_stream/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use dora_core::{
uhlc::{self, Timestamp},
};
use dora_message::{
common::{DataMessage, DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonReply, NodeEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
};
Expand All @@ -15,6 +16,8 @@ use std::{

use crate::daemon_connection::DaemonChannel;

use super::{event::SharedMemoryData, Event, MappedInputData, RawData};

pub fn init(
node_id: NodeId,
tx: flume::Sender<EventItem>,
Expand All @@ -28,10 +31,7 @@ pub fn init(

#[derive(Debug)]
pub enum EventItem {
NodeEvent {
event: NodeEvent,
ack_channel: flume::Sender<()>,
},
NodeEvent { event: super::Event },
FatalError(eyre::Report),
TimeoutError(eyre::Report),
}
Expand Down Expand Up @@ -130,25 +130,60 @@ fn event_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
let drop_token = match &inner {
NodeEvent::Input {
data: Some(data), ..
} => data.drop_token(),

let event = match inner {
NodeEvent::Stop => Event::Stop,
NodeEvent::Reload { operator_id } => Event::Reload { operator_id },
NodeEvent::InputClosed { id } => Event::InputClosed { id },
NodeEvent::Input { id, metadata, data } => {
let data = match data {
None => Ok(None),
Some(DataMessage::Vec(v)) => Ok(Some(RawData::Vec(v))),
Some(DataMessage::SharedMemory {
shared_memory_id,
len,
drop_token,
}) => unsafe {
let (drop_tx, drop_rx) = flume::bounded(0);
let data = MappedInputData::map(&shared_memory_id, len).map(|data| {
Some(RawData::SharedMemory(SharedMemoryData {
data,
_drop: drop_tx,
}))
});
drop_tokens.push(DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
});
pending_drop_tokens.push((drop_token, drop_rx, Instant::now(), 1));
data
},
};
let data = data.and_then(|data| {
let raw_data = data.unwrap_or(RawData::Empty);
raw_data
.into_arrow_array(&metadata.type_info)
.map(arrow::array::make_array)
});
match data {
Ok(data) => Event::Input {
id,
metadata,
data: data.into(),
},
Err(err) => Event::Error(format!("{err:?}")),
}
}
NodeEvent::AllInputsClosed => {
// close the event stream
tx = None;
// skip this internal event
continue;
}
_ => None,
};

if let Some(tx) = tx.as_ref() {
let (drop_tx, drop_rx) = flume::bounded(0);
match tx.send(EventItem::NodeEvent {
event: inner,
ack_channel: drop_tx,
}) {
match tx.send(EventItem::NodeEvent { event }) {
Ok(()) => {}
Err(send_error) => {
let event = send_error.into_inner();
Expand All @@ -159,12 +194,8 @@ fn event_stream_loop(
break 'outer Ok(());
}
}

if let Some(token) = drop_token {
pending_drop_tokens.push((token, drop_rx, Instant::now(), 1));
}
} else {
tracing::warn!("dropping event because event `tx` was already closed: `{inner:?}`");
tracing::warn!("dropping event because event `tx` was already closed: `{event:?}`");
}
}
};
Expand Down Expand Up @@ -196,15 +227,18 @@ fn event_stream_loop(

fn handle_pending_drop_tokens(
pending_drop_tokens: &mut Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
) -> eyre::Result<()> {
let mut still_pending = Vec::new();
for (token, rx, since, warn) in pending_drop_tokens.drain(..) {
match rx.try_recv() {
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::TryRecvError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::TryRecvError::Empty) => {
let duration = Duration::from_secs(30 * warn);
Expand All @@ -221,7 +255,7 @@ fn handle_pending_drop_tokens(

fn report_remaining_drop_tokens(
mut channel: DaemonChannel,
mut drop_tokens: Vec<DropToken>,
mut drop_tokens: Vec<DropTokenStatus>,
mut pending_drop_tokens: Vec<(DropToken, flume::Receiver<()>, Instant, u64)>,
timestamp: Timestamp,
) -> eyre::Result<()> {
Expand All @@ -234,7 +268,10 @@ fn report_remaining_drop_tokens(
Ok(()) => return Err(eyre!("Node API should not send anything on ACK channel")),
Err(flume::RecvTimeoutError::Disconnected) => {
// the event was dropped -> add the drop token to the list
drop_tokens.push(token);
drop_tokens.push(DropTokenStatus {
token,
state: DropTokenState::Dropped,
});
}
Err(flume::RecvTimeoutError::Timeout) => {
let duration = Duration::from_secs(1);
Expand All @@ -259,7 +296,7 @@ fn report_remaining_drop_tokens(
}

fn report_drop_tokens(
drop_tokens: &mut Vec<DropToken>,
drop_tokens: &mut Vec<DropTokenStatus>,
channel: &mut DaemonChannel,
timestamp: Timestamp,
) -> Result<(), eyre::ErrReport> {
Expand Down
35 changes: 21 additions & 14 deletions apis/rust/node/src/node/drop_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ use std::{sync::Arc, time::Duration};
use crate::daemon_connection::DaemonChannel;
use dora_core::{config::NodeId, uhlc};
use dora_message::{
common::{DropTokenState, DropTokenStatus},
daemon_to_node::{DaemonCommunication, DaemonReply, NodeDropEvent},
node_to_daemon::{DaemonRequest, DropToken, Timestamped},
node_to_daemon::{DaemonRequest, Timestamped},
DataflowId,
};
use eyre::{eyre, Context};
use flume::RecvTimeoutError;

pub struct DropStream {
receiver: flume::Receiver<DropToken>,
receiver: flume::Receiver<DropTokenStatus>,
_thread_handle: DropStreamThreadHandle,
}

Expand Down Expand Up @@ -82,7 +83,7 @@ impl DropStream {
}

impl std::ops::Deref for DropStream {
type Target = flume::Receiver<DropToken>;
type Target = flume::Receiver<DropTokenStatus>;

fn deref(&self) -> &Self::Target {
&self.receiver
Expand All @@ -92,7 +93,7 @@ impl std::ops::Deref for DropStream {
#[tracing::instrument(skip(tx, channel, clock))]
fn drop_stream_loop(
node_id: NodeId,
tx: flume::Sender<DropToken>,
tx: flume::Sender<DropTokenStatus>,
mut channel: DaemonChannel,
clock: Arc<uhlc::HLC>,
) {
Expand Down Expand Up @@ -125,16 +126,22 @@ fn drop_stream_loop(
if let Err(err) = clock.update_with_timestamp(&timestamp) {
tracing::warn!("failed to update HLC: {err}");
}
match inner {
NodeDropEvent::OutputDropped { drop_token } => {
if tx.send(drop_token).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token`{drop_token:?}`"
);
break 'outer;
}
}
let event = match inner {
NodeDropEvent::OutputMapped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Mapped,
},
NodeDropEvent::OutputDropped { drop_token } => DropTokenStatus {
token: drop_token,
state: DropTokenState::Dropped,
},
};
if tx.send(event).is_err() {
tracing::warn!(
"drop channel was closed already, could not forward \
drop token event `{event:?}`"
);
break 'outer;
}
}
}
Expand Down
Loading

0 comments on commit b73b9ee

Please sign in to comment.