diff --git a/src/js_stream.cc b/src/js_stream.cc index 7d4ad7a4e978a6..e8e31f41cb64ad 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -181,7 +181,7 @@ void JSStream::DoAfterWrite(const FunctionCallbackInfo& args) { ASSIGN_OR_RETURN_UNWRAP(&wrap, args.Holder()); ASSIGN_OR_RETURN_UNWRAP(&w, args[0].As()); - wrap->OnAfterWrite(w); + w->Done(0); } diff --git a/src/node_http2.cc b/src/node_http2.cc index 89d68de88f8cfe..0747789786b028 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -975,9 +975,6 @@ inline void Http2Session::SetChunksSinceLastWrite(size_t n) { WriteWrap* Http2Session::AllocateSend() { HandleScope scope(env()->isolate()); - auto AfterWrite = [](WriteWrap* req, int status) { - req->Dispose(); - }; Local obj = env()->write_wrap_constructor_function() ->NewInstance(env()->context()).ToLocalChecked(); @@ -987,7 +984,7 @@ WriteWrap* Http2Session::AllocateSend() { session(), NGHTTP2_SETTINGS_MAX_FRAME_SIZE); // Max frame size + 9 bytes for the header - return WriteWrap::New(env(), obj, stream_, AfterWrite, size + 9); + return WriteWrap::New(env(), obj, stream_, size + 9); } void Http2Session::Send(WriteWrap* req, char* buf, size_t length) { diff --git a/src/stream_base-inl.h b/src/stream_base-inl.h index eacdb3832c0662..901e07c1e96a25 100644 --- a/src/stream_base-inl.h +++ b/src/stream_base-inl.h @@ -144,15 +144,19 @@ void StreamBase::JSMethod(const FunctionCallbackInfo& args) { } +inline void ShutdownWrap::OnDone(int status) { + stream()->AfterShutdown(this, status); +} + + WriteWrap* WriteWrap::New(Environment* env, Local obj, StreamBase* wrap, - DoneCb cb, size_t extra) { size_t storage_size = ROUND_UP(sizeof(WriteWrap), kAlignSize) + extra; char* storage = new char[storage_size]; - return new(storage) WriteWrap(env, obj, wrap, cb, storage_size); + return new(storage) WriteWrap(env, obj, wrap, storage_size); } @@ -172,6 +176,10 @@ size_t WriteWrap::ExtraSize() const { return storage_size_ - ROUND_UP(sizeof(*this), kAlignSize); } +inline void WriteWrap::OnDone(int status) { + stream()->AfterWrite(this, status); +} + } // namespace node #endif // defined(NODE_WANT_INTERNALS) && NODE_WANT_INTERNALS diff --git a/src/stream_base.cc b/src/stream_base.cc index 34564cadbe777a..4d9e1dfc6b2dba 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -55,8 +55,7 @@ int StreamBase::Shutdown(const FunctionCallbackInfo& args) { AsyncHooks::DefaultTriggerAsyncIdScope(env, wrap->get_async_id()); ShutdownWrap* req_wrap = new ShutdownWrap(env, req_wrap_obj, - this, - AfterShutdown); + this); int err = DoShutdown(req_wrap); if (err) @@ -66,7 +65,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo& args) { void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { - StreamBase* wrap = req_wrap->wrap(); Environment* env = req_wrap->env(); // The wrap and request objects should still be there. @@ -78,7 +76,7 @@ void StreamBase::AfterShutdown(ShutdownWrap* req_wrap, int status) { Local req_wrap_obj = req_wrap->object(); Local argv[3] = { Integer::New(env->isolate(), status), - wrap->GetObject(), + GetObject(), req_wrap_obj }; @@ -159,8 +157,7 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { CHECK_NE(wrap, nullptr); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, - storage_size); + req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); } offset = 0; @@ -252,7 +249,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { CHECK_NE(wrap, nullptr); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite); + req_wrap = WriteWrap::New(env, req_wrap_obj, this); } err = DoWrite(req_wrap, bufs, count, nullptr); @@ -338,8 +335,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { CHECK_NE(wrap, nullptr); AsyncHooks::DefaultTriggerAsyncIdScope trigger_scope(env, wrap->get_async_id()); - req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite, - storage_size); + req_wrap = WriteWrap::New(env, req_wrap_obj, this, storage_size); } data = req_wrap->Extra(); @@ -401,7 +397,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { - StreamBase* wrap = req_wrap->wrap(); Environment* env = req_wrap->env(); HandleScope handle_scope(env->isolate()); @@ -413,19 +408,19 @@ void StreamBase::AfterWrite(WriteWrap* req_wrap, int status) { // Unref handle property Local req_wrap_obj = req_wrap->object(); req_wrap_obj->Delete(env->context(), env->handle_string()).FromJust(); - wrap->OnAfterWrite(req_wrap); + OnAfterWrite(req_wrap, status); Local argv[] = { Integer::New(env->isolate(), status), - wrap->GetObject(), + GetObject(), req_wrap_obj, Undefined(env->isolate()) }; - const char* msg = wrap->Error(); + const char* msg = Error(); if (msg != nullptr) { argv[3] = OneByteString(env->isolate(), msg); - wrap->ClearError(); + ClearError(); } if (req_wrap_obj->Has(env->context(), env->oncomplete_string()).FromJust()) diff --git a/src/stream_base.h b/src/stream_base.h index 94e4bfd73961da..926a3132925c01 100644 --- a/src/stream_base.h +++ b/src/stream_base.h @@ -16,27 +16,27 @@ namespace node { // Forward declarations class StreamBase; -template +template class StreamReq { public: - typedef void (*DoneCb)(Req* req, int status); - - explicit StreamReq(DoneCb cb) : cb_(cb) { + explicit StreamReq(StreamBase* stream) : stream_(stream) { } inline void Done(int status, const char* error_str = nullptr) { - Req* req = static_cast(this); + Base* req = static_cast(this); Environment* env = req->env(); if (error_str != nullptr) { req->object()->Set(env->error_string(), OneByteString(env->isolate(), error_str)); } - cb_(req, status); + req->OnDone(status); } + inline StreamBase* stream() const { return stream_; } + private: - DoneCb cb_; + StreamBase* const stream_; }; class ShutdownWrap : public ReqWrap, @@ -44,11 +44,9 @@ class ShutdownWrap : public ReqWrap, public: ShutdownWrap(Environment* env, v8::Local req_wrap_obj, - StreamBase* wrap, - DoneCb cb) + StreamBase* stream) : ReqWrap(env, req_wrap_obj, AsyncWrap::PROVIDER_SHUTDOWNWRAP), - StreamReq(cb), - wrap_(wrap) { + StreamReq(stream) { Wrap(req_wrap_obj, this); } @@ -60,27 +58,22 @@ class ShutdownWrap : public ReqWrap, return ContainerOf(&ShutdownWrap::req_, req); } - inline StreamBase* wrap() const { return wrap_; } size_t self_size() const override { return sizeof(*this); } - private: - StreamBase* const wrap_; + inline void OnDone(int status); // Just calls stream()->AfterShutdown() }; -class WriteWrap: public ReqWrap, - public StreamReq { +class WriteWrap : public ReqWrap, + public StreamReq { public: static inline WriteWrap* New(Environment* env, v8::Local obj, - StreamBase* wrap, - DoneCb cb, + StreamBase* stream, size_t extra = 0); inline void Dispose(); inline char* Extra(size_t offset = 0); inline size_t ExtraSize() const; - inline StreamBase* wrap() const { return wrap_; } - size_t self_size() const override { return storage_size_; } static WriteWrap* from_req(uv_write_t* req) { @@ -91,24 +84,22 @@ class WriteWrap: public ReqWrap, WriteWrap(Environment* env, v8::Local obj, - StreamBase* wrap, - DoneCb cb) + StreamBase* stream) : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - StreamReq(cb), - wrap_(wrap), + StreamReq(stream), storage_size_(0) { Wrap(obj, this); } + inline void OnDone(int status); // Just calls stream()->AfterWrite() + protected: WriteWrap(Environment* env, v8::Local obj, - StreamBase* wrap, - DoneCb cb, + StreamBase* stream, size_t storage_size) : ReqWrap(env, obj, AsyncWrap::PROVIDER_WRITEWRAP), - StreamReq(cb), - wrap_(wrap), + StreamReq(stream), storage_size_(storage_size) { Wrap(obj, this); } @@ -129,7 +120,6 @@ class WriteWrap: public ReqWrap, // WriteWrap. Ensure this never happens. void operator delete(void* ptr) { UNREACHABLE(); } - StreamBase* const wrap_; const size_t storage_size_; }; @@ -151,7 +141,7 @@ class StreamResource { void* ctx; }; - typedef void (*AfterWriteCb)(WriteWrap* w, void* ctx); + typedef void (*AfterWriteCb)(WriteWrap* w, int status, void* ctx); typedef void (*AllocCb)(size_t size, uv_buf_t* buf, void* ctx); typedef void (*ReadCb)(ssize_t nread, const uv_buf_t* buf, @@ -176,9 +166,9 @@ class StreamResource { virtual void ClearError(); // Events - inline void OnAfterWrite(WriteWrap* w) { + inline void OnAfterWrite(WriteWrap* w, int status) { if (!after_write_cb_.is_empty()) - after_write_cb_.fn(w, after_write_cb_.ctx); + after_write_cb_.fn(w, status, after_write_cb_.ctx); } inline void OnAlloc(size_t size, uv_buf_t* buf) { @@ -208,14 +198,12 @@ class StreamResource { inline Callback read_cb() { return read_cb_; } inline Callback destruct_cb() { return destruct_cb_; } - private: + protected: Callback after_write_cb_; Callback alloc_cb_; Callback read_cb_; Callback destruct_cb_; uint64_t bytes_read_; - - friend class StreamBase; }; class StreamBase : public StreamResource { @@ -257,6 +245,10 @@ class StreamBase : public StreamResource { v8::Local buf, v8::Local handle); + // These are called by the respective {Write,Shutdown}Wrap class. + virtual void AfterShutdown(ShutdownWrap* req, int status); + virtual void AfterWrite(WriteWrap* req, int status); + protected: explicit StreamBase(Environment* env) : env_(env), consumed_(false) { } @@ -267,10 +259,6 @@ class StreamBase : public StreamResource { virtual AsyncWrap* GetAsyncWrap() = 0; virtual v8::Local GetObject(); - // Libuv callbacks - static void AfterShutdown(ShutdownWrap* req, int status); - static void AfterWrite(WriteWrap* req, int status); - // JS Methods int ReadStart(const v8::FunctionCallbackInfo& args); int ReadStop(const v8::FunctionCallbackInfo& args); diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index 8516e39f295f03..9ff6a8b69a3689 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -92,7 +92,6 @@ LibuvStreamWrap::LibuvStreamWrap(Environment* env, provider), StreamBase(env), stream_(stream) { - set_after_write_cb({ OnAfterWriteImpl, this }); set_alloc_cb({ OnAllocImpl, this }); set_read_cb({ OnReadImpl, this }); } @@ -299,13 +298,13 @@ void LibuvStreamWrap::SetBlocking(const FunctionCallbackInfo& args) { int LibuvStreamWrap::DoShutdown(ShutdownWrap* req_wrap) { int err; - err = uv_shutdown(req_wrap->req(), stream(), AfterShutdown); + err = uv_shutdown(req_wrap->req(), stream(), AfterUvShutdown); req_wrap->Dispatched(); return err; } -void LibuvStreamWrap::AfterShutdown(uv_shutdown_t* req, int status) { +void LibuvStreamWrap::AfterUvShutdown(uv_shutdown_t* req, int status) { ShutdownWrap* req_wrap = ShutdownWrap::from_req(req); CHECK_NE(req_wrap, nullptr); HandleScope scope(req_wrap->env()->isolate()); @@ -360,9 +359,9 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w, uv_stream_t* send_handle) { int r; if (send_handle == nullptr) { - r = uv_write(w->req(), stream(), bufs, count, AfterWrite); + r = uv_write(w->req(), stream(), bufs, count, AfterUvWrite); } else { - r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterWrite); + r = uv_write2(w->req(), stream(), bufs, count, send_handle, AfterUvWrite); } if (!r) { @@ -383,7 +382,7 @@ int LibuvStreamWrap::DoWrite(WriteWrap* w, } -void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) { +void LibuvStreamWrap::AfterUvWrite(uv_write_t* req, int status) { WriteWrap* req_wrap = WriteWrap::from_req(req); CHECK_NE(req_wrap, nullptr); HandleScope scope(req_wrap->env()->isolate()); @@ -392,9 +391,9 @@ void LibuvStreamWrap::AfterWrite(uv_write_t* req, int status) { } -void LibuvStreamWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { - LibuvStreamWrap* wrap = static_cast(ctx); - wrap->UpdateWriteQueueSize(); +void LibuvStreamWrap::AfterWrite(WriteWrap* w, int status) { + StreamBase::AfterWrite(w, status); + UpdateWriteQueueSize(); } } // namespace node diff --git a/src/stream_wrap.h b/src/stream_wrap.h index 43df504e81b86e..df7349b093f3c2 100644 --- a/src/stream_wrap.h +++ b/src/stream_wrap.h @@ -103,17 +103,18 @@ class LibuvStreamWrap : public HandleWrap, public StreamBase { static void OnRead(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf); - static void AfterWrite(uv_write_t* req, int status); - static void AfterShutdown(uv_shutdown_t* req, int status); + static void AfterUvWrite(uv_write_t* req, int status); + static void AfterUvShutdown(uv_shutdown_t* req, int status); // Resource interface implementation - static void OnAfterWriteImpl(WriteWrap* w, void* ctx); static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); static void OnReadImpl(ssize_t nread, const uv_buf_t* buf, uv_handle_type pending, void* ctx); + void AfterWrite(WriteWrap* req_wrap, int status) override; + uv_stream_t* const stream_; }; diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index 3f5ed2c57580ff..c15cf166fb29d8 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -328,8 +328,7 @@ void TLSWrap::EncOut() { ->NewInstance(env()->context()).ToLocalChecked(); WriteWrap* write_req = WriteWrap::New(env(), req_wrap_obj, - this, - EncOutCb); + stream_); uv_buf_t buf[arraysize(data)]; for (size_t i = 0; i < count; i++) @@ -346,34 +345,31 @@ void TLSWrap::EncOut() { } -void TLSWrap::EncOutCb(WriteWrap* req_wrap, int status) { - TLSWrap* wrap = req_wrap->wrap()->Cast(); - req_wrap->Dispose(); - +void TLSWrap::EncOutAfterWrite(WriteWrap* req_wrap, int status) { // We should not be getting here after `DestroySSL`, because all queued writes // must be invoked with UV_ECANCELED - CHECK_NE(wrap->ssl_, nullptr); + CHECK_NE(ssl_, nullptr); // Handle error if (status) { // Ignore errors after shutdown - if (wrap->shutdown_) + if (shutdown_) return; // Notify about error - wrap->InvokeQueued(status); + InvokeQueued(status); return; } // Commit - crypto::NodeBIO::FromBIO(wrap->enc_out_)->Read(nullptr, wrap->write_size_); + crypto::NodeBIO::FromBIO(enc_out_)->Read(nullptr, write_size_); // Ensure that the progress will be made and `InvokeQueued` will be called. - wrap->ClearIn(); + ClearIn(); // Try writing more data - wrap->write_size_ = 0; - wrap->EncOut(); + write_size_ = 0; + EncOut(); } @@ -681,9 +677,9 @@ int TLSWrap::DoWrite(WriteWrap* w, } -void TLSWrap::OnAfterWriteImpl(WriteWrap* w, void* ctx) { +void TLSWrap::OnAfterWriteImpl(WriteWrap* w, int status, void* ctx) { TLSWrap* wrap = static_cast(ctx); - wrap->UpdateWriteQueueSize(); + wrap->EncOutAfterWrite(w, status); } diff --git a/src/tls_wrap.h b/src/tls_wrap.h index bd5a4d4028a408..96820e9b7201b1 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -112,7 +112,7 @@ class TLSWrap : public AsyncWrap, static void SSLInfoCallback(const SSL* ssl_, int where, int ret); void InitSSL(); void EncOut(); - static void EncOutCb(WriteWrap* req_wrap, int status); + void EncOutAfterWrite(WriteWrap* req_wrap, int status); bool ClearIn(); void ClearOut(); void MakePending(); @@ -135,7 +135,7 @@ class TLSWrap : public AsyncWrap, uint32_t UpdateWriteQueueSize(uint32_t write_queue_size = 0); // Resource implementation - static void OnAfterWriteImpl(WriteWrap* w, void* ctx); + static void OnAfterWriteImpl(WriteWrap* w, int status, void* ctx); static void OnAllocImpl(size_t size, uv_buf_t* buf, void* ctx); static void OnReadImpl(ssize_t nread, const uv_buf_t* buf,