From 5c2c3c3d628e243b0525a05c2181ecee6346f3d1 Mon Sep 17 00:00:00 2001 From: nbabcock Date: Tue, 5 Nov 2024 19:08:30 -0600 Subject: [PATCH] fix: fall back to chunked mode for unknown frame buffer sizes --- src/command.rs | 3 +- src/iter.rs | 185 +++++++++++++++++++++++------------------ src/test.rs | 217 +++++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 298 insertions(+), 107 deletions(-) diff --git a/src/command.rs b/src/command.rs index f25c238..86993b2 100644 --- a/src/command.rs +++ b/src/command.rs @@ -542,7 +542,8 @@ impl FfmpegCommand { //// Preset argument sets for common use cases. /// Generate a procedural test video. Equivalent to `ffmpeg -f lavfi -i - /// testsrc=duration=10`. + /// testsrc=duration=10`. It also inherits defaults from the `testsrc` filter + /// in FFmpeg: `320x240` size and `25` fps. /// /// [FFmpeg `testsrc` filter /// documentation](https://ffmpeg.org/ffmpeg-filters.html#allrgb_002c-allyuv_002c-color_002c-colorchart_002c-colorspectrum_002c-haldclutsrc_002c-nullsrc_002c-pal75bars_002c-pal100bars_002c-rgbtestsrc_002c-smptebars_002c-smptehdbars_002c-testsrc_002c-testsrc2_002c-yuvtestsrc) diff --git a/src/iter.rs b/src/iter.rs index 4f58dd3..7fd4e83 100644 --- a/src/iter.rs +++ b/src/iter.rs @@ -198,83 +198,119 @@ pub fn spawn_stdout_thread( .unwrap_or(false) }); - // Error on mixing rawvideo and non-rawvideo streams - // TODO: Maybe just revert to chunk mode if this happens? - let any_rawvideo = stdout_output_video_streams - .clone() - .any(|s| s.format == "rawvideo"); - let any_non_rawvideo = stdout_output_video_streams - .clone() - .any(|s| s.format != "rawvideo"); - if any_rawvideo && any_non_rawvideo { - panic!("Cannot mix rawvideo and non-rawvideo streams"); + // Exit early if nothing is being sent to stdout + if stdout_output_video_streams.clone().count() == 0 { + return; } - // Prepare buffers - let mut buffers = stdout_output_video_streams + // If the size of a frame can't be determined, it will be read in arbitrary chunks. + let mut chunked_mode = false; + + // Calculate frame buffer sizes up front. + // Any sizes that cannot be calculated will trigger chunked mode. + let frame_buffer_sizes: Vec = stdout_output_video_streams + .clone() .map(|video_stream| { - // Since we filtered for video_streams above, we can unwrap unconditionally. - let buf_size = match video_stream.format.as_str() { - "rawvideo" => { - let Some(video_data) = video_stream.video_data() else { - tx.send(FfmpegEvent::Error( - "Video stream doesn't have any video data".to_owned(), - )) - .ok(); - return Vec::new(); - }; - - let Some(bytes_per_frame) = get_bytes_per_frame(video_data) else { - tx.send(FfmpegEvent::Error( - format!("Can't bytes per fraame for video data {video_data:?}").to_owned(), - )) - .ok(); - return Vec::new(); - }; - - bytes_per_frame as usize - } + // Any non-rawvideo streams instantly enable chunked mode, since it's + // impossible to tell when one chunk ends and another begins. + if video_stream.format != "rawvideo" { + chunked_mode = true; + return 0; + } - // Arbitrary default buffer size for receiving indeterminate chunks - // of any encoder or container output, when frame boundaries are unknown - _ => 32_768, // ~= 32mb (plenty large enough for any chunk of video at reasonable bitrate) + // This is an unexpected error since we've already filtered for video streams. + let Some(video_data) = video_stream.video_data() else { + chunked_mode = true; + return 0; }; - // Catch unsupported pixel formats - assert!( - buf_size > 0, - "Unsupported pixel format with 0 bytes per pixel" - ); + // This may trigger either on an unsupported pixel format, or + // framebuffers with non-byte-aligned sizes. FFmpeg will pad these with + // zeroes, but we can't predict the exact padding or end size on every format. + let Some(bytes_per_frame) = get_bytes_per_frame(video_data) else { + chunked_mode = true; + return 0; + }; - vec![0u8; buf_size] + bytes_per_frame as usize }) - .collect::>>(); + .collect(); - // No buffers probably indicates that output is being sent to file or - // that an error occured. - if buffers.is_empty() { - return; + // Final check: FFmpeg supports multiple outputs interleaved on stdout, + // but we can only keep track of them if the framerates match. It's + // theoretically still possible to determine the expected frame order, + // but it's not currently supported. + let output_framerates: Vec = stdout_output_video_streams + .clone() + .filter(|s| s.format == "rawvideo") + .map(|video_stream| { + if let Some(video_data) = video_stream.video_data() { + return video_data.fps; + } else { + return -1.0; + } + }) + .collect(); + let any_mismatched_framerates = output_framerates + .iter() + .any(|&fps| fps != output_framerates[0] || fps == -1.0); + if any_mismatched_framerates { + // This edge case is probably not what the user was intending, + // so we'll notify with an error. + tx.send(FfmpegEvent::Error( + "Multiple output streams with different framerates are not supported when outputting to stdout. Falling back to chunked mode.".to_owned() + )).ok(); + chunked_mode = true; } - // Read into buffers - let num_buffers = buffers.len(); - let mut buffer_index = (0..buffers.len()).cycle(); let mut reader = BufReader::new(stdout); - let mut frame_num = 0; - loop { - let i = buffer_index.next().unwrap(); - let video_stream = &output_streams[i]; - // Since we filtered for video_streams above, we can unwrap unconditionally. - let video_data = video_stream.video_data().unwrap(); - let buffer = &mut buffers[i]; - let output_frame_num = frame_num / num_buffers; - let timestamp = output_frame_num as f32 / video_data.fps; - frame_num += 1; - - // Handle two scenarios: - match video_stream.format.as_str() { - // 1. `rawvideo` with exactly known pixel layout - "rawvideo" => match reader.read_exact(buffer.as_mut_slice()) { + if chunked_mode { + // Arbitrary default buffer size for receiving indeterminate chunks + // of any encoder or container output, when frame boundaries are unknown + let mut chunk_buffer = vec![0u8; 65_536]; + loop { + match reader.read(chunk_buffer.as_mut_slice()) { + Ok(0) => break, + Ok(bytes_read) => { + let mut data = vec![0; bytes_read]; + data.clone_from_slice(&chunk_buffer[..bytes_read]); + tx.send(FfmpegEvent::OutputChunk(data)).ok() + } + Err(e) => match e.kind() { + ErrorKind::UnexpectedEof => break, + e => tx.send(FfmpegEvent::Error(e.to_string())).ok(), + }, + }; + } + } else { + // Prepare frame buffers + let mut frame_buffers = frame_buffer_sizes + .iter() + .map(|&size| vec![0u8; size]) + .collect::>>(); + + // Empty buffer array is unexpected at this point, since we've already ruled out + // both chunked mode and non-stdout streams. + if frame_buffers.is_empty() { + tx.send(FfmpegEvent::Error("No frame buffers found".to_owned())) + .ok(); + return; + } + + // Read into buffers + let num_frame_buffers = frame_buffers.len(); + let mut frame_buffer_index = (0..frame_buffers.len()).cycle(); + let mut frame_num = 0; + loop { + let i = frame_buffer_index.next().unwrap(); + let video_stream = &output_streams[i]; + let video_data = video_stream.video_data().unwrap(); + let buffer = &mut frame_buffers[i]; + let output_frame_num = frame_num / num_frame_buffers; + let timestamp = output_frame_num as f32 / video_data.fps; + frame_num += 1; + + match reader.read_exact(buffer.as_mut_slice()) { Ok(_) => tx .send(FfmpegEvent::OutputFrame(OutputVideoFrame { width: video_data.width, @@ -290,23 +326,10 @@ pub fn spawn_stdout_thread( ErrorKind::UnexpectedEof => break, e => tx.send(FfmpegEvent::Error(e.to_string())).ok(), }, - }, - - // 2. Anything else, with unknown buffer size - _ => match reader.read(buffer.as_mut_slice()) { - Ok(0) => break, - Ok(bytes_read) => { - let mut data = vec![0; bytes_read]; - data.clone_from_slice(&buffer[..bytes_read]); - tx.send(FfmpegEvent::OutputChunk(data)).ok() - } - Err(e) => match e.kind() { - ErrorKind::UnexpectedEof => break, - e => tx.send(FfmpegEvent::Error(e.to_string())).ok(), - }, - }, - }; + }; + } } + tx.send(FfmpegEvent::Done).ok(); }) } diff --git a/src/test.rs b/src/test.rs index 97c46fa..2e95ecf 100644 --- a/src/test.rs +++ b/src/test.rs @@ -2,7 +2,7 @@ use std::{sync::mpsc, thread, time::Duration}; use crate::{ command::{ffmpeg_is_installed, FfmpegCommand}, - event::FfmpegEvent, + event::{FfmpegEvent, LogLevel}, version::ffmpeg_version, }; @@ -10,6 +10,36 @@ fn approx_eq(a: f32, b: f32, error: f32) -> bool { (a - b).abs() < error } +/// Returns `Err` if the timeout thread finishes before the FFmpeg process +fn spawn_with_timeout(command: &mut FfmpegCommand, timeout: u64) -> anyhow::Result<()> { + let (sender, receiver) = mpsc::channel(); + + // Thread 1: Waits for 1000ms and sends a message + let timeout_sender = sender.clone(); + thread::spawn(move || { + thread::sleep(Duration::from_millis(timeout)); + timeout_sender.send("timeout").ok(); + }); + + // Thread 2: Consumes the FFmpeg events and sends a message + let mut ffmpeg_child = command.spawn()?; + let iter = ffmpeg_child.iter()?; + thread::spawn(move || { + iter.for_each(|_| {}); + // Note: `.wait()` would not work here, because it closes `stdin` automatically + sender.send("ffmpeg").ok(); + }); + + // Race the two threads + let finished_first = receiver.recv()?; + ffmpeg_child.kill()?; + match finished_first { + "timeout" => anyhow::bail!("Timeout thread expired before FFmpeg"), + "ffmpeg" => Ok(()), + _ => anyhow::bail!("Unknown message received"), + } +} + #[test] fn test_installed() { assert!(ffmpeg_is_installed()); @@ -94,6 +124,39 @@ fn test_deterministic() { assert!(vec1 == vec2) } +/// Pass simple raw pixels across stdin and stdout to check that the frame +/// buffers are pixel-perfect across multiple frames. +#[test] +fn test_passthrough() -> anyhow::Result<()> { + use std::io::Write; + + let mut child = FfmpegCommand::new() + .args("-f rawvideo -pix_fmt rgb24 -s 2x2 -i - -f rawvideo -pix_fmt rgb24 -".split(' ')) + .spawn()?; + + // Send hardcoded RGB values over stdin as three identical 2x2 frames + let input_raw_pixels = vec![255, 0, 0, 0, 255, 0, 0, 0, 255, 255, 255, 255]; + let mut stdin = child.take_stdin().unwrap(); + stdin.write_all(&input_raw_pixels)?; + stdin.write_all(&input_raw_pixels)?; + stdin.write_all(&input_raw_pixels)?; + stdin.flush()?; + drop(stdin); // otherwise FFmpeg will hang waiting for more input + + let output_raw_pixels: Vec> = child + .iter()? + .filter_frames() + .map(|frame| frame.data) + .collect(); + + assert!(output_raw_pixels.len() == 3); + assert!(input_raw_pixels == output_raw_pixels[0]); + assert!(input_raw_pixels == output_raw_pixels[1]); + assert!(input_raw_pixels == output_raw_pixels[2]); + + Ok(()) +} + #[test] fn test_to_file() { FfmpegCommand::new() @@ -480,32 +543,136 @@ fn test_named_pipe() -> anyhow::Result<()> { Ok(()) } -/// Returns `Err` if the timeout thread finishes before the FFmpeg process -fn spawn_with_timeout(command: &mut FfmpegCommand, timeout: u64) -> anyhow::Result<()> { - let (sender, receiver) = mpsc::channel(); +/// Ensure non-byte-aligned pixel formats are still processed correctly. +/// YUV420 has 12 bits per pixel, but the whole frame buffer will still be +/// enforced to be byte-aligned. +/// See +#[test] +fn test_yuv420() -> anyhow::Result<()> { + let iter = FfmpegCommand::new() + .hide_banner() + .testsrc() + .format("rawvideo") + .pix_fmt("yuv420p") + .pipe_stdout() + .spawn()? + .iter()?; + + let mut frames_received = 0; + + for event in iter { + match event { + FfmpegEvent::OutputFrame(frame) => { + frames_received += 1; + assert!(frame.pix_fmt == "yuv420p"); + // Expect 12 bits per pixel, but with an assumption that valid sizes + // will still result in a byte-aligned frame buffer (divisible by 8). + assert!(frame.data.len() as u32 == frame.width * frame.height * 12 / 8); + } + FfmpegEvent::Log(LogLevel::Error | LogLevel::Fatal, _) => { + panic!("Error or fatal log message received"); + } + _ => {} + } + } - // Thread 1: Waits for 1000ms and sends a message - let timeout_sender = sender.clone(); - thread::spawn(move || { - thread::sleep(Duration::from_millis(timeout)); - timeout_sender.send("timeout").ok(); - }); + assert!(frames_received == 10 * 25); // 10 seconds at 25 fps - // Thread 2: Consumes the FFmpeg events and sends a message - let mut ffmpeg_child = command.spawn()?; - let iter = ffmpeg_child.iter()?; - thread::spawn(move || { - iter.for_each(|_| {}); - // Note: `.wait()` would not work here, because it closes `stdin` automatically - sender.send("ffmpeg").ok(); - }); + Ok(()) +} - // Race the two threads - let finished_first = receiver.recv()?; - ffmpeg_child.kill()?; - match finished_first { - "timeout" => anyhow::bail!("Timeout thread expired before FFmpeg"), - "ffmpeg" => Ok(()), - _ => anyhow::bail!("Unknown message received"), +/// Make sure that the iterator doesn't hang forever if there's an invalid +/// framebuffer size; instead, it should fall back to chunked mode. +/// See +#[test] +fn test_yuv420_invalid_size() -> anyhow::Result<()> { + let iter = FfmpegCommand::new() + .hide_banner() + .format("lavfi") + .input("testsrc=duration=10:size=321x241") + .format("rawvideo") + .pix_fmt("yuv420p") + .pipe_stdout() + .spawn()? + .iter()?; + + let mut chunks_received = 0; + + for event in iter { + match event { + FfmpegEvent::OutputFrame(_) => { + panic!("Should not use OutputFrame for non-byte-aligned sizes"); + } + FfmpegEvent::OutputChunk(_) => { + chunks_received += 1; + } + FfmpegEvent::Log(LogLevel::Error | LogLevel::Fatal, _) => { + panic!("Error or fatal log message received"); + } + _ => {} + } } + + assert!(chunks_received > 0); + + Ok(()) +} + +/// Multiple `rawvideo` outputs can be interleaved on stdout. +#[test] +fn test_stdout_interleaved_frames() -> anyhow::Result<()> { + let iter = FfmpegCommand::new() + .testsrc() + .rawvideo() + .testsrc() + .rawvideo() + .spawn()? + .iter()? + .filter_frames(); + + let mut output_1_frames = 0; + let mut output_2_frames = 0; + + for frame in iter { + match frame.output_index { + 0 => output_1_frames += 1, + 1 => output_2_frames += 1, + _ => panic!("Unexpected stream index"), + } + } + + assert!(output_1_frames == 10 * 25); // 10 sec at 25fps + assert!(output_2_frames == 10 * 25); // 10 sec at 25fps + + Ok(()) +} + +/// Multiple interleaved outputs can't be supported with non-uniform framerate. +#[test] +fn test_stdout_interleaved_frames_fallback() -> anyhow::Result<()> { + let iter = FfmpegCommand::new() + .testsrc() + .rate(25.0) + .rawvideo() + .testsrc() + .rate(30.0) + .rawvideo() + .spawn()? + .iter()?; + + let mut output_chunks = 0; + for event in iter { + match event { + FfmpegEvent::OutputFrame(_) => { + panic!("Should not use OutputFrame for interleaved streams"); + } + FfmpegEvent::OutputChunk(_) => { + output_chunks += 1; + } + _ => {} + } + } + assert!(output_chunks > 0); + + Ok(()) }