Skip to content

Commit

Permalink
Expose flush on Python recording stream and call it on context manage…
Browse files Browse the repository at this point in the history
…r exit (#8911)
  • Loading branch information
Wumpf authored Feb 4, 2025
1 parent 92a5dfa commit b1aa3ad
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 19 deletions.
1 change: 1 addition & 0 deletions crates/top/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ pub trait LogSink: Send + Sync + 'static {

/// Blocks until all pending data in the sink's send buffers has been fully flushed.
///
/// If applicable, this should flush all data to any underlying OS-managed file descriptors.
/// See also [`LogSink::drop_if_disconnected`].
fn flush_blocking(&self);

Expand Down
6 changes: 3 additions & 3 deletions crates/top/re_sdk/src/recording_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,9 +696,9 @@ impl RecordingStreamBuilder {
/// thread originally sent them in, from its point of view.
/// - There isn't any well defined global order across multiple threads.
///
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all
/// previous data sent by the calling thread has been recorded; no more, no less.
/// (e.g. it does not mean that all file caches are flushed)
/// This means that e.g. flushing the pipeline ([`Self::flush_blocking`]) guarantees that all previous data sent by the calling thread
/// has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
/// but other threads may still have data in flight.
///
/// ## Shutdown
///
Expand Down
18 changes: 18 additions & 0 deletions rerun_py/rerun_sdk/rerun/recording_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,10 @@ class RecordingStream:
to the wrong stream. See: <https://github.com/rerun-io/rerun/issues/6238>. You can work around this
by using the [`rerun.recording_stream_generator_ctx`][] decorator.
Flushing or context manager exit guarantees that all previous data sent by the calling thread
has been recorded and (if applicable) flushed to the underlying OS-managed file descriptor,
but other threads may still have data in flight.
See also: [`rerun.get_data_recording`][], [`rerun.get_global_data_recording`][],
[`rerun.get_thread_local_data_recording`][].
Expand Down Expand Up @@ -242,6 +246,8 @@ def __enter__(self): # type: ignore[no-untyped-def]
return self

def __exit__(self, type, value, traceback): # type: ignore[no-untyped-def]
self.flush(blocking=True)

current_recording = active_recording_stream.get(None)

# Restore the context state
Expand All @@ -263,6 +269,18 @@ def __exit__(self, type, value, traceback): # type: ignore[no-untyped-def]
def to_native(self: RecordingStream | None) -> bindings.PyRecordingStream | None:
return self.inner if self is not None else None

def flush(self, blocking: bool = True) -> None:
"""
Initiates a flush the batching pipeline and optionally waits for it to propagate to the underlying file descriptor (if any).
Parameters
----------
blocking:
If true, the flush will block until the flush is complete.
"""
bindings.flush(blocking, recording=self.to_native())

def __del__(self): # type: ignore[no-untyped-def]
recording = RecordingStream.to_native(self)
# TODO(jleibs): I'm 98% sure this flush is redundant, but removing it requires more thorough testing.
Expand Down
30 changes: 14 additions & 16 deletions rerun_py/tests/unit/test_dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,10 @@ def test_load_recording() -> None:
with tempfile.TemporaryDirectory() as tmpdir:
rrd = tmpdir + "/tmp.rrd"

rr.init("rerun_example_test_recording")
rr.set_time_sequence("my_index", 1)
rr.log("log", rr.TextLog("Hello"))
rr.save(rrd)
with rr.new_recording("rerun_example_test_recording") as rec:
rr.save(rrd, recording=rec)
rr.set_time_sequence("my_index", 1, recording=rec)
rr.log("log", rr.TextLog("Hello"), recording=rec)

recording = rr.dataframe.load_recording(rrd)
assert recording is not None
Expand All @@ -46,18 +46,16 @@ def test_load_recording() -> None:

class TestDataframe:
def setup_method(self) -> None:
rr.init(APP_ID, recording_id=RECORDING_ID)

rr.set_time_sequence("my_index", 1)
rr.log("points", rr.Points3D([[1, 2, 3], [4, 5, 6], [7, 8, 9]], radii=[]))
rr.set_time_sequence("my_index", 7)
rr.log("points", rr.Points3D([[10, 11, 12]], colors=[[255, 0, 0]]))
rr.log("static_text", rr.TextLog("Hello"), static=True)

with tempfile.TemporaryDirectory() as tmpdir:
rrd = tmpdir + "/tmp.rrd"

rr.save(rrd)
with rr.new_recording(APP_ID, recording_id=RECORDING_ID) as rec:
rr.save(rrd, recording=rec)
rr.set_time_sequence("my_index", 1, recording=rec)
rr.log("points", rr.Points3D([[1, 2, 3], [4, 5, 6], [7, 8, 9]], radii=[]), recording=rec)
rr.set_time_sequence("my_index", 7, recording=rec)
rr.log("points", rr.Points3D([[10, 11, 12]], colors=[[255, 0, 0]]), recording=rec)
rr.log("static_text", rr.TextLog("Hello"), static=True, recording=rec)

self.recording = rr.dataframe.load_recording(rrd)

Expand Down Expand Up @@ -390,9 +388,9 @@ def test_roundtrip_send(self) -> None:
with tempfile.TemporaryDirectory() as tmpdir:
rrd = tmpdir + "/tmp.rrd"

rr.init("rerun_example_test_recording")
rr.dataframe.send_dataframe(df)
rr.save(rrd)
with rr.new_recording("rerun_example_test_recording") as rec:
rr.save(rrd, recording=rec)
rr.dataframe.send_dataframe(df, rec=rec)

round_trip_recording = rr.dataframe.load_recording(rrd)

Expand Down

0 comments on commit b1aa3ad

Please sign in to comment.