Skip to content

Commit

Permalink
[Breaking] Remove ResumableFileSlot and rely on high ulimits (#1582)
Browse files Browse the repository at this point in the history
ResumableFileSlot became to difficult to manage, instead of
managing resources this way, we use much higher
DEFAULT_OPEN_FILE_PERMITS, set ulimit (when unix) and warn
user if the limits are likely too low.

Breaking: Removed idle_file_descriptor_timeout_millis from config.

closes: #1288, #513, #1298, #527
  • Loading branch information
allada authored Feb 12, 2025
1 parent 7afe286 commit 8b89c31
Show file tree
Hide file tree
Showing 21 changed files with 555 additions and 961 deletions.
12 changes: 11 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 0 additions & 15 deletions nativelink-config/src/cas_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -716,21 +716,6 @@ pub struct GlobalConfig {
#[serde(deserialize_with = "convert_numeric_with_shellexpand")]
pub max_open_files: usize,

/// If a file descriptor is idle for this many milliseconds, it will be closed.
/// In the event a client or store takes a long time to send or receive data
/// the file descriptor will be closed, and since `max_open_files` blocks new
/// `open_file` requests until a slot opens up, it will allow new requests to be
/// processed. If a read or write is attempted on a closed file descriptor, the
/// file will be reopened and the operation will continue.
///
/// On services where worker(s) and scheduler(s) live in the same process, this
/// also prevents deadlocks if a file->file copy is happening, but cannot open
/// a new file descriptor because the limit has been reached.
///
/// Default: 1000 (1 second)
#[serde(default, deserialize_with = "convert_duration_with_shellexpand")]
pub idle_file_descriptor_timeout_millis: u64,

/// This flag can be used to prevent metrics from being collected at runtime.
/// Metrics are still able to be collected, but this flag prevents metrics that
/// are collected at runtime (performance metrics) from being tallied. The
Expand Down
4 changes: 1 addition & 3 deletions nativelink-macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,10 @@ pub fn nativelink_test(attr: TokenStream, item: TokenStream) -> TokenStream {
async fn #fn_name(#fn_inputs) #fn_output {
// Error means already initialized, which is ok.
let _ = nativelink_util::init_tracing();
// If already set it's ok.
let _ = nativelink_util::fs::set_idle_file_descriptor_timeout(std::time::Duration::from_millis(100));

#[warn(clippy::disallowed_methods)]
::std::sync::Arc::new(::nativelink_util::origin_context::OriginContext::new()).wrap_async(
::nativelink_util::__tracing::trace_span!("test"), async move {
::nativelink_util::__tracing::error_span!(stringify!(#fn_name)), async move {
#fn_block
}
)
Expand Down
3 changes: 0 additions & 3 deletions nativelink-store/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,6 @@ aws-sdk-s3 = { version = "=1.68.0", features = [
"rt-tokio",
], default-features = false }
aws-smithy-runtime-api = "1.7.3"
serial_test = { version = "3.2.0", features = [
"async",
], default-features = false }
serde_json = "1.0.135"
fred = { version = "10.0.3", default-features = false, features = ["mocks"] }
tracing-subscriber = { version = "0.3.19", default-features = false }
10 changes: 6 additions & 4 deletions nativelink-store/src/fast_slow_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use std::borrow::BorrowMut;
use std::cmp::{max, min};
use std::ffi::OsString;
use std::ops::Range;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
Expand Down Expand Up @@ -224,9 +225,10 @@ impl StoreDriver for FastSlowStore {
async fn update_with_whole_file(
self: Pin<&Self>,
key: StoreKey<'_>,
mut file: fs::ResumeableFileSlot,
path: OsString,
mut file: fs::FileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
) -> Result<Option<fs::FileSlot>, Error> {
if self
.fast_store
.optimized_for(StoreOptimizations::FileUpdates)
Expand All @@ -246,7 +248,7 @@ impl StoreDriver for FastSlowStore {
}
return self
.fast_store
.update_with_whole_file(key, file, upload_size)
.update_with_whole_file(key, path, file, upload_size)
.await;
}

Expand All @@ -269,7 +271,7 @@ impl StoreDriver for FastSlowStore {
}
return self
.slow_store
.update_with_whole_file(key, file, upload_size)
.update_with_whole_file(key, path, file, upload_size)
.await;
}

Expand Down
132 changes: 34 additions & 98 deletions nativelink-store/src/filesystem_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use std::fmt::{Debug, Formatter};
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, Weak};
use std::time::{Duration, SystemTime};
use std::time::SystemTime;

use async_lock::RwLock;
use async_trait::async_trait;
Expand All @@ -39,8 +39,7 @@ use nativelink_util::store_trait::{
StoreDriver, StoreKey, StoreKeyBorrow, StoreOptimizations, UploadSizeInfo,
};
use nativelink_util::{background_spawn, spawn_blocking};
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, SeekFrom};
use tokio::time::{sleep, timeout, Sleep};
use tokio::io::{AsyncReadExt, AsyncWriteExt, Take};
use tokio_stream::wrappers::ReadDirStream;
use tracing::{event, Level};

Expand Down Expand Up @@ -168,7 +167,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> impl Future<Output = Result<(Self, fs::ResumeableFileSlot, OsString), Error>> + Send
) -> impl Future<Output = Result<(Self, fs::FileSlot, OsString), Error>> + Send
where
Self: Sized;

Expand All @@ -186,7 +185,7 @@ pub trait FileEntry: LenEntry + Send + Sync + Debug + 'static {
&self,
offset: u64,
length: u64,
) -> impl Future<Output = Result<fs::ResumeableFileSlot, Error>> + Send;
) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send;

