Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MVP Support for inline-rendering of Rerun within jupyter notebooks #1798

Merged
merged 26 commits into from
Apr 14, 2023
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
4797e6a
Introduce the ability to push an rrd binary via iframe.contentWindow.…
jleibs Apr 7, 2023
e7af3bb
New API to output the current buffered messages as a cell in jupyter
jleibs Apr 7, 2023
71160e4
Example notebook with the cube demo
jleibs Apr 7, 2023
f2b176b
Don't unwrap
jleibs Apr 7, 2023
6cc644f
comment
jleibs Apr 7, 2023
bd38b3c
Track that we need to send another recording msg after draining the b…
jleibs Apr 8, 2023
b06a28f
Merge branch 'main' into jleibs/jupyter_mvp
teh-cmc Apr 8, 2023
1bd6ada
Dynamically resolve the app location based on git commit. Allow overr…
jleibs Apr 11, 2023
e9ca84f
Add some crude timeout logic in case the iframe fails to load
jleibs Apr 11, 2023
4ec8532
Fix lint
jleibs Apr 11, 2023
7084497
Remove the inlined rrd from the cube notebook
jleibs Apr 11, 2023
04adf4e
Fix comment
jleibs Apr 11, 2023
4d53b69
js_val needs debug print
jleibs Apr 12, 2023
679df3b
Merge branch 'main' into jleibs/jupyter_mvp
jleibs Apr 12, 2023
1e5adff
Don't persist app state in notebooks
jleibs Apr 12, 2023
4e3d975
Merge branch 'main' into jleibs/jupyter_mvp
jleibs Apr 12, 2023
19182a7
Introduce new MemoryRecording for use with Jupyter notebooks (#1834)
jleibs Apr 13, 2023
a135cd4
PR cleanup
jleibs Apr 13, 2023
79f2c75
Merge branch 'main' into jleibs/jupyter_mvp
jleibs Apr 13, 2023
20bb4ad
Undo port change -- will move to isolated PR
jleibs Apr 13, 2023
33ea1a9
Merge branch 'main' into jleibs/jupyter_mvp
jleibs Apr 13, 2023
31237fe
descriptive must_use
jleibs Apr 13, 2023
f6f5b7d
Refactor the relationship between the assorted web / websocket server…
jleibs Apr 14, 2023
7d7aee8
Merge branch 'main' into jleibs/jupyter_mvp
jleibs Apr 14, 2023
bd2ebb7
Small PR cleanups
jleibs Apr 14, 2023
49ecf53
Ad Readme and expand notebook example
jleibs Apr 14, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/re_log_encoding/src/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ impl<W: std::io::Write> Encoder<W> {

pub fn encode<'a>(
messages: impl Iterator<Item = &'a LogMsg>,
write: impl std::io::Write,
write: &mut impl std::io::Write,
) -> Result<(), EncodeError> {
let mut encoder = Encoder::new(write)?;
for message in messages {
Expand Down
54 changes: 49 additions & 5 deletions crates/re_log_encoding/src/stream_rrd_from_http.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::Arc;

use re_log_types::LogMsg;

pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receiver<LogMsg> {
Expand All @@ -6,14 +8,14 @@ pub fn stream_rrd_from_http_to_channel(url: String) -> re_smart_channel::Receive
});
stream_rrd_from_http(
url,
Box::new(move |msg| {
Arc::new(move |msg| {
tx.send(msg).ok();
}),
);
rx
}

pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(LogMsg) + Send>) {
pub fn stream_rrd_from_http(url: String, on_msg: Arc<dyn Fn(LogMsg) + Send + Sync>) {
emilk marked this conversation as resolved.
Show resolved Hide resolved
re_log::debug!("Downloading .rrd file from {url:?}…");

// TODO(emilk): stream the http request, progressively decoding the .rrd file.
Expand All @@ -36,9 +38,50 @@ pub fn stream_rrd_from_http(url: String, on_msg: Box<dyn Fn(LogMsg) + Send>) {
});
}

#[cfg(target_arch = "wasm32")]
mod web_event_listener {
use js_sys::Uint8Array;
use re_log_types::LogMsg;
use std::sync::Arc;
use wasm_bindgen::{closure::Closure, JsCast, JsValue};
use web_sys::MessageEvent;

/// Install an event-listener on `window` which will decode the incoming event as an rrd
///
/// From javascript you can send an rrd using:
/// ```
jleibs marked this conversation as resolved.
Show resolved Hide resolved
/// var rrd = new Uint8Array(...); // Get an RRD from somewhere
/// window.postMessage(rrd, "*")
/// ```
pub fn stream_rrd_from_event_listener(on_msg: Arc<dyn Fn(LogMsg) + Send>) {
let window = web_sys::window().expect("no global `window` exists");
let closure =
Closure::wrap(Box::new(
move |event: JsValue| match event.dyn_into::<MessageEvent>() {
Ok(message_event) => {
let uint8_array = Uint8Array::new(&message_event.data());
let result: Vec<u8> = uint8_array.to_vec();

crate::stream_rrd_from_http::decode_rrd(result, on_msg.clone());
}
Err(js_val) => {
re_log::error!("Incoming event was not a MessageEvent. {:?}", js_val);
}
},
) as Box<dyn FnMut(_)>);
window
.add_event_listener_with_callback("message", closure.as_ref().unchecked_ref())
.unwrap();
closure.forget();
}
}

#[cfg(target_arch = "wasm32")]
pub use web_event_listener::stream_rrd_from_event_listener;

#[cfg(not(target_arch = "wasm32"))]
#[allow(clippy::needless_pass_by_value)] // must match wasm version
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
Ok(decoder) => {
for msg in decoder {
Expand All @@ -61,15 +104,16 @@ fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
#[cfg(target_arch = "wasm32")]
mod web_decode {
use re_log_types::LogMsg;
use std::sync::Arc;

pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
pub fn decode_rrd(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
wasm_bindgen_futures::spawn_local(decode_rrd_async(rrd_bytes, on_msg));
}

/// Decodes the file in chunks, with an yield between each chunk.
///
/// This is cooperative multi-tasking.
async fn decode_rrd_async(rrd_bytes: Vec<u8>, on_msg: Box<dyn Fn(LogMsg) + Send>) {
async fn decode_rrd_async(rrd_bytes: Vec<u8>, on_msg: Arc<dyn Fn(LogMsg) + Send>) {
let mut last_yield = instant::Instant::now();

match crate::decoder::Decoder::new(rrd_bytes.as_slice()) {
Expand Down
4 changes: 3 additions & 1 deletion crates/re_sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,9 @@ pub mod demo_util;
/// This is how you select whether the log stream ends up
/// sent over TCP, written to file, etc.
pub mod sink {
pub use crate::log_sink::{disabled, BufferedSink, LogSink, TcpSink};
pub use crate::log_sink::{
disabled, BufferedSink, LogSink, MemorySink, MemorySinkStorage, TcpSink,
};

#[cfg(not(target_arch = "wasm32"))]
pub use re_log_encoding::{FileSink, FileSinkError};
Expand Down
47 changes: 47 additions & 0 deletions crates/re_sdk/src/log_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,53 @@ impl LogSink for BufferedSink {
}
}

/// Store log messages directly in memory
///
/// Although very similar to `BufferedSink` this sink is a real-endpoint. When creating
/// a new sink the logged messages stay with the `MemorySink` (`drain_backlog` does nothing).
///
/// Additionally the raw storage can be accessed and used to create an in-memory RRD.
/// This is useful for things like the inline rrd-viewer in Jupyter notebooks.
#[derive(Default)]
pub struct MemorySink(MemorySinkStorage);

impl MemorySink {
/// Access the raw `MemorySinkStorage`
pub fn buffer(&self) -> MemorySinkStorage {
self.0.clone()
}
}

impl LogSink for MemorySink {
fn send(&self, msg: LogMsg) {
self.0.lock().push(msg);
}

fn send_all(&self, mut messages: Vec<LogMsg>) {
self.0.lock().append(&mut messages);
}
}

/// The storage used by [`MemorySink`]
#[derive(Default, Clone)]
pub struct MemorySinkStorage(std::sync::Arc<parking_lot::Mutex<Vec<LogMsg>>>);

///
impl MemorySinkStorage {
/// Lock the contained buffer
fn lock(&self) -> parking_lot::MutexGuard<'_, Vec<LogMsg>> {
self.0.lock()
}

/// Convert the stored messages into an in-memory Rerun log file
pub fn rrd_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
let messages = self.lock();
let mut buffer = std::io::Cursor::new(Vec::new());
re_log_encoding::encoder::encode(messages.iter(), &mut buffer)?;
Ok(buffer.into_inner())
}
}

// ----------------------------------------------------------------------------

/// Stream log messages to a Rerun TCP server.
Expand Down
7 changes: 6 additions & 1 deletion crates/re_smart_channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,11 @@ pub enum Source {
/// Streaming an `.rrd` file over http.
RrdHttpStream { url: String },

/// Loading an `.rrd` file from a `postMessage` js event
///
/// Only applicable to web browser iframes
RrdWebEventListener,

/// The source is the logging sdk directly, same process.
Sdk,

Expand All @@ -36,7 +41,7 @@ pub enum Source {
impl Source {
pub fn is_network(&self) -> bool {
match self {
Self::File { .. } | Self::Sdk => false,
Self::File { .. } | Self::Sdk | Self::RrdWebEventListener => false,
Self::RrdHttpStream { .. } | Self::WsClient { .. } | Self::TcpServer { .. } => true,
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/re_viewer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ winapi = "0.3.9"
[target.'cfg(target_arch = "wasm32")'.dependencies]
console_error_panic_hook = "0.1.6"
wasm-bindgen-futures = "0.4"

web-sys = { version = "0.3.52", features = ["Window"] }

[build-dependencies]
re_build_build_info.workspace = true
9 changes: 6 additions & 3 deletions crates/re_viewer/src/app.rs
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,9 @@ fn wait_screen_ui(ui: &mut egui::Ui, rx: &Receiver<LogMsg>) {
re_smart_channel::Source::RrdHttpStream { url } => {
ui.strong(format!("Loading {url}…"));
}
re_smart_channel::Source::RrdWebEventListener => {
ready_and_waiting(ui, "Waiting for logging data…");
}
re_smart_channel::Source::Sdk => {
ready_and_waiting(ui, "Waiting for logging data from SDK");
}
Expand Down Expand Up @@ -1885,9 +1888,9 @@ fn new_recording_confg(
let play_state = match data_source {
// Play files from the start by default - it feels nice and alive./
// RrdHttpStream downloads the whole file before decoding it, so we treat it the same as a file.
re_smart_channel::Source::File { .. } | re_smart_channel::Source::RrdHttpStream { .. } => {
PlayState::Playing
}
re_smart_channel::Source::File { .. }
| re_smart_channel::Source::RrdHttpStream { .. }
| re_smart_channel::Source::RrdWebEventListener => PlayState::Playing,

// Live data - follow it!
re_smart_channel::Source::Sdk
Expand Down
1 change: 1 addition & 0 deletions crates/re_viewer/src/viewer_analytics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ impl ViewerAnalytics {
let data_source = match data_source {
re_smart_channel::Source::File { .. } => "file", // .rrd
re_smart_channel::Source::RrdHttpStream { .. } => "http",
re_smart_channel::Source::RrdWebEventListener { .. } => "web_event",
re_smart_channel::Source::Sdk => "sdk", // show()
re_smart_channel::Source::WsClient { .. } => "ws_client", // spawn()
re_smart_channel::Source::TcpServer { .. } => "tcp_server", // connect()
Expand Down
33 changes: 31 additions & 2 deletions crates/re_viewer/src/web.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use eframe::wasm_bindgen::{self, prelude::*};
use std::sync::Arc;

use re_memory::AccountingAllocator;

Expand Down Expand Up @@ -54,7 +55,30 @@ pub async fn start(
let egui_ctx = cc.egui_ctx.clone();
re_log_encoding::stream_rrd_from_http::stream_rrd_from_http(
url,
Box::new(move |msg| {
Arc::new(move |msg| {
egui_ctx.request_repaint(); // wake up ui thread
tx.send(msg).ok();
}),
);

Box::new(crate::App::from_receiver(
build_info,
&app_env,
startup_options,
re_ui,
cc.storage,
rx,
std::sync::Arc::new(std::sync::atomic::AtomicBool::new(false)),
))
}
EndpointCategory::WebEventListener => {
// Process an rrd when it's posted via `window.postMessage`
let (tx, rx) = re_smart_channel::smart_channel(
re_smart_channel::Source::RrdWebEventListener,
);
let egui_ctx = cc.egui_ctx.clone();
re_log_encoding::stream_rrd_from_http::stream_rrd_from_event_listener(
Arc::new(move |msg| {
egui_ctx.request_repaint(); // wake up ui thread
tx.send(msg).ok();
}),
Expand Down Expand Up @@ -95,13 +119,18 @@ enum EndpointCategory {

/// A remote Rerun server.
WebSocket(String),

/// An eventListener for rrd posted from containing html
WebEventListener,
}

fn categorize_uri(mut uri: String) -> EndpointCategory {
if uri.starts_with("http") || uri.ends_with(".rrd") {
EndpointCategory::HttpRrd(uri)
} else if uri.starts_with("ws") {
} else if uri.starts_with("ws:") || uri.starts_with("wss:") {
EndpointCategory::WebSocket(uri)
} else if uri.starts_with("web_event:") {
EndpointCategory::WebEventListener
} else {
// If this is sometyhing like `foo.com` we can't know what it is until we connect to it.
// We could/should connect and see what it is, but for now we just take a wild guess instead:
Expand Down
42 changes: 40 additions & 2 deletions crates/rerun/src/web_viewer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ pub async fn host_web_viewer(
re_log::info!("Web server is running - view it at {viewer_url}");
if open_browser {
webbrowser::open(&viewer_url).ok();
} else {
re_log::info!("Web server is running - view it at {viewer_url}");
}

web_server_handle.await?
Expand Down Expand Up @@ -100,3 +98,43 @@ impl crate::sink::LogSink for RemoteViewerServer {
pub fn new_sink(open_browser: bool) -> Box<dyn crate::sink::LogSink> {
Box::new(RemoteViewerServer::new(open_browser))
}

/// Hosts the rerun web assets only
emilk marked this conversation as resolved.
Show resolved Hide resolved
pub struct HostAssets {
emilk marked this conversation as resolved.
Show resolved Hide resolved
web_port: u16,
shutdown_tx: tokio::sync::broadcast::Sender<()>,
}

impl Drop for HostAssets {
fn drop(&mut self) {
re_log::info!("Shutting down web server.");
self.shutdown_tx.send(()).ok();
}
}

impl HostAssets {
/// Create new web server hosting rerun assets on `web_port`
///
/// The caller needs to ensure that there is a `tokio` runtime running.
#[cfg(feature = "web_viewer")]
pub fn new(web_port: u16) -> Self {
let (shutdown_tx, shutdown_rx_web_server) = tokio::sync::broadcast::channel(1);

tokio::spawn(async move {
let web_server = re_web_viewer_server::WebViewerServer::new(web_port);
let web_server_handle = tokio::spawn(web_server.serve(shutdown_rx_web_server));

web_server_handle.await.unwrap().unwrap();
jleibs marked this conversation as resolved.
Show resolved Hide resolved
});

Self {
web_port,
shutdown_tx,
}
}

/// Get the port where the web assets are hosted
pub fn get_port(&self) -> u16 {
jleibs marked this conversation as resolved.
Show resolved Hide resolved
self.web_port
}
}
Loading