Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http2: send GOAWAY properly & don't continue reading unnecessarily #20772

Closed
Closed
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 54 additions & 37 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,25 @@ function onStreamClose(code) {

stream[kState].fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](null, code);
stream[kMaybeDestroy](code);
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
// Push a null so the stream can end whenever the client consumes
// it completely.
stream.push(null);
// If the client hasn't tried to consume the stream and there is no
// resume scheduled (which would indicate they would consume in the future),
// then just dump the incoming data so that the stream can be destroyed.
if (!stream[kState].didRead && !stream._readableState.resumeScheduled)

// If the user hasn't tried to consume the stream (and this is a server
// session) then just dump the incoming data so that the stream can
// be destroyed.
if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!stream[kState].didRead &&
stream.readableFlowing === null)
stream.resume();
else
stream.read(0);
}
}

Expand All @@ -379,7 +384,7 @@ function onStreamRead(nread, buf) {
`${sessionName(stream[kSession][kType])}]: ending readable.`);

// defer this until we actually emit end
if (stream._readableState.endEmitted) {
if (!stream.readable) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
Expand Down Expand Up @@ -469,8 +474,7 @@ function onGoawayData(code, lastStreamID, buf) {
// goaway using NGHTTP2_NO_ERROR because there was no error
// condition on this side of the session that caused the
// shutdown.
session.destroy(new ERR_HTTP2_SESSION_ERROR(code),
{ errorCode: NGHTTP2_NO_ERROR });
session.destroy(new ERR_HTTP2_SESSION_ERROR(code), NGHTTP2_NO_ERROR);
}
}

Expand Down Expand Up @@ -813,6 +817,21 @@ function emitClose(self, error) {
self.emit('close');
}

function finishSessionDestroy(session, error) {
const socket = session[kSocket];
if (!socket.destroyed)
socket.destroy(error);

session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
socket[kSession] = undefined;
socket[kServer] = undefined;

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, session, error);
}

