diff --git a/doc/api/http2.md b/doc/api/http2.md index e902866598d7d4..0b13abfaa35e77 100644 --- a/doc/api/http2.md +++ b/doc/api/http2.md @@ -884,6 +884,10 @@ The `'trailers'` event is emitted when a block of headers associated with trailing header fields is received. The listener callback is passed the [HTTP/2 Headers Object][] and flags associated with the headers. +Note that this event might not be emitted if `http2stream.end()` is called +before trailers are received and the incoming data is not being read or +listened for. + ```js stream.on('trailers', (headers, flags) => { console.log(headers); diff --git a/lib/internal/http2/core.js b/lib/internal/http2/core.js index 106991dbd4224d..d2cea48fab6a26 100644 --- a/lib/internal/http2/core.js +++ b/lib/internal/http2/core.js @@ -292,20 +292,25 @@ function onStreamClose(code) { tryClose(stream[kState].fd); // Defer destroy we actually emit end. - if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) { + if (!stream.readable || code !== NGHTTP2_NO_ERROR) { // If errored or ended, we can destroy immediately. - stream[kMaybeDestroy](null, code); + stream[kMaybeDestroy](code); } else { // Wait for end to destroy. stream.on('end', stream[kMaybeDestroy]); // Push a null so the stream can end whenever the client consumes // it completely. stream.push(null); - // If the client hasn't tried to consume the stream and there is no - // resume scheduled (which would indicate they would consume in the future), - // then just dump the incoming data so that the stream can be destroyed. - if (!stream[kState].didRead && !stream._readableState.resumeScheduled) + + // If the user hasn't tried to consume the stream (and this is a server + // session) then just dump the incoming data so that the stream can + // be destroyed. + if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER && + !stream[kState].didRead && + stream.readableFlowing === null) stream.resume(); + else + stream.read(0); } } @@ -330,7 +335,7 @@ function onStreamRead(nread, buf) { `${sessionName(stream[kSession][kType])}]: ending readable.`); // defer this until we actually emit end - if (stream._readableState.endEmitted) { + if (!stream.readable) { stream[kMaybeDestroy](); } else { stream.on('end', stream[kMaybeDestroy]); @@ -421,7 +426,7 @@ function onGoawayData(code, lastStreamID, buf) { // condition on this side of the session that caused the // shutdown. session.destroy(new errors.Error('ERR_HTTP2_SESSION_ERROR', code), - { errorCode: NGHTTP2_NO_ERROR }); + NGHTTP2_NO_ERROR); } } @@ -772,6 +777,21 @@ function emitClose(self, error) { self.emit('close'); } +function finishSessionDestroy(session, error) { + const socket = session[kSocket]; + if (!socket.destroyed) + socket.destroy(error); + + session[kProxySocket] = undefined; + session[kSocket] = undefined; + session[kHandle] = undefined; + socket[kSession] = undefined; + socket[kServer] = undefined; + + // Finally, emit the close and error events (if necessary) on next tick. + process.nextTick(emitClose, session, error); +} + // Upon creation, the Http2Session takes ownership of the socket. The session // may not be ready to use immediately if the socket is not yet fully connected. // In that case, the Http2Session will wait for the socket to connect. Once @@ -828,6 +848,8 @@ class Http2Session extends EventEmitter { this[kState] = { flags: SESSION_FLAGS_PENDING, + goawayCode: null, + goawayLastStreamID: null, streams: new Map(), pendingStreams: new Set(), pendingAck: 0, @@ -1130,25 +1152,13 @@ class Http2Session extends EventEmitter { if (handle !== undefined) handle.destroy(code, socket.destroyed); - // If there is no error, use setImmediate to destroy the socket on the + // If the socket is alive, use setImmediate to destroy the session on the // next iteration of the event loop in order to give data time to transmit. // Otherwise, destroy immediately. - if (!socket.destroyed) { - if (!error) { - setImmediate(socket.destroy.bind(socket)); - } else { - socket.destroy(error); - } - } - - this[kProxySocket] = undefined; - this[kSocket] = undefined; - this[kHandle] = undefined; - socket[kSession] = undefined; - socket[kServer] = undefined; - - // Finally, emit the close and error events (if necessary) on next tick. - process.nextTick(emitClose, this, error); + if (!socket.destroyed) + setImmediate(finishSessionDestroy, this, error); + else + finishSessionDestroy(this, error); } // Closing the session will: @@ -1422,11 +1432,8 @@ function afterDoStreamWrite(status, handle, req) { } function streamOnResume() { - if (!this.destroyed && !this.pending) { - if (!this[kState].didRead) - this[kState].didRead = true; + if (!this.destroyed) this[kHandle].readStart(); - } } function streamOnPause() { @@ -1441,6 +1448,16 @@ function afterShutdown() { stream[kMaybeDestroy](); } +function finishSendTrailers(stream, headersList) { + stream[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; + + const ret = stream[kHandle].trailers(headersList); + if (ret < 0) + stream.destroy(new NghttpError(ret)); + else + stream[kMaybeDestroy](); +} + function closeStream(stream, code, shouldSubmitRstStream = true) { const state = stream[kState]; state.flags |= STREAM_FLAGS_CLOSED; @@ -1502,6 +1519,10 @@ class Http2Stream extends Duplex { this[kSession] = session; session[kState].pendingStreams.add(this); + // Allow our logic for determining whether any reads have happened to + // work in all situations. This is similar to what we do in _http_incoming. + this._readableState.readingMore = true; + this[kState] = { didRead: false, flags: STREAM_FLAGS_PENDING, @@ -1510,7 +1531,6 @@ class Http2Stream extends Duplex { trailersReady: false }; - this.on('resume', streamOnResume); this.on('pause', streamOnPause); } @@ -1717,6 +1737,10 @@ class Http2Stream extends Duplex { this.push(null); return; } + if (!this[kState].didRead) { + this._readableState.readingMore = false; + this[kState].didRead = true; + } if (!this.pending) { streamOnResume.call(this); } else { @@ -1765,13 +1789,8 @@ class Http2Stream extends Duplex { throw headersList; this[kSentTrailers] = headers; - this[kState].flags &= ~STREAM_FLAGS_HAS_TRAILERS; - - const ret = this[kHandle].trailers(headersList); - if (ret < 0) - this.destroy(new NghttpError(ret)); - else - this[kMaybeDestroy](); + // Send the trailers in setImmediate so we don't do it on nghttp2 stack. + setImmediate(finishSendTrailers, this, headersList); } get closed() { @@ -1861,15 +1880,15 @@ class Http2Stream extends Duplex { } // The Http2Stream can be destroyed if it has closed and if the readable // side has received the final chunk. - [kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) { - if (error || code !== NGHTTP2_NO_ERROR) { - this.destroy(error); + [kMaybeDestroy](code = NGHTTP2_NO_ERROR) { + if (code !== NGHTTP2_NO_ERROR) { + this.destroy(); return; } // TODO(mcollina): remove usage of _*State properties - if (this._writableState.ended && this._writableState.pendingcb === 0) { - if (this._readableState.ended && this.closed) { + if (!this.writable) { + if (!this.readable && this.closed) { this.destroy(); return; } @@ -1882,7 +1901,7 @@ class Http2Stream extends Duplex { this[kSession][kType] === NGHTTP2_SESSION_SERVER && !(state.flags & STREAM_FLAGS_HAS_TRAILERS) && !state.didRead && - !this._readableState.resumeScheduled) { + this.readableFlowing === null) { this.close(); } } @@ -2445,6 +2464,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout); function socketOnError(error) { const session = this[kSession]; if (session !== undefined) { + // We can ignore ECONNRESET after GOAWAY was received as there's nothing + // we can do and the other side is fully within its rights to do so. + if (error.code === 'ECONNRESET' && session[kState].goawayCode !== null) + return session.destroy(); debug(`Http2Session ${sessionName(session[kType])}: socket error [` + `${error.message}]`); session.destroy(error); diff --git a/src/node_http2.cc b/src/node_http2.cc index 206a806b3c4596..fa0747b4261ee4 100644 --- a/src/node_http2.cc +++ b/src/node_http2.cc @@ -631,9 +631,9 @@ inline void Http2Session::EmitStatistics() { void Http2Session::Close(uint32_t code, bool socket_closed) { DEBUG_HTTP2SESSION(this, "closing session"); - if (flags_ & SESSION_STATE_CLOSED) + if (flags_ & SESSION_STATE_CLOSING) return; - flags_ |= SESSION_STATE_CLOSED; + flags_ |= SESSION_STATE_CLOSING; // Stop reading on the i/o stream if (stream_ != nullptr) @@ -641,16 +641,18 @@ void Http2Session::Close(uint32_t code, bool socket_closed) { // If the socket is not closed, then attempt to send a closing GOAWAY // frame. There is no guarantee that this GOAWAY will be received by - // the peer but the HTTP/2 spec recommends sendinng it anyway. We'll + // the peer but the HTTP/2 spec recommends sending it anyway. We'll // make a best effort. if (!socket_closed) { - Http2Scope h2scope(this); DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code); CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0); + SendPendingData(); } else { Unconsume(); } + flags_ |= SESSION_STATE_CLOSED; + // If there are outstanding pings, those will need to be canceled, do // so on the next iteration of the event loop to avoid calling out into // javascript since this may be called during garbage collection. @@ -1387,25 +1389,32 @@ void Http2Session::MaybeScheduleWrite() { } } +void Http2Session::MaybeStopReading() { + int want_read = nghttp2_session_want_read(session_); + DEBUG_HTTP2SESSION2(this, "wants read? %d", want_read); + if (want_read == 0) + stream_->ReadStop(); +} + // Unset the sending state, finish up all current writes, and reset // storage for data and metadata that was associated with these writes. void Http2Session::ClearOutgoing(int status) { CHECK_NE(flags_ & SESSION_STATE_SENDING, 0); + flags_ &= ~SESSION_STATE_SENDING; + if (outgoing_buffers_.size() > 0) { outgoing_storage_.clear(); - for (const nghttp2_stream_write& wr : outgoing_buffers_) { + std::vector current_outgoing_buffers_; + current_outgoing_buffers_.swap(outgoing_buffers_); + for (const nghttp2_stream_write& wr : current_outgoing_buffers_) { WriteWrap* wrap = wr.req_wrap; if (wrap != nullptr) wrap->Done(status); } - - outgoing_buffers_.clear(); } - flags_ &= ~SESSION_STATE_SENDING; - // Now that we've finished sending queued data, if there are any pending // RstStreams we should try sending again and then flush them one by one. if (pending_rst_streams_.size() > 0) { @@ -1525,8 +1534,7 @@ uint8_t Http2Session::SendPendingData() { req->Dispose(); } - DEBUG_HTTP2SESSION2(this, "wants data in return? %d", - nghttp2_session_want_read(session_)); + MaybeStopReading(); return 0; } @@ -1691,8 +1699,7 @@ void Http2Session::OnStreamReadImpl(ssize_t nread, }; session->MakeCallback(env->error_string(), arraysize(argv), argv); } else { - DEBUG_HTTP2SESSION2(session, "processed %d bytes. wants more? %d", ret, - nghttp2_session_want_read(**session)); + session->MaybeStopReading(); } } @@ -1928,6 +1935,7 @@ void Http2Stream::OnTrailers() { HandleScope scope(isolate); Local context = env()->context(); Context::Scope context_scope(context); + flags_ &= ~NGHTTP2_STREAM_FLAG_TRAILERS; MakeCallback(env()->ontrailers_string(), 0, nullptr); } @@ -1936,7 +1944,16 @@ int Http2Stream::SubmitTrailers(nghttp2_nv* nva, size_t len) { CHECK(!this->IsDestroyed()); Http2Scope h2scope(this); DEBUG_HTTP2STREAM2(this, "sending %d trailers", len); - int ret = nghttp2_submit_trailer(**session_, id_, nva, len); + int ret; + // Sending an empty trailers frame poses problems in Safari, Edge & IE. + // Instead we can just send an empty data frame with NGHTTP2_FLAG_END_STREAM + // to indicate that the stream is ready to be closed. + if (len == 0) { + Http2Stream::Provider::Stream prov(this, 0); + ret = nghttp2_submit_data(**session_, NGHTTP2_FLAG_END_STREAM, id_, *prov); + } else { + ret = nghttp2_submit_trailer(**session_, id_, nva, len); + } CHECK_NE(ret, NGHTTP2_ERR_NOMEM); return ret; } @@ -2562,8 +2579,7 @@ void Http2Stream::Info(const FunctionCallbackInfo& args) { Headers list(isolate, context, headers); args.GetReturnValue().Set(stream->SubmitInfo(*list, list.length())); - DEBUG_HTTP2STREAM2(stream, "%d informational headers sent", - headers->Length()); + DEBUG_HTTP2STREAM2(stream, "%d informational headers sent", list.length()); } // Submits trailing headers on the Http2Stream @@ -2578,7 +2594,7 @@ void Http2Stream::Trailers(const FunctionCallbackInfo& args) { Headers list(isolate, context, headers); args.GetReturnValue().Set(stream->SubmitTrailers(*list, list.length())); - DEBUG_HTTP2STREAM2(stream, "%d trailing headers sent", headers->Length()); + DEBUG_HTTP2STREAM2(stream, "%d trailing headers sent", list.length()); } // Grab the numeric id of the Http2Stream diff --git a/src/node_http2.h b/src/node_http2.h index 972c08a2cd98ba..009c05b731eee8 100644 --- a/src/node_http2.h +++ b/src/node_http2.h @@ -433,7 +433,8 @@ enum session_state_flags { SESSION_STATE_HAS_SCOPE = 0x1, SESSION_STATE_WRITE_SCHEDULED = 0x2, SESSION_STATE_CLOSED = 0x4, - SESSION_STATE_SENDING = 0x8, + SESSION_STATE_CLOSING = 0x8, + SESSION_STATE_SENDING = 0x10, }; // This allows for 4 default-sized frames with their frame headers @@ -624,7 +625,7 @@ class Http2Stream : public AsyncWrap, inline bool IsClosed() const { return flags_ & NGHTTP2_STREAM_FLAG_CLOSED; - } + } inline bool HasTrailers() const { return flags_ & NGHTTP2_STREAM_FLAG_TRAILERS; @@ -846,6 +847,9 @@ class Http2Session : public AsyncWrap { // Schedule a write if nghttp2 indicates it wants to write to the socket. void MaybeScheduleWrite(); + // Stop reading if nghttp2 doesn't want to anymore. + void MaybeStopReading(); + // Returns pointer to the stream, or nullptr if stream does not exist inline Http2Stream* FindStream(int32_t id); diff --git a/test/parallel/test-http2-client-destroy.js b/test/parallel/test-http2-client-destroy.js index 6238363511a791..43fc6819e21f7a 100644 --- a/test/parallel/test-http2-client-destroy.js +++ b/test/parallel/test-http2-client-destroy.js @@ -109,9 +109,6 @@ const Countdown = require('../common/countdown'); server.listen(0, common.mustCall(() => { const client = h2.connect(`http://localhost:${server.address().port}`); - // On some platforms (e.g. windows), an ECONNRESET may occur at this - // point -- or it may not. Do not make this a mustCall - client.on('error', () => {}); client.on('close', () => { server.close(); @@ -119,10 +116,7 @@ const Countdown = require('../common/countdown'); client.destroy(); }); - const req = client.request(); - // On some platforms (e.g. windows), an ECONNRESET may occur at this - // point -- or it may not. Do not make this a mustCall - req.on('error', () => {}); + client.request(); })); } diff --git a/test/parallel/test-http2-no-wanttrailers-listener.js b/test/parallel/test-http2-no-wanttrailers-listener.js index 7ba25d0e491e15..87bc21df48aa2c 100644 --- a/test/parallel/test-http2-no-wanttrailers-listener.js +++ b/test/parallel/test-http2-no-wanttrailers-listener.js @@ -3,7 +3,6 @@ const common = require('../common'); if (!common.hasCrypto) common.skip('missing crypto'); -const assert = require('assert'); const h2 = require('http2'); const server = h2.createServer(); @@ -25,9 +24,7 @@ server.on('listening', common.mustCall(function() { const client = h2.connect(`http://localhost:${this.address().port}`); const req = client.request(); req.resume(); - req.on('trailers', common.mustCall((headers) => { - assert.strictEqual(Object.keys(headers).length, 0); - })); + req.on('trailers', common.mustNotCall()); req.on('close', common.mustCall(() => { server.close(); client.close(); diff --git a/test/parallel/test-http2-server-close-callback.js b/test/parallel/test-http2-server-close-callback.js index 66887aa62bebe5..f822d8a4a92d78 100644 --- a/test/parallel/test-http2-server-close-callback.js +++ b/test/parallel/test-http2-server-close-callback.js @@ -4,21 +4,24 @@ const common = require('../common'); if (!common.hasCrypto) common.skip('missing crypto'); +const Countdown = require('../common/countdown'); const http2 = require('http2'); const server = http2.createServer(); +let session; + +const countdown = new Countdown(2, () => { + server.close(common.mustCall()); + session.destroy(); +}); + server.listen(0, common.mustCall(() => { const client = http2.connect(`http://localhost:${server.address().port}`); - client.on('error', (err) => { - if (err.code !== 'ECONNRESET') - throw err; - }); + client.on('connect', common.mustCall(() => countdown.dec())); })); server.on('session', common.mustCall((s) => { - setImmediate(() => { - server.close(common.mustCall()); - s.destroy(); - }); + session = s; + countdown.dec(); })); diff --git a/test/parallel/test-http2-server-sessionerror.js b/test/parallel/test-http2-server-sessionerror.js index 525eb2e6efd11a..c50352fcc35c28 100644 --- a/test/parallel/test-http2-server-sessionerror.js +++ b/test/parallel/test-http2-server-sessionerror.js @@ -35,14 +35,17 @@ server.on('session', common.mustCall((session) => { server.listen(0, common.mustCall(() => { const url = `http://localhost:${server.address().port}`; http2.connect(url) - // An ECONNRESET error may occur depending on the platform (due largely - // to differences in the timing of socket closing). Do not wrap this in - // a common must call. - .on('error', () => {}) + .on('error', common.expectsError({ + code: 'ERR_HTTP2_SESSION_ERROR', + message: 'Session closed with error code 2', + })) .on('close', () => { server.removeAllListeners('error'); http2.connect(url) - .on('error', () => {}) + .on('error', common.expectsError({ + code: 'ERR_HTTP2_SESSION_ERROR', + message: 'Session closed with error code 2', + })) .on('close', () => server.close()); }); })); diff --git a/test/parallel/test-http2-server-shutdown-options-errors.js b/test/parallel/test-http2-server-shutdown-options-errors.js index 2aedec1140701a..94733b199366db 100644 --- a/test/parallel/test-http2-server-shutdown-options-errors.js +++ b/test/parallel/test-http2-server-shutdown-options-errors.js @@ -54,13 +54,7 @@ server.listen( 0, common.mustCall(() => { const client = http2.connect(`http://localhost:${server.address().port}`); - // On certain operating systems, an ECONNRESET may occur. We do not need - // to test for it here. Do not make this a mustCall - client.on('error', () => {}); const req = client.request(); - // On certain operating systems, an ECONNRESET may occur. We do not need - // to test for it here. Do not make this a mustCall - req.on('error', () => {}); req.resume(); req.on('close', common.mustCall(() => { client.close(); diff --git a/test/parallel/test-http2-server-socket-destroy.js b/test/parallel/test-http2-server-socket-destroy.js index 03afc1957b8af4..99595aeb63004d 100644 --- a/test/parallel/test-http2-server-socket-destroy.js +++ b/test/parallel/test-http2-server-socket-destroy.js @@ -41,14 +41,20 @@ server.on('listening', common.mustCall(() => { // The client may have an ECONNRESET error here depending on the operating // system, due mainly to differences in the timing of socket closing. Do // not wrap this in a common mustCall. - client.on('error', () => {}); + client.on('error', (err) => { + if (err.code !== 'ECONNRESET') + throw err; + }); client.on('close', common.mustCall()); const req = client.request({ ':method': 'POST' }); // The client may have an ECONNRESET error here depending on the operating // system, due mainly to differences in the timing of socket closing. Do // not wrap this in a common mustCall. - req.on('error', () => {}); + req.on('error', (err) => { + if (err.code !== 'ECONNRESET') + throw err; + }); req.on('aborted', common.mustCall()); req.resume(); diff --git a/test/parallel/test-http2-server-stream-session-destroy.js b/test/parallel/test-http2-server-stream-session-destroy.js index e70630fe0b1351..6d8de4ba5f7618 100644 --- a/test/parallel/test-http2-server-stream-session-destroy.js +++ b/test/parallel/test-http2-server-stream-session-destroy.js @@ -39,16 +39,8 @@ server.on('stream', common.mustCall((stream) => { server.listen(0, common.mustCall(() => { const client = h2.connect(`http://localhost:${server.address().port}`); - client.on('error', (err) => { - if (err.code !== 'ECONNRESET') - throw err; - }); const req = client.request(); req.resume(); req.on('end', common.mustCall()); req.on('close', common.mustCall(() => server.close(common.mustCall()))); - req.on('error', (err) => { - if (err.code !== 'ECONNRESET') - throw err; - }); })); diff --git a/test/parallel/test-http2-server-timeout.js b/test/parallel/test-http2-server-timeout.js index 581a409ce9171d..88fc4eab2e08c0 100755 --- a/test/parallel/test-http2-server-timeout.js +++ b/test/parallel/test-http2-server-timeout.js @@ -6,10 +6,10 @@ if (!common.hasCrypto) const http2 = require('http2'); const server = http2.createServer(); -server.setTimeout(common.platformTimeout(1)); +server.setTimeout(common.platformTimeout(50)); const onServerTimeout = common.mustCall((session) => { - session.close(() => session.destroy()); + session.close(); }); server.on('stream', common.mustNotCall()); @@ -18,14 +18,8 @@ server.once('timeout', onServerTimeout); server.listen(0, common.mustCall(() => { const url = `http://localhost:${server.address().port}`; const client = http2.connect(url); - // Because of the timeout, an ECONRESET error may or may not happen here. - // Keep this as a non-op and do not use common.mustCall() - client.on('error', () => {}); client.on('close', common.mustCall(() => { const client2 = http2.connect(url); - // Because of the timeout, an ECONRESET error may or may not happen here. - // Keep this as a non-op and do not use common.mustCall() - client2.on('error', () => {}); client2.on('close', common.mustCall(() => server.close())); })); })); diff --git a/test/parallel/test-http2-too-many-settings.js b/test/parallel/test-http2-too-many-settings.js index 0302fe623da07c..acfd73ada68416 100644 --- a/test/parallel/test-http2-too-many-settings.js +++ b/test/parallel/test-http2-too-many-settings.js @@ -29,9 +29,10 @@ function doTest(session) { server.listen(0, common.mustCall(() => { const client = h2.connect(`http://localhost:${server.address().port}`); - // On some operating systems, an ECONNRESET error may be emitted. - // On others it won't be. Do not make this a mustCall - client.on('error', () => {}); + client.on('error', common.expectsError({ + code: 'ERR_HTTP2_SESSION_ERROR', + message: 'Session closed with error code 2', + })); client.on('close', common.mustCall(() => server.close())); })); } diff --git a/test/parallel/test-http2-trailers.js b/test/parallel/test-http2-trailers.js index 2b7f6158ca3951..bdc0931157ad58 100644 --- a/test/parallel/test-http2-trailers.js +++ b/test/parallel/test-http2-trailers.js @@ -18,6 +18,7 @@ server.on('stream', common.mustCall(onStream)); function onStream(stream, headers, flags) { stream.on('trailers', common.mustCall((headers) => { assert.strictEqual(headers[trailerKey], trailerValue); + stream.end(body); })); stream.respond({ 'content-type': 'text/html', @@ -41,8 +42,6 @@ function onStream(stream, headers, flags) { type: Error } ); - - stream.end(body); } server.listen(0);