Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

lib,src: make StatWatcher a HandleWrap #21244

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions lib/internal/fs/watchers.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,41 +11,42 @@ const {
getStatsFromBinding,
validatePath
} = require('internal/fs/utils');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
const { toNamespacedPath } = require('path');
const { validateUint32 } = require('internal/validators');
const { getPathFromURL } = require('internal/url');
const util = require('util');
const assert = require('assert');

const kOldStatus = Symbol('kOldStatus');
const kUseBigint = Symbol('kUseBigint');
const kOwner = Symbol('kOwner');
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not a big deal, but don't we usually use a normal owner property on the handles elsewhere?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We use a symbol in (most/all) new code.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I’d like to transition to using a single kOwner symbol for all native handles

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea (and probably what would have been done if symbols existed back in the day).


function emitStop(self) {
self.emit('stop');
}

function StatWatcher(bigint) {
EventEmitter.call(this);

this._handle = new _StatWatcher(bigint);

// uv_fs_poll is a little more powerful than ev_stat but we curb it for
// the sake of backwards compatibility
let oldStatus = -1;

this._handle.onchange = (newStatus, stats) => {
if (oldStatus === -1 &&
newStatus === -1 &&
stats[2/* new nlink */] === stats[16/* old nlink */]) return;

oldStatus = newStatus;
this.emit('change', getStatsFromBinding(stats),
getStatsFromBinding(stats, kFsStatsFieldsLength));
};

this._handle.onstop = () => {
process.nextTick(emitStop, this);
};
this._handle = null;
this[kOldStatus] = -1;
this[kUseBigint] = bigint;
}
util.inherits(StatWatcher, EventEmitter);

function onchange(newStatus, stats) {
const self = this[kOwner];
if (self[kOldStatus] === -1 &&
newStatus === -1 &&
stats[2/* new nlink */] === stats[16/* old nlink */]) {
return;
}

self[kOldStatus] = newStatus;
self.emit('change', getStatsFromBinding(stats),
getStatsFromBinding(stats, kFsStatsFieldsLength));
}

