Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: add native debugging code to workers #21423

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion src/async_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -803,7 +803,8 @@ void EmitAsyncDestroy(Isolate* isolate, async_context asyncContext) {

std::string AsyncWrap::diagnostic_name() const {
return std::string(provider_names[provider_type()]) +
" (" + std::to_string(static_cast<int64_t>(async_id_)) + ")";
" (" + std::to_string(env()->thread_id()) + ":" +
std::to_string(static_cast<int64_t>(async_id_)) + ")";
}

} // namespace node
Expand Down
19 changes: 18 additions & 1 deletion src/node_messaging.cc
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
#include "debug_utils.h"
#include "node_messaging.h"
#include "node_internals.h"
#include "node_buffer.h"
Expand Down Expand Up @@ -305,8 +306,10 @@ void MessagePortData::AddToIncomingQueue(Message&& message) {
Mutex::ScopedLock lock(mutex_);
incoming_messages_.emplace_back(std::move(message));

if (owner_ != nullptr)
if (owner_ != nullptr) {
Debug(owner_, "Adding message to incoming queue");
owner_->TriggerAsync();
}
}

bool MessagePortData::IsSiblingClosed() const {
Expand Down Expand Up @@ -380,6 +383,8 @@ MessagePort::MessagePort(Environment* env,
Local<Function> init = fn.As<Function>();
USE(init->Call(context, wrap, 0, nullptr));
}

Debug(this, "Created message port");
}

