Skip to content

Commit

Permalink
alright
Browse files Browse the repository at this point in the history
  • Loading branch information
JonnyBurger committed Feb 12, 2025
1 parent 156e5e7 commit fd011ce
Show file tree
Hide file tree
Showing 11 changed files with 108 additions and 76 deletions.
Binary file modified packages/compositor-darwin-arm64/remotion
Binary file not shown.
Binary file modified packages/compositor-darwin-x64/remotion
Binary file not shown.
Binary file modified packages/compositor-linux-arm64-gnu/remotion
Binary file not shown.
Binary file modified packages/compositor-linux-arm64-musl/remotion
Binary file not shown.
Binary file modified packages/compositor-linux-x64-gnu/remotion
Binary file not shown.
Binary file modified packages/compositor-linux-x64-musl/remotion
Binary file not shown.
Binary file modified packages/compositor-win32-x64-msvc/remotion.exe
Binary file not shown.
13 changes: 6 additions & 7 deletions packages/compositor/rust/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use commands::execute_command_and_print;
use errors::ErrorWithBacktrace;
use global_printer::{_print_verbose, print_error, set_verbose_logging};
use memory::get_ideal_maximum_frame_cache_size;
use select_right_thread::THREAD_MAP;
use std::sync::mpsc::{self, Sender};
use std::sync::Mutex;
use std::{env, thread::JoinHandle};
Expand Down Expand Up @@ -83,7 +84,6 @@ pub struct LongRunningProcess {
receive_video_stats_in_main_thread_handles: Vec<mpsc::Receiver<OpenVideoStats>>,
receive_close_video_in_main_thread_handles: Vec<mpsc::Receiver<()>>,
receive_free_in_main_thread_handles: Vec<mpsc::Receiver<()>>,
thread_map: select_right_thread::ThreadMap,
finish_thread_handles: Mutex<Vec<JoinHandle<()>>>,
}

Expand All @@ -94,7 +94,7 @@ impl LongRunningProcess {
vec![];
let receive_close_video_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];
let receive_free_in_main_thread_handles: Vec<mpsc::Receiver<()>> = vec![];
let thread_map = select_right_thread::ThreadMap::new(threads);
THREAD_MAP.lock().unwrap().set_max_thread_count(threads);
let finish_thread_handles = Mutex::new(vec![]);

