Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: correct (de)initialization order #22773

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,14 +71,6 @@ Worker::Worker(Environment* env, Local<Object> wrap)
isolate_ = NewIsolate(array_buffer_allocator_.get(), &loop_);
CHECK_NE(isolate_, nullptr);

thread_exit_async_.reset(new uv_async_t);
thread_exit_async_->data = this;
CHECK_EQ(uv_async_init(env->event_loop(),
thread_exit_async_.get(),
[](uv_async_t* handle) {
static_cast<Worker*>(handle->data)->OnThreadStopped();
}), 0);

{
// Enter an environment capable of executing code in the child Isolate
// (and only in it).
Expand Down Expand Up @@ -242,9 +234,6 @@ void Worker::Run() {

DisposeIsolate();

// Need to run the loop one more time to close the platform's uv_async_t
uv_run(&loop_, UV_RUN_ONCE);

{
Mutex::ScopedLock lock(mutex_);
CHECK(thread_exit_async_);
Expand All @@ -256,6 +245,13 @@ void Worker::Run() {
}

void Worker::DisposeIsolate() {
if (env_) {
CHECK_NOT_NULL(isolate_);
Locker locker(isolate_);
Isolate::Scope isolate_scope(isolate_);
env_.reset();
}

if (isolate_ == nullptr)
return;

Expand Down Expand Up @@ -332,12 +328,16 @@ Worker::~Worker() {
CHECK(stopped_);
CHECK(thread_joined_);
CHECK_EQ(child_port_, nullptr);
CheckedUvLoopClose(&loop_);

// This has most likely already happened within the worker thread -- this
// is just in case Worker creation failed early.
DisposeIsolate();

// Need to run the loop one more time to close the platform's uv_async_t
uv_run(&loop_, UV_RUN_ONCE);

CheckedUvLoopClose(&loop_);

Debug(this, "Worker %llu destroyed", thread_id_);
}

Expand All @@ -361,10 +361,19 @@ void Worker::StartThread(const FunctionCallbackInfo<Value>& args) {

w->env()->add_sub_worker_context(w);
w->stopped_ = false;
w->thread_joined_ = false;

w->thread_exit_async_.reset(new uv_async_t);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: why is this before the uv_thread_create? Maybe it's just a non intuitive name OnThreadStopped?
AFAICT uv_async_send(thread_exit_async_.get()) can only run iff the thread was created.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: why is this before the uv_thread_create?

Switching these would create a race condition – the worker thread might finish (and try to call uv_async_send() on this) before we have initialized the async

Maybe it's just a non intuitive name OnThreadStopped?

Can you expand on this? It’s an event handler that’s called when the worker thread finishes.

AFAICT uv_async_send(thread_exit_async_.get()) can only run iff the thread was created.

That is correct, yes.

w->thread_exit_async_->data = w;
CHECK_EQ(uv_async_init(w->env()->event_loop(),
w->thread_exit_async_.get(),
[](uv_async_t* handle) {
static_cast<Worker*>(handle->data)->OnThreadStopped();
}), 0);

CHECK_EQ(uv_thread_create(&w->tid_, [](void* arg) {
static_cast<Worker*>(arg)->Run();
}, static_cast<void*>(w)), 0);
w->thread_joined_ = false;
}

void Worker::StopThread(const FunctionCallbackInfo<Value>& args) {
Expand Down
15 changes: 15 additions & 0 deletions test/parallel/test-worker-invalid-workerdata.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
// Flags: --experimental-worker
'use strict';
require('../common');
const assert = require('assert');
const { Worker } = require('worker_threads');

// This tests verifies that failing to serialize workerData does not keep
// the process alive.
// Refs: https://github.com/nodejs/node/issues/22736

assert.throws(() => {
new Worker('./worker.js', {
workerData: { fn: () => {} }
});
}, /DataCloneError/);