void MessagePort::AddToIncomingQueue(Message&& message) {
Expand All @@ -396,6 +401,8 @@ void MessagePort::TriggerAsync() {
}

void MessagePort::Close(v8::Local<v8::Value> close_callback) {
Debug(this, "Closing message port, data set = %d", static_cast<int>(!!data_));

if (data_) {
// Wrap this call with accessing the mutex, so that TriggerAsync()
// can check IsHandleClosing() without race conditions.
Expand Down Expand Up @@ -447,6 +454,7 @@ MessagePort* MessagePort::New(
}

void MessagePort::OnMessage() {
Debug(this, "Running MessagePort::OnMessage()");
HandleScope handle_scope(env()->isolate());
Local<Context> context = object(env()->isolate())->CreationContext();

Expand All @@ -461,11 +469,15 @@ void MessagePort::OnMessage() {
Mutex::ScopedLock lock(data_->mutex_);

if (stop_event_loop_) {
Debug(this, "MessagePort stops loop as requested");
CHECK(!data_->receiving_messages_);
uv_stop(env()->event_loop());
break;
}

Debug(this, "MessagePort has message, receiving = %d",
static_cast<int>(data_->receiving_messages_));

if (!data_->receiving_messages_)
break;
if (data_->incoming_messages_.empty())
Expand All @@ -475,6 +487,7 @@ void MessagePort::OnMessage() {
}

if (!env()->can_call_into_js()) {
Debug(this, "MessagePort drains queue because !can_call_into_js()");
// In this case there is nothing to do but to drain the current queue.
continue;
}
Expand Down Expand Up @@ -508,6 +521,7 @@ bool MessagePort::IsSiblingClosed() const {
}

void MessagePort::OnClose() {
Debug(this, "MessagePort::OnClose()");
if (data_) {
data_->owner_ = nullptr;
data_->Disentangle();
Expand Down Expand Up @@ -557,13 +571,15 @@ void MessagePort::PostMessage(const FunctionCallbackInfo<Value>& args) {

void MessagePort::Start() {
Mutex::ScopedLock lock(data_->mutex_);
Debug(this, "Start receiving messages");
data_->receiving_messages_ = true;
if (!data_->incoming_messages_.empty())
TriggerAsync();
}

void MessagePort::Stop() {
Mutex::ScopedLock lock(data_->mutex_);
Debug(this, "Stop receiving messages");
data_->receiving_messages_ = false;
}

Expand All @@ -572,6 +588,7 @@ void MessagePort::StopEventLoop() {
data_->receiving_messages_ = false;
stop_event_loop_ = true;

Debug(this, "Received StopEventLoop request");
TriggerAsync();
}

Expand Down
22 changes: 22 additions & 0 deletions src/node_worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Worker::Worker(Environment* env, Local<Object> wrap)
Mutex::ScopedLock next_thread_id_lock(next_thread_id_mutex);
thread_id_ = next_thread_id++;
}

Debug(this, "Creating worker with id %llu", thread_id_);
wrap->Set(env->context(),
env->thread_id_string(),
Number::New(env->isolate(),
Expand Down Expand Up @@ -107,6 +109,8 @@ Worker::Worker(Environment* env, Local<Object> wrap)

// The new isolate won't be bothered on this thread again.
isolate_->DiscardThreadSpecificMetadata();

Debug(this, "Set up worker with id %llu", thread_id_);
}

bool Worker::is_stopped() const {
Expand All @@ -123,6 +127,7 @@ void Worker::Run() {
MultiIsolatePlatform* platform = isolate_data_->platform();
CHECK_NE(platform, nullptr);

Debug(this, "Starting worker with id %llu", thread_id_);
{
Locker locker(isolate_);
Isolate::Scope isolate_scope(isolate_);
Expand All @@ -143,6 +148,8 @@ void Worker::Run() {
// within it.
if (child_port_ != nullptr)
env_->set_message_port(child_port_->object(isolate_));

Debug(this, "Created message port for worker %llu", thread_id_);
}

if (!is_stopped()) {
Expand All @@ -152,6 +159,8 @@ void Worker::Run() {
// This loads the Node bootstrapping code.
LoadEnvironment(env_.get());
env_->async_hooks()->pop_async_id(1);

Debug(this, "Loaded environment for worker %llu", thread_id_);
}

{
Expand Down Expand Up @@ -189,6 +198,9 @@ void Worker::Run() {
Mutex::ScopedLock lock(mutex_);
if (exit_code_ == 0 && !stopped)
exit_code_ = exit_code;

Debug(this, "Exiting thread for worker %llu with exit code %d",
thread_id_, exit_code_);
}

env_->set_can_call_into_js(false);
Expand Down Expand Up @@ -237,12 +249,15 @@ void Worker::Run() {
scheduled_on_thread_stopped_ = true;
uv_async_send(thread_exit_async_.get());
}

Debug(this, "Worker %llu thread stops", thread_id_);
}

void Worker::DisposeIsolate() {
if (isolate_ == nullptr)
return;

Debug(this, "Worker %llu dispose isolate", thread_id_);
CHECK(isolate_data_);
MultiIsolatePlatform* platform = isolate_data_->platform();
platform->CancelPendingDelayedTasks(isolate_);
Expand Down Expand Up @@ -275,6 +290,8 @@ void Worker::OnThreadStopped() {
Mutex::ScopedLock lock(mutex_);
scheduled_on_thread_stopped_ = false;

Debug(this, "Worker %llu thread stopped", thread_id_);

{
Mutex::ScopedLock stopped_lock(stopped_mutex_);
CHECK(stopped_);
Expand Down Expand Up @@ -318,6 +335,8 @@ Worker::~Worker() {
// This has most likely already happened within the worker thread -- this
// is just in case Worker creation failed early.
DisposeIsolate();

Debug(this, "Worker %llu destroyed", thread_id_);
}

void Worker::New(const FunctionCallbackInfo<Value>& args) {
Expand Down Expand Up @@ -371,6 +390,9 @@ void Worker::Unref(const FunctionCallbackInfo<Value>& args) {
void Worker::Exit(int code) {
Mutex::ScopedLock lock(mutex_);
Mutex::ScopedLock stopped_lock(stopped_mutex_);

Debug(this, "Worker %llu called Exit(%d)", thread_id_, code);

if (!stopped_) {
CHECK_NE(env_, nullptr);
stopped_ = true;
Expand Down