Skip to content

Commit

Permalink
Merge pull request #1237 from trapexit/threadfix
Browse files Browse the repository at this point in the history
Initialize readdir threadpool after daemonizing
  • Loading branch information
trapexit authored Aug 29, 2023
2 parents 82781b6 + 0c555e7 commit a927a15
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 22 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ UGID_USE_RWLOCK = 0
ifeq ($(DEBUG),1)
OPT_FLAGS := -O0 -g -fsanitize=undefined
else
OPT_FLAGS := -O2
OPT_FLAGS := -O2 -DNDEBUG
endif

ifeq ($(STATIC),1)
Expand Down
15 changes: 10 additions & 5 deletions man/mergerfs.1
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,9 @@ to enable page caching for when \f[C]cache.files=per-process\f[R].
.IP \[bu] 2
\f[B]parallel-direct-writes=BOOL\f[R]: Allow the kernel to dispatch
multiple, parallel (non-extending) write requests for files opened with
\f[C]direct_io=true\f[R] (if supported by the kernel)
\f[C]cache.files=per-process\f[R] (if the process is not in
\f[C]process-names\f[R]) or \f[C]cache.files=off\f[R].
(This requires kernel support, and was added in v6.2)
.IP \[bu] 2
\f[B]direct_io\f[R]: deprecated - Bypass page cache.
Use \f[C]cache.files=off\f[R] instead.
Expand Down Expand Up @@ -859,8 +861,8 @@ calculations.
The reason is that it doesn\[cq]t really work for non-path preserving
policies and can lead to non-obvious behaviors.
.PP
NOTE: While any policy can be assigned to a function or category though
some may not be very useful in practice.
NOTE: While any policy can be assigned to a function or category, some
may not be very useful in practice.
For instance: \f[B]rand\f[R] (random) may be useful for file creation
(create) but could lead to very odd behavior if used for \f[C]chmod\f[R]
if there were more than one copy of the file.
Expand Down Expand Up @@ -1753,9 +1755,12 @@ size are unchanged since previous open.
cache.files=libfuse: follow traditional libfuse \f[C]direct_io\f[R],
\f[C]kernel_cache\f[R], and \f[C]auto_cache\f[R] arguments.
.IP \[bu] 2
cache.files=per-process: Enable page caching only for processes which
`comm' name matches one of the values defined in
cache.files=per-process: Enable page caching (equivalent to
\f[C]cache.files=partial\f[R]) only for processes whose `comm' name
matches one of the values defined in
\f[C]cache.files.process-names\f[R].
If the name does not match the file open is equivalent to
\f[C]cache.files=off\f[R].
.PP
FUSE, which mergerfs uses, offers a number of page caching modes.
mergerfs tries to simplify their use via the \f[C]cache.files\f[R]
Expand Down
2 changes: 2 additions & 0 deletions src/fuse_init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,8 @@ namespace FUSE

ugid::init();

cfg->readdir.initialize();

l::want_if_capable(conn_,FUSE_CAP_ASYNC_DIO);
l::want_if_capable(conn_,FUSE_CAP_ASYNC_READ,&cfg->async_read);
l::want_if_capable(conn_,FUSE_CAP_ATOMIC_O_TRUNC);
Expand Down
50 changes: 39 additions & 11 deletions src/fuse_readdir.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,15 @@
*/

#include "fuse_readdir.hpp"
#include "fuse_readdir_factory.hpp"

#include "config.hpp"
#include "fuse_readdir_factory.hpp"

/*
The _initialized stuff is not pretty but easiest way to deal with
the fact that mergerfs is doing arg parsing and setting up of things
(including thread pools) before the daemonizing
*/

int
FUSE::readdir(const fuse_file_info_t *ffi_,
Expand All @@ -32,9 +37,11 @@ FUSE::readdir(const fuse_file_info_t *ffi_,
}

FUSE::ReadDir::ReadDir(std::string const s_)
: _initialized(false)
{
from_string(s_);
assert(_readdir);
if(_initialized)
assert(_readdir);
}

std::string
Expand All @@ -45,25 +52,46 @@ FUSE::ReadDir::to_string() const
return _type;
}

void
FUSE::ReadDir::initialize()
{
_initialized = true;
from_string(_type);
}

int
FUSE::ReadDir::from_string(std::string const &str_)
{
std::shared_ptr<FUSE::ReadDirBase> tmp;
if(_initialized)
{
std::shared_ptr<FUSE::ReadDirBase> tmp;

tmp = FUSE::ReadDirFactory::make(str_);
if(!tmp)
return -EINVAL;
tmp = FUSE::ReadDirFactory::make(str_);
if(!tmp)
return -EINVAL;

{
std::lock_guard<std::mutex> lg(_mutex);
{
std::lock_guard<std::mutex> lg(_mutex);

_type = str_;
_readdir = tmp;
}
_type = str_;
std::swap(_readdir,tmp);
}
}
else
{
std::lock_guard<std::mutex> lg(_mutex);

_type = str_;
}

return 0;
}

/*
Yeah... if not initialized it will crash... ¯\_(ツ)_/¯
This will be resolved once initialization of internal objects and
handling of input is better seperated.
*/
int
FUSE::ReadDir::operator()(fuse_file_info_t const *ffi_,
fuse_dirents_t *buf_)
Expand Down
9 changes: 8 additions & 1 deletion src/fuse_readdir.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@

#include <assert.h>


// The initialization behavior is not pretty but for the moment
// needed due to the daemonizing function of the libfuse library when
// not using foreground mode. The threads need to be created after the
// fork, not before.
namespace FUSE
{
int readdir(fuse_file_info_t const *ffi,
Expand All @@ -44,10 +47,14 @@ namespace FUSE
int operator()(fuse_file_info_t const *ffi,
fuse_dirents_t *buf);

public:
void initialize();

private:
mutable std::mutex _mutex;

private:
bool _initialized;
std::string _type;
std::shared_ptr<FUSE::ReadDirBase> _readdir;
};
Expand Down
20 changes: 16 additions & 4 deletions src/unbounded_thread_pool.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include "unbounded_queue.hpp"

#include "syslog.hpp"

#include <atomic>
#include <csignal>
#include <functional>
Expand All @@ -21,8 +23,13 @@ class ThreadPool
ThreadPool(std::size_t const thread_count_ = std::thread::hardware_concurrency(),
std::string const name_ = {})
: _queues(thread_count_),
_count(thread_count_)
_count(thread_count_),
_name(name_)
{
syslog_info("threadpool: spawning %zu threads named '%s'",
_count,
_name.c_str());

auto worker = [this](std::size_t i)
{
while(true)
Expand Down Expand Up @@ -51,17 +58,21 @@ class ThreadPool
_threads.reserve(thread_count_);
for(std::size_t i = 0; i < thread_count_; ++i)
_threads.emplace_back(worker, i);
if(!name_.empty())
if(!_name.empty())
{
for(auto &t : _threads)
pthread_setname_np(t.native_handle(),name_.c_str());
pthread_setname_np(t.native_handle(),_name.c_str());
}

pthread_sigmask(SIG_SETMASK,&oldset,NULL);
}

~ThreadPool()
{
syslog_info("threadpool: destroying %zu threads named '%s'",
_count,
_name.c_str());

for(auto& queue : _queues)
queue.unblock();
for(auto& thread : _threads)
Expand Down Expand Up @@ -133,8 +144,9 @@ class ThreadPool
std::vector<std::thread> _threads;

private:
const std::size_t _count;
std::size_t const _count;
std::atomic_uint _index;
std::string const _name;

static const unsigned int K = 2;
};

0 comments on commit a927a15

Please sign in to comment.