// Upon creation, the Http2Session takes ownership of the socket. The session
// may not be ready to use immediately if the socket is not yet fully connected.
// In that case, the Http2Session will wait for the socket to connect. Once
Expand Down Expand Up @@ -869,6 +888,8 @@ class Http2Session extends EventEmitter {

this[kState] = {
flags: SESSION_FLAGS_PENDING,
goawayCode: null,
goawayLastStreamID: null,
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
Expand Down Expand Up @@ -1171,25 +1192,13 @@ class Http2Session extends EventEmitter {
if (handle !== undefined)
handle.destroy(code, socket.destroyed);

// If there is no error, use setImmediate to destroy the socket on the
// If the socket is alive, use setImmediate to destroy the session on the
// next iteration of the event loop in order to give data time to transmit.
// Otherwise, destroy immediately.
if (!socket.destroyed) {
if (!error) {
setImmediate(socket.destroy.bind(socket));
} else {
socket.destroy(error);
}
}

this[kProxySocket] = undefined;
this[kSocket] = undefined;
this[kHandle] = undefined;
socket[kSession] = undefined;
socket[kServer] = undefined;

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, this, error);
if (!socket.destroyed)
setImmediate(finishSessionDestroy, this, error);
else
finishSessionDestroy(this, error);
}

// Closing the session will:
Expand Down Expand Up @@ -1441,11 +1450,8 @@ function afterDoStreamWrite(status, handle) {
}

function streamOnResume() {
if (!this.destroyed && !this.pending) {
if (!this[kState].didRead)
this[kState].didRead = true;
if (!this.destroyed)
this[kHandle].readStart();
}
}

function streamOnPause() {
Expand Down Expand Up @@ -1521,6 +1527,10 @@ class Http2Stream extends Duplex {
this[kSession] = session;
session[kState].pendingStreams.add(this);

// Allow our logic for determining whether any reads have happened to
// work in all situations. This is similar to what we do in _http_incoming.
this._readableState.readingMore = true;

this[kTimeout] = null;

this[kState] = {
Expand All @@ -1531,7 +1541,6 @@ class Http2Stream extends Duplex {
trailersReady: false
};

this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
}

Expand Down Expand Up @@ -1725,6 +1734,10 @@ class Http2Stream extends Duplex {
this.push(null);
return;
}
if (!this[kState].didRead) {
this._readableState.readingMore = false;
this[kState].didRead = true;
}
if (!this.pending) {
streamOnResume.call(this);
} else {
Expand Down Expand Up @@ -1866,15 +1879,15 @@ class Http2Stream extends Duplex {
}
// The Http2Stream can be destroyed if it has closed and if the readable
// side has received the final chunk.
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
if (error || code !== NGHTTP2_NO_ERROR) {
this.destroy(error);
[kMaybeDestroy](code = NGHTTP2_NO_ERROR) {
if (code !== NGHTTP2_NO_ERROR) {
this.destroy();
return;
}

// TODO(mcollina): remove usage of _*State properties
if (this._writableState.ended && this._writableState.pendingcb === 0) {
if (this._readableState.ended && this.closed) {
if (!this.writable) {
if (!this.readable && this.closed) {
this.destroy();
return;
}
Expand All @@ -1887,7 +1900,7 @@ class Http2Stream extends Duplex {
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
!this._readableState.resumeScheduled) {
this.readableFlowing === null) {
this.close();
}
}
Expand Down Expand Up @@ -2477,6 +2490,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
function socketOnError(error) {
const session = this[kSession];
if (session !== undefined) {
// We can ignore ECONNRESET after GOAWAY was received as there's nothing
// we can do and the other side is fully within its rights to do so.
if (error.code === 'ECONNRESET' && session[kState].goawayCode !== null)
return session.destroy();
debug(`Http2Session ${sessionName(session[kType])}: socket error [` +
`${error.message}]`);
session.destroy(error);
Expand Down
33 changes: 20 additions & 13 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -577,26 +577,28 @@ void Http2Session::EmitStatistics() {
void Http2Session::Close(uint32_t code, bool socket_closed) {
DEBUG_HTTP2SESSION(this, "closing session");

if (flags_ & SESSION_STATE_CLOSED)
if (flags_ & SESSION_STATE_CLOSING)
return;
flags_ |= SESSION_STATE_CLOSED;
flags_ |= SESSION_STATE_CLOSING;

// Stop reading on the i/o stream
if (stream_ != nullptr)
stream_->ReadStop();

// If the socket is not closed, then attempt to send a closing GOAWAY
// frame. There is no guarantee that this GOAWAY will be received by
// the peer but the HTTP/2 spec recommends sendinng it anyway. We'll
// the peer but the HTTP/2 spec recommends sending it anyway. We'll
// make a best effort.
if (!socket_closed) {
Http2Scope h2scope(this);
DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code);
CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0);
SendPendingData();
} else if (stream_ != nullptr) {
stream_->RemoveStreamListener(this);
}

flags_ |= SESSION_STATE_CLOSED;

// If there are outstanding pings, those will need to be canceled, do
// so on the next iteration of the event loop to avoid calling out into
// javascript since this may be called during garbage collection.
Expand Down Expand Up @@ -1355,25 +1357,32 @@ void Http2Session::MaybeScheduleWrite() {
}
}

void Http2Session::MaybeStopReading() {
int want_read = nghttp2_session_want_read(session_);
DEBUG_HTTP2SESSION2(this, "wants read? %d", want_read);
if (want_read == 0)
stream_->ReadStop();
}

// Unset the sending state, finish up all current writes, and reset
// storage for data and metadata that was associated with these writes.
void Http2Session::ClearOutgoing(int status) {
CHECK_NE(flags_ & SESSION_STATE_SENDING, 0);

flags_ &= ~SESSION_STATE_SENDING;

if (outgoing_buffers_.size() > 0) {
outgoing_storage_.clear();

for (const nghttp2_stream_write& wr : outgoing_buffers_) {
std::vector<nghttp2_stream_write> current_outgoing_buffers_;
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const nghttp2_stream_write& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr)
wrap->Done(status);
}

outgoing_buffers_.clear();
}

flags_ &= ~SESSION_STATE_SENDING;

// Now that we've finished sending queued data, if there are any pending
// RstStreams we should try sending again and then flush them one by one.
if (pending_rst_streams_.size() > 0) {
Expand Down Expand Up @@ -1484,8 +1493,7 @@ uint8_t Http2Session::SendPendingData() {
ClearOutgoing(res.err);
}

DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
nghttp2_session_want_read(session_));
MaybeStopReading();

return 0;
}
Expand Down Expand Up @@ -1618,8 +1626,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
};
MakeCallback(env()->error_string(), arraysize(argv), argv);
} else {
DEBUG_HTTP2SESSION2(this, "processed %d bytes. wants more? %d", ret,
nghttp2_session_want_read(session_));
MaybeStopReading();
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ enum session_state_flags {
SESSION_STATE_HAS_SCOPE = 0x1,
SESSION_STATE_WRITE_SCHEDULED = 0x2,
SESSION_STATE_CLOSED = 0x4,
SESSION_STATE_SENDING = 0x8,
SESSION_STATE_CLOSING = 0x8,
SESSION_STATE_SENDING = 0x10,
};

// This allows for 4 default-sized frames with their frame headers
Expand Down Expand Up @@ -619,7 +620,7 @@ class Http2Stream : public AsyncWrap,

inline bool IsClosed() const {
return flags_ & NGHTTP2_STREAM_FLAG_CLOSED;
}
}

inline bool HasTrailers() const {
return flags_ & NGHTTP2_STREAM_FLAG_TRAILERS;
Expand Down Expand Up @@ -827,6 +828,9 @@ class Http2Session : public AsyncWrap, public StreamListener {
// Schedule a write if nghttp2 indicates it wants to write to the socket.
void MaybeScheduleWrite();

// Stop reading if nghttp2 doesn't want to anymore.
void MaybeStopReading();

// Returns pointer to the stream, or nullptr if stream does not exist
inline Http2Stream* FindStream(int32_t id);

Expand Down
6 changes: 0 additions & 6 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ prefix parallel
# Postmortem debugging data is prone to accidental removal during V8 updates.
test-postmortem-metadata: PASS,FLAKY

# http2 has a few bugs that make these tests flaky and that are currently worked
# on.
test-http2-client-upload-reject: PASS,FLAKY
test-http2-pipe: PASS,FLAKY
test-http2-client-upload: PASS,FLAKY

[$system==win32]

[$system==linux]
Expand Down
8 changes: 1 addition & 7 deletions test/parallel/test-http2-client-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,14 @@ const Countdown = require('../common/countdown');

server.listen(0, common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
// On some platforms (e.g. windows), an ECONNRESET may occur at this
// point -- or it may not. Do not make this a mustCall
client.on('error', () => {});

client.on('close', () => {
server.close();
// calling destroy in here should not matter
client.destroy();
});

const req = client.request();
// On some platforms (e.g. windows), an ECONNRESET may occur at this
// point -- or it may not. Do not make this a mustCall
req.on('error', () => {});
client.request();
}));
}

Expand Down
19 changes: 11 additions & 8 deletions test/parallel/test-http2-server-close-callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,24 @@ const common = require('../common');
if (!common.hasCrypto)
common.skip('missing crypto');

const Countdown = require('../common/countdown');
const http2 = require('http2');

const server = http2.createServer();

let session;

const countdown = new Countdown(2, () => {
server.close(common.mustCall());
session.destroy();
});

server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
client.on('error', (err) => {
if (err.code !== 'ECONNRESET')
throw err;
});
client.on('connect', common.mustCall(() => countdown.dec()));
}));

server.on('session', common.mustCall((s) => {
setImmediate(() => {
server.close(common.mustCall());
s.destroy();
});
session = s;
countdown.dec();
}));
Loading