let map = LongRunningProcess {
Expand All @@ -103,7 +103,6 @@ impl LongRunningProcess {
receive_video_stats_in_main_thread_handles,
receive_close_video_in_main_thread_handles,
receive_free_in_main_thread_handles,
thread_map,
finish_thread_handles,
};
map
Expand Down Expand Up @@ -192,17 +191,17 @@ impl LongRunningProcess {
) -> Result<(), ErrorWithBacktrace> {
let _result: Result<(), ErrorWithBacktrace> = match opts.payload {
CliInputCommandPayload::ExtractFrame(command) => {
let thread_id = self.thread_map.select_right_thread(
let thread_to_use = THREAD_MAP.lock().unwrap().select_right_thread(
&command.src,
command.time,
command.transparent,
)?;
if thread_id == self.send_to_thread_handles.len() {
self.start_new_thread(thread_id)?;
if thread_to_use.thread_id == self.send_to_thread_handles.len() {
self.start_new_thread(thread_to_use.thread_id)?;
}

let input_to_send = parse_cli(&input)?;
self.send_to_thread_handles[thread_id].send(input_to_send)?;
self.send_to_thread_handles[thread_to_use.thread_id].send(input_to_send)?;
Ok(())
}
CliInputCommandPayload::GetOpenVideoStats(_) => {
Expand Down
13 changes: 13 additions & 0 deletions packages/compositor/rust/opened_video_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use crate::{
logger::log_callback,
opened_stream::{self, calc_position, OpenedStream},
opened_video::is_frame_cache_empty,
select_right_thread::{OpenStream, THREAD_MAP},
};

pub struct OpenedVideoManager {
Expand Down Expand Up @@ -140,6 +141,18 @@ impl OpenedVideoManager {
frame_cache_manager,
thread_index,
)?;
THREAD_MAP.lock().unwrap().update_stream(
thread_index,
stream_index,
OpenStream {
src: video.src.clone(),
last_time: video.last_position.unwrap() as f64
/ (video.time_base.1 as f64 / video.time_base.0 as f64),
id: stream_index,
transparent: video.transparent,
},
);

Ok(frame)
}

Expand Down
157 changes: 88 additions & 69 deletions packages/compositor/rust/select_right_thread.rs
Original file line number Diff line number Diff line change
@@ -1,103 +1,122 @@
use std::sync::Mutex;

use crate::{errors::ErrorWithBacktrace, global_printer::_print_verbose};
use lazy_static::lazy_static;

struct OpenStream {
src: String,
last_time: f64,
transparent: bool,
pub struct OpenStream {
pub src: String,
pub last_time: f64,
pub transparent: bool,
pub id: usize,
}

struct ThreadStreams {
streams: Vec<OpenStream>,
}

pub struct ThreadMap {
map: Vec<Option<ThreadStreams>>,
threads: usize,
map: Vec<ThreadStreams>,
max_threads: Option<usize>,
}

#[derive(PartialEq, Debug, Clone)]
pub struct UseThisThread {
pub thread_id: usize,
pub stream_id: Option<usize>,
}

impl ThreadMap {
pub fn new(t: usize) -> Self {
let mut map = ThreadMap {
pub fn new() -> Self {
let map = ThreadMap {
map: Vec::new(),
threads: t,
max_threads: None,
};
for _ in 0..t {
map.map.push(None);
}
map
}

pub fn set_max_thread_count(&mut self, max_threads: usize) {
self.max_threads = Some(max_threads);
}

pub fn update_stream(
&mut self,
thread_index: usize,
stream_index: usize,
stream_to_set: OpenStream,
) {
// Ensure we keep track of enough threads
while self.map.len() <= thread_index {
self.map.push(ThreadStreams {
streams: Vec::new(),
});
}

// Update stream if it exists
for stream in self.map[thread_index].streams.iter_mut() {
if stream.id == stream_index {
self.map[thread_index].streams[stream_index] = stream_to_set;
return;
}
}

// Otherwise add it
self.map[thread_index].streams.push(stream_to_set);
}

pub fn select_right_thread(
&mut self,
src: &String,
time: f64,
transparent: bool,
) -> Result<usize, ErrorWithBacktrace> {
) -> Result<UseThisThread, ErrorWithBacktrace> {
// Map to an existing stream
for thread_id in 0..self.map.len() {
let thread_streams = self.map[thread_id].as_mut();
if thread_streams.is_none() {
continue;
}
for stream in &mut thread_streams.unwrap().streams {
if &stream.src == src && stream.transparent && transparent {
if (stream.last_time - time).abs() < 5.0 {
stream.last_time = time;
_print_verbose(&format!(
"Reusing thread {} for stream {} at time {}",
thread_id, src, time
))?;
return Ok(thread_id);
}
for stream in self.map[thread_id].streams.iter_mut() {
if &stream.src != src {
continue;
}
if stream.transparent != transparent {
continue;
}
if (stream.last_time - time).abs() >= 5.0 {
continue;
}
_print_verbose(&format!(
"Reusing thread {} for stream {} at time {}",
thread_id, src, time
))?;
return Ok(UseThisThread {
thread_id,
stream_id: Some(stream.id),
});
}
}

let mut min_streams = usize::MAX;
let mut selected_thread = None;

for thread_id in 0..self.map.len() {
let thread_streams = self.map[thread_id].as_mut();
if thread_streams.is_none() {
continue;
}
let unwrapped = thread_streams.unwrap();

if unwrapped.streams.is_empty() {
selected_thread = Some(thread_id);
break;
} else if unwrapped.streams.len() < min_streams {
min_streams = unwrapped.streams.len();
selected_thread = Some(thread_id);
}
}

let new_thread_id = match self.map.iter().position(|x| x.is_none()) {
Some(thread_id) => thread_id,
None => match selected_thread {
Some(x) => x,
None => self.map.len() % self.threads,
},
};

if self.map[new_thread_id].is_none() {
self.map[new_thread_id] = Some(ThreadStreams {
streams: Vec::new(),
// Create new thread if allowed
let max_thread = self.max_threads.unwrap();
if self.map.len() < max_thread {
return Ok(UseThisThread {
thread_id: self.map.len(),
stream_id: None,
});
}

let new_thread_streams = self.map[new_thread_id].as_mut().unwrap();
// Assign to the thread with the least amount of streams
let least_amount_of_threads = self
.map
.iter()
.enumerate()
.min_by_key(|(_, streams)| streams.streams.len())
.unwrap()
.0;

new_thread_streams.streams.push(OpenStream {
src: src.clone(),
last_time: time,
transparent,
return Ok(UseThisThread {
thread_id: least_amount_of_threads,
stream_id: None,
});

_print_verbose(&format!(
"Adding stream {} to thread {}",
src, new_thread_id
))?;

Ok(new_thread_id)
}
}

lazy_static! {
pub static ref THREAD_MAP: Mutex<ThreadMap> = Mutex::new(ThreadMap::new());
}
1 change: 1 addition & 0 deletions packages/compositor/rust/thread.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ pub fn run_on_thread(
&mut opened_video_manager,
&mut frame_cache_manager,
)?;

global_printer::synchronized_write_buf(0, &message.nonce, &res)?;
if let Some(cache_size) = current_maximum_cache_size {
ffmpeg::keep_only_latest_frames_and_close_videos(
Expand Down

0 comments on commit fd011ce

Please sign in to comment.