/// This function is a safe way to extract the file name of the underlying file. To protect users from
/// accidentally creating undefined behavior we encourage users to do the logic they need to do with
Expand Down Expand Up @@ -231,7 +230,7 @@ impl FileEntry for FileEntryImpl {
async fn make_and_open_file(
block_size: u64,
encoded_file_path: EncodedFilePath,
) -> Result<(FileEntryImpl, fs::ResumeableFileSlot, OsString), Error> {
) -> Result<(FileEntryImpl, fs::FileSlot, OsString), Error> {
let temp_full_path = encoded_file_path.get_file_path().to_os_string();
let temp_file_result = fs::create_file(temp_full_path.clone())
.or_else(|mut err| async {
Expand Down Expand Up @@ -276,30 +275,19 @@ impl FileEntry for FileEntryImpl {
&self.encoded_file_path
}

async fn read_file_part(
fn read_file_part(
&self,
offset: u64,
length: u64,
) -> Result<fs::ResumeableFileSlot, Error> {
let (mut file, full_content_path_for_debug_only) = self
.get_file_path_locked(|full_content_path| async move {
let file = fs::open_file(full_content_path.clone(), length)
.await
.err_tip(|| {
format!("Failed to open file in filesystem store {full_content_path:?}")
})?;
Ok((file, full_content_path))
})
.await?;

file.as_reader()
.await
.err_tip(|| "Could not seek file in read_file_part()")?
.get_mut()
.seek(SeekFrom::Start(offset))
.await
.err_tip(|| format!("Failed to seek file: {full_content_path_for_debug_only:?}"))?;
Ok(file)
) -> impl Future<Output = Result<Take<fs::FileSlot>, Error>> + Send {
self.get_file_path_locked(move |full_content_path| async move {
let file = fs::open_file(&full_content_path, offset, length)
.await
.err_tip(|| {
format!("Failed to open file in filesystem store {full_content_path:?}")
})?;
Ok(file)
})
}

async fn get_file_path_locked<
Expand Down Expand Up @@ -524,6 +512,7 @@ async fn add_files_to_cache<Fe: FileEntry>(
);
}
};

Result::<(String, SystemTime, u64, bool), Error>::Ok((
file_name,
atime,
Expand Down Expand Up @@ -668,19 +657,16 @@ pub struct FilesystemStore<Fe: FileEntry = FileEntryImpl> {
#[metric(help = "Size of the configured read buffer size")]
read_buffer_size: usize,
weak_self: Weak<Self>,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
}

impl<Fe: FileEntry> FilesystemStore<Fe> {
pub async fn new(spec: &FilesystemSpec) -> Result<Arc<Self>, Error> {
Self::new_with_timeout_and_rename_fn(spec, sleep, |from, to| std::fs::rename(from, to))
.await
Self::new_with_timeout_and_rename_fn(spec, |from, to| std::fs::rename(from, to)).await
}

pub async fn new_with_timeout_and_rename_fn(
spec: &FilesystemSpec,
sleep_fn: fn(Duration) -> Sleep,
rename_fn: fn(&OsStr, &OsStr) -> Result<(), std::io::Error>,
) -> Result<Arc<Self>, Error> {
async fn create_subdirs(path: &str) -> Result<(), Error> {
Expand Down Expand Up @@ -735,7 +721,6 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
block_size,
read_buffer_size,
weak_self: weak_self.clone(),
sleep_fn,
rename_fn,
}))
}
Expand All @@ -754,50 +739,34 @@ impl<Fe: FileEntry> FilesystemStore<Fe> {
async fn update_file<'a>(
self: Pin<&'a Self>,
mut entry: Fe,
mut resumeable_temp_file: fs::ResumeableFileSlot,
mut temp_file: fs::FileSlot,
final_key: StoreKey<'static>,
mut reader: DropCloserReadHalf,
) -> Result<(), Error> {
let mut data_size = 0;
loop {
let Ok(data_result) = timeout(fs::idle_file_descriptor_timeout(), reader.recv()).await
else {
// In the event we timeout, we want to close the writing file, to prevent
// the file descriptor left open for long periods of time.
// This is needed because we wrap `fs` so only a fixed number of file
// descriptors may be open at any given time. If we are streaming from
// File -> File, it can cause a deadlock if the Write file is not sending
// data because it is waiting for a file descriotor to open before sending data.
resumeable_temp_file.close_file().await.err_tip(|| {
"Could not close file due to timeout in FileSystemStore::update_file"
})?;
continue;
};
let mut data = data_result.err_tip(|| "Failed to receive data in filesystem store")?;
let mut data = reader
.recv()
.await
.err_tip(|| "Failed to receive data in filesystem store")?;
let data_len = data.len();
if data_len == 0 {
break; // EOF.
}
resumeable_temp_file
.as_writer()
.await
.err_tip(|| "in filesystem_store::update_file")?
temp_file
.write_all_buf(&mut data)
.await
.err_tip(|| "Failed to write data into filesystem store")?;
data_size += data_len as u64;
}

resumeable_temp_file
.as_writer()
.await
.err_tip(|| "in filesystem_store::update_file")?
temp_file
.as_ref()
.sync_all()
.await
.err_tip(|| "Failed to sync_data in filesystem store")?;

drop(resumeable_temp_file);
drop(temp_file);

*entry.data_size_mut() = data_size;
self.emplace_file(final_key, Arc::new(entry)).await
Expand Down Expand Up @@ -942,19 +911,13 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
async fn update_with_whole_file(
self: Pin<&Self>,
key: StoreKey<'_>,
mut file: fs::ResumeableFileSlot,
path: OsString,
file: fs::FileSlot,
upload_size: UploadSizeInfo,
) -> Result<Option<fs::ResumeableFileSlot>, Error> {
let path = file.get_path().as_os_str().to_os_string();
) -> Result<Option<fs::FileSlot>, Error> {
let file_size = match upload_size {
UploadSizeInfo::ExactSize(size) => size,
UploadSizeInfo::MaxSize(_) => file
.as_reader()
.await
.err_tip(|| {
format!("While getting metadata for {path:?} in update_with_whole_file")
})?
.get_ref()
.as_ref()
.metadata()
.await
Expand Down Expand Up @@ -995,7 +958,6 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
.err_tip(|| "Failed to send zero EOF in filesystem store get_part")?;
return Ok(());
}

let entry = self.evicting_map.get(&key).await.ok_or_else(|| {
make_err!(
Code::NotFound,
Expand All @@ -1004,47 +966,21 @@ impl<Fe: FileEntry> StoreDriver for FilesystemStore<Fe> {
)
})?;
let read_limit = length.unwrap_or(u64::MAX);
let mut resumeable_temp_file = entry.read_file_part(offset, read_limit).await?;
let mut temp_file = entry.read_file_part(offset, read_limit).await?;

loop {
let mut buf = BytesMut::with_capacity(self.read_buffer_size);
resumeable_temp_file
.as_reader()
.await
.err_tip(|| "In FileSystemStore::get_part()")?
temp_file
.read_buf(&mut buf)
.await
.err_tip(|| "Failed to read data in filesystem store")?;
if buf.is_empty() {
break; // EOF.
}
// In the event it takes a while to send the data to the client, we want to close the
// reading file, to prevent the file descriptor left open for long periods of time.
// Failing to do so might cause deadlocks if the receiver is unable to receive data
// because it is waiting for a file descriptor to open before receiving data.
// Using `ResumeableFileSlot` will re-open the file in the event it gets closed on the
// next iteration.
let buf_content = buf.freeze();
loop {
let sleep_fn = (self.sleep_fn)(fs::idle_file_descriptor_timeout());
tokio::pin!(sleep_fn);
tokio::select! {
() = & mut (sleep_fn) => {
resumeable_temp_file
.close_file()
.await
.err_tip(|| "Could not close file due to timeout in FileSystemStore::get_part")?;
}
res = writer.send(buf_content.clone()) => {
match res {
Ok(()) => break,
Err(err) => {
return Err(err).err_tip(|| "Failed to send chunk in filesystem store get_part");
}
}
}
}
}
writer
.send(buf.freeze())
.await
.err_tip(|| "Failed to send chunk in filesystem store get_part")?;
}
writer
.send_eof()
Expand Down
Loading

0 comments on commit 8b89c31

Please sign in to comment.