// FIXME(joyeecheung): this method is not documented.
// At the moment if filename is undefined, we
Expand All @@ -54,16 +55,23 @@ util.inherits(StatWatcher, EventEmitter);
// on a valid filename and the wrap has been initialized
// This method is a noop if the watcher has already been started.
StatWatcher.prototype.start = function(filename, persistent, interval) {
assert(this._handle instanceof _StatWatcher, 'handle must be a StatWatcher');
if (this._handle.isActive) {
if (this._handle !== null)
return;
}

this._handle = new _StatWatcher(this[kUseBigint]);
this._handle[kOwner] = this;
this._handle.onchange = onchange;
if (!persistent)
this._handle.unref();

// uv_fs_poll is a little more powerful than ev_stat but we curb it for
// the sake of backwards compatibility
this[kOldStatus] = -1;

filename = getPathFromURL(filename);
validatePath(filename, 'filename');
validateUint32(interval, 'interval');
const err = this._handle.start(toNamespacedPath(filename),
persistent, interval);
const err = this._handle.start(toNamespacedPath(filename), interval);
if (err) {
const error = errors.uvException({
errno: err,
Expand All @@ -80,11 +88,15 @@ StatWatcher.prototype.start = function(filename, persistent, interval) {
// FSWatcher is .close()
// This method is a noop if the watcher has not been started.
StatWatcher.prototype.stop = function() {
assert(this._handle instanceof _StatWatcher, 'handle must be a StatWatcher');
if (!this._handle.isActive) {
if (this._handle === null)
return;
}
this._handle.stop();

defaultTriggerAsyncIdScope(this._handle.getAsyncId(),
process.nextTick,
emitStop,
this);
this._handle.close();
this._handle = null;
};


Expand Down
108 changes: 21 additions & 87 deletions src/node_stat_watcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,17 +31,12 @@
namespace node {

using v8::Context;
using v8::DontDelete;
using v8::DontEnum;
using v8::FunctionCallbackInfo;
using v8::FunctionTemplate;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Object;
using v8::PropertyAttribute;
using v8::ReadOnly;
using v8::Signature;
using v8::String;
using v8::Uint32;
using v8::Value;
Expand All @@ -58,43 +53,32 @@ void StatWatcher::Initialize(Environment* env, Local<Object> target) {

AsyncWrap::AddWrapMethods(env, t);
env->SetProtoMethod(t, "start", StatWatcher::Start);
env->SetProtoMethod(t, "stop", StatWatcher::Stop);

Local<FunctionTemplate> is_active_templ =
FunctionTemplate::New(env->isolate(),
IsActive,
env->as_external(),
Signature::New(env->isolate(), t));
t->PrototypeTemplate()->SetAccessorProperty(
FIXED_ONE_BYTE_STRING(env->isolate(), "isActive"),
is_active_templ,
Local<FunctionTemplate>(),
static_cast<PropertyAttribute>(ReadOnly | DontDelete | DontEnum));
env->SetProtoMethod(t, "close", HandleWrap::Close);
env->SetProtoMethod(t, "ref", HandleWrap::Ref);
env->SetProtoMethod(t, "unref", HandleWrap::Unref);
env->SetProtoMethod(t, "hasRef", HandleWrap::HasRef);

target->Set(statWatcherString, t->GetFunction());
}


StatWatcher::StatWatcher(Environment* env, Local<Object> wrap, bool use_bigint)
: AsyncWrap(env, wrap, AsyncWrap::PROVIDER_STATWATCHER),
watcher_(nullptr),
StatWatcher::StatWatcher(Environment* env,
Local<Object> wrap,
bool use_bigint)
: HandleWrap(env,
wrap,
reinterpret_cast<uv_handle_t*>(&watcher_),
AsyncWrap::PROVIDER_STATWATCHER),
use_bigint_(use_bigint) {
MakeWeak();
}


StatWatcher::~StatWatcher() {
if (IsActive())
Stop();
CHECK_EQ(0, uv_fs_poll_init(env->event_loop(), &watcher_));
}


void StatWatcher::Callback(uv_fs_poll_t* handle,
int status,
const uv_stat_t* prev,
const uv_stat_t* curr) {
StatWatcher* wrap = static_cast<StatWatcher*>(handle->data);
CHECK_EQ(wrap->watcher_, handle);
StatWatcher* wrap = ContainerOf(&StatWatcher::watcher_, handle);
Environment* env = wrap->env();
HandleScope handle_scope(env->isolate());
Context::Scope context_scope(env->context());
Expand All @@ -118,78 +102,28 @@ void StatWatcher::New(const FunctionCallbackInfo<Value>& args) {
new StatWatcher(env, args.This(), args[0]->IsTrue());
}

bool StatWatcher::IsActive() {
return watcher_ != nullptr;
}

void StatWatcher::IsActive(const v8::FunctionCallbackInfo<v8::Value>& args) {
StatWatcher* wrap = Unwrap<StatWatcher>(args.This());
CHECK_NOT_NULL(wrap);
args.GetReturnValue().Set(wrap->IsActive());
}

// wrap.start(filename, persistent, interval)
// wrap.start(filename, interval)
void StatWatcher::Start(const FunctionCallbackInfo<Value>& args) {
CHECK_EQ(args.Length(), 3);
CHECK_EQ(args.Length(), 2);

StatWatcher* wrap = Unwrap<StatWatcher>(args.Holder());
CHECK_NOT_NULL(wrap);
if (wrap->IsActive()) {
return;
}
StatWatcher* wrap;
ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder());
CHECK(!uv_is_active(wrap->GetHandle()));

const int argc = args.Length();
CHECK_GE(argc, 3);

node::Utf8Value path(args.GetIsolate(), args[0]);
CHECK_NOT_NULL(*path);

bool persistent = true;
if (args[1]->IsFalse()) {
persistent = false;
}

CHECK(args[2]->IsUint32());
const uint32_t interval = args[2].As<Uint32>()->Value();

wrap->watcher_ = new uv_fs_poll_t();
CHECK_EQ(0, uv_fs_poll_init(wrap->env()->event_loop(), wrap->watcher_));
wrap->watcher_->data = static_cast<void*>(wrap);
// Safe, uv_ref/uv_unref are idempotent.
if (persistent)
uv_ref(reinterpret_cast<uv_handle_t*>(wrap->watcher_));
else
uv_unref(reinterpret_cast<uv_handle_t*>(wrap->watcher_));
CHECK(args[1]->IsUint32());
const uint32_t interval = args[1].As<Uint32>()->Value();

// Note that uv_fs_poll_start does not return ENOENT, we are handling
// mostly memory errors here.
const int err = uv_fs_poll_start(wrap->watcher_, Callback, *path, interval);
const int err = uv_fs_poll_start(&wrap->watcher_, Callback, *path, interval);
if (err != 0) {
args.GetReturnValue().Set(err);
}
wrap->ClearWeak();
}


void StatWatcher::Stop(const FunctionCallbackInfo<Value>& args) {
StatWatcher* wrap = Unwrap<StatWatcher>(args.Holder());
CHECK_NOT_NULL(wrap);
if (!wrap->IsActive()) {
return;
}

Environment* env = wrap->env();
Context::Scope context_scope(env->context());
wrap->MakeCallback(env->onstop_string(), 0, nullptr);
wrap->Stop();
}


void StatWatcher::Stop() {
env()->CloseHandle(watcher_, [](uv_fs_poll_t* handle) { delete handle; });
watcher_ = nullptr;
MakeWeak();
}


} // namespace node
16 changes: 6 additions & 10 deletions src/node_stat_watcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,26 +25,24 @@
#if defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS

#include "node.h"
#include "async_wrap.h"
#include "handle_wrap.h"
#include "env.h"
#include "uv.h"
#include "v8.h"

namespace node {

class StatWatcher : public AsyncWrap {
class StatWatcher : public HandleWrap {
public:
~StatWatcher() override;

static void Initialize(Environment* env, v8::Local<v8::Object> target);

protected:
StatWatcher(Environment* env, v8::Local<v8::Object> wrap, bool use_bigint);
StatWatcher(Environment* env,
v8::Local<v8::Object> wrap,
bool use_bigint);

static void New(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Start(const v8::FunctionCallbackInfo<v8::Value>& args);
static void Stop(const v8::FunctionCallbackInfo<v8::Value>& args);
static void IsActive(const v8::FunctionCallbackInfo<v8::Value>& args);

size_t self_size() const override { return sizeof(*this); }

Expand All @@ -53,10 +51,8 @@ class StatWatcher : public AsyncWrap {
int status,
const uv_stat_t* prev,
const uv_stat_t* curr);
void Stop();
bool IsActive();

uv_fs_poll_t* watcher_;
uv_fs_poll_t watcher_;
const bool use_bigint_;
};

Expand Down
46 changes: 31 additions & 15 deletions test/async-hooks/test-statwatcher.js
Original file line number Diff line number Diff line change
@@ -1,21 +1,29 @@
'use strict';

const common = require('../common');
const commonPath = require.resolve('../common');
const tmpdir = require('../common/tmpdir');
const assert = require('assert');
const initHooks = require('./init-hooks');
const { checkInvocations } = require('./hook-checks');
const fs = require('fs');
const path = require('path');

if (!common.isMainThread)
common.skip('Worker bootstrapping works differently -> different async IDs');

tmpdir.refresh();

const file1 = path.join(tmpdir.path, 'file1');
const file2 = path.join(tmpdir.path, 'file2');
fs.writeFileSync(file1, 'foo');
fs.writeFileSync(file2, 'bar');

const hooks = initHooks();
hooks.enable();

function onchange() {}
// install first file watcher
fs.watchFile(__filename, onchange);
const w1 = fs.watchFile(file1, { interval: 10 }, onchange);

let as = hooks.activitiesOfTypes('STATWATCHER');
assert.strictEqual(as.length, 1);
Expand All @@ -28,7 +36,7 @@ checkInvocations(statwatcher1, { init: 1 },
'watcher1: when started to watch file');

// install second file watcher
fs.watchFile(commonPath, onchange);
const w2 = fs.watchFile(file2, { interval: 10 }, onchange);
as = hooks.activitiesOfTypes('STATWATCHER');
assert.strictEqual(as.length, 2);

Expand All @@ -41,19 +49,27 @@ checkInvocations(statwatcher1, { init: 1 },
checkInvocations(statwatcher2, { init: 1 },
'watcher2: when started to watch second file');

// remove first file watcher
fs.unwatchFile(__filename);
checkInvocations(statwatcher1, { init: 1, before: 1, after: 1 },
'watcher1: when unwatched first file');
checkInvocations(statwatcher2, { init: 1 },
'watcher2: when unwatched first file');
setTimeout(() => fs.writeFileSync(file1, 'foo++'), 10);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be setImmediate or is that unreliable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we can't use that, shouldn't we be using the platform timeout modifier?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So… the reason for this is that the stat watcher itself internally itself uses file system requests + timeouts. In order to witness a change, the write needs to hit a spot after the initial internal stat request has completed, but that’s timing that we can’t really track.

I’ve updated to use common.platformTimeout(100).

w1.once('change', common.mustCall(() => {
setImmediate(() => {
checkInvocations(statwatcher1, { init: 1, before: 1, after: 1 },
'watcher1: when unwatched first file');
checkInvocations(statwatcher2, { init: 1 },
'watcher2: when unwatched first file');

// remove second file watcher
fs.unwatchFile(commonPath);
checkInvocations(statwatcher1, { init: 1, before: 1, after: 1 },
'watcher1: when unwatched second file');
checkInvocations(statwatcher2, { init: 1, before: 1, after: 1 },
'watcher2: when unwatched second file');
setTimeout(() => fs.writeFileSync(file2, 'bar++'), 10);
w2.once('change', common.mustCall(() => {
setImmediate(() => {
checkInvocations(statwatcher1, { init: 1, before: 1, after: 1 },
'watcher1: when unwatched second file');
checkInvocations(statwatcher2, { init: 1, before: 1, after: 1 },
'watcher2: when unwatched second file');
fs.unwatchFile(file1);
fs.unwatchFile(file2);
});
}));
});
}));

process.on('exit', onexit);

Expand Down
Loading