Skip to content

Commit

Permalink
http2: submit Goaway frames & handle ECONNRESET
Browse files Browse the repository at this point in the history
Currently http2 does not properly submit Goaway frames when
a session is being destroyed. It also doesn't properly
handle when the other party severs the connection after
sending a Goaway frame, even though it should.
  • Loading branch information
apapirovski committed May 20, 2018
1 parent df511c6 commit 6cfca8a
Show file tree
Hide file tree
Showing 14 changed files with 104 additions and 107 deletions.
91 changes: 54 additions & 37 deletions lib/internal/http2/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -341,20 +341,25 @@ function onStreamClose(code) {

stream[kState].fd = -1;
// Defer destroy we actually emit end.
if (stream._readableState.endEmitted || code !== NGHTTP2_NO_ERROR) {
if (!stream.readable || code !== NGHTTP2_NO_ERROR) {
// If errored or ended, we can destroy immediately.
stream[kMaybeDestroy](null, code);
stream[kMaybeDestroy](code);
} else {
// Wait for end to destroy.
stream.on('end', stream[kMaybeDestroy]);
// Push a null so the stream can end whenever the client consumes
// it completely.
stream.push(null);
// If the client hasn't tried to consume the stream and there is no
// resume scheduled (which would indicate they would consume in the future),
// then just dump the incoming data so that the stream can be destroyed.
if (!stream[kState].didRead && !stream._readableState.resumeScheduled)

// If the user hasn't tried to consume the stream (and this is a server
// session) then just dump the incoming data so that the stream can
// be destroyed.
if (stream[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!stream[kState].didRead &&
stream.readableFlowing === null)
stream.resume();
else
stream.read(0);
}
}

Expand All @@ -379,7 +384,7 @@ function onStreamRead(nread, buf) {
`${sessionName(stream[kSession][kType])}]: ending readable.`);

// defer this until we actually emit end
if (stream._readableState.endEmitted) {
if (!stream.readable) {
stream[kMaybeDestroy]();
} else {
stream.on('end', stream[kMaybeDestroy]);
Expand Down Expand Up @@ -469,8 +474,7 @@ function onGoawayData(code, lastStreamID, buf) {
// goaway using NGHTTP2_NO_ERROR because there was no error
// condition on this side of the session that caused the
// shutdown.
session.destroy(new ERR_HTTP2_SESSION_ERROR(code),
{ errorCode: NGHTTP2_NO_ERROR });
session.destroy(new ERR_HTTP2_SESSION_ERROR(code), NGHTTP2_NO_ERROR);
}
}

Expand Down Expand Up @@ -813,6 +817,21 @@ function emitClose(self, error) {
self.emit('close');
}

function finishSessionDestroy(session, error) {
const socket = session[kSocket];
if (!socket.destroyed)
socket.destroy(error);

session[kProxySocket] = undefined;
session[kSocket] = undefined;
session[kHandle] = undefined;
socket[kSession] = undefined;
socket[kServer] = undefined;

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, session, error);
}

// Upon creation, the Http2Session takes ownership of the socket. The session
// may not be ready to use immediately if the socket is not yet fully connected.
// In that case, the Http2Session will wait for the socket to connect. Once
Expand Down Expand Up @@ -869,6 +888,8 @@ class Http2Session extends EventEmitter {

this[kState] = {
flags: SESSION_FLAGS_PENDING,
goawayCode: null,
goawayLastStreamID: null,
streams: new Map(),
pendingStreams: new Set(),
pendingAck: 0,
Expand Down Expand Up @@ -1171,25 +1192,13 @@ class Http2Session extends EventEmitter {
if (handle !== undefined)
handle.destroy(code, socket.destroyed);

// If there is no error, use setImmediate to destroy the socket on the
// If the socket is alive, use setImmediate to destroy the session on the
// next iteration of the event loop in order to give data time to transmit.
// Otherwise, destroy immediately.
if (!socket.destroyed) {
if (!error) {
setImmediate(socket.destroy.bind(socket));
} else {
socket.destroy(error);
}
}

this[kProxySocket] = undefined;
this[kSocket] = undefined;
this[kHandle] = undefined;
socket[kSession] = undefined;
socket[kServer] = undefined;

// Finally, emit the close and error events (if necessary) on next tick.
process.nextTick(emitClose, this, error);
if (!socket.destroyed)
setImmediate(finishSessionDestroy, this, error);
else
finishSessionDestroy(this, error);
}

// Closing the session will:
Expand Down Expand Up @@ -1441,11 +1450,8 @@ function afterDoStreamWrite(status, handle) {
}

function streamOnResume() {
if (!this.destroyed && !this.pending) {
if (!this[kState].didRead)
this[kState].didRead = true;
if (!this.destroyed)
this[kHandle].readStart();
}
}

function streamOnPause() {
Expand Down Expand Up @@ -1521,6 +1527,10 @@ class Http2Stream extends Duplex {
this[kSession] = session;
session[kState].pendingStreams.add(this);

// Allow our logic for determining whether any reads have happened to
// work in all situations. This is similar to what we do in _http_incoming.
this._readableState.readingMore = true;

this[kTimeout] = null;

this[kState] = {
Expand All @@ -1531,7 +1541,6 @@ class Http2Stream extends Duplex {
trailersReady: false
};

this.on('resume', streamOnResume);
this.on('pause', streamOnPause);
}

Expand Down Expand Up @@ -1725,6 +1734,10 @@ class Http2Stream extends Duplex {
this.push(null);
return;
}
if (!this[kState].didRead) {
this._readableState.readingMore = false;
this[kState].didRead = true;
}
if (!this.pending) {
streamOnResume.call(this);
} else {
Expand Down Expand Up @@ -1866,15 +1879,15 @@ class Http2Stream extends Duplex {
}
// The Http2Stream can be destroyed if it has closed and if the readable
// side has received the final chunk.
[kMaybeDestroy](error, code = NGHTTP2_NO_ERROR) {
if (error || code !== NGHTTP2_NO_ERROR) {
this.destroy(error);
[kMaybeDestroy](code = NGHTTP2_NO_ERROR) {
if (code !== NGHTTP2_NO_ERROR) {
this.destroy();
return;
}

// TODO(mcollina): remove usage of _*State properties
if (this._writableState.ended && this._writableState.pendingcb === 0) {
if (this._readableState.ended && this.closed) {
if (!this.writable) {
if (!this.readable && this.closed) {
this.destroy();
return;
}
Expand All @@ -1887,7 +1900,7 @@ class Http2Stream extends Duplex {
this[kSession][kType] === NGHTTP2_SESSION_SERVER &&
!(state.flags & STREAM_FLAGS_HAS_TRAILERS) &&
!state.didRead &&
!this._readableState.resumeScheduled) {
this.readableFlowing === null) {
this.close();
}
}
Expand Down Expand Up @@ -2477,6 +2490,10 @@ Object.defineProperty(Http2Session.prototype, 'setTimeout', setTimeout);
function socketOnError(error) {
const session = this[kSession];
if (session !== undefined) {
// We can ignore ECONNRESET after GOAWAY was received as there's nothing
// we can do and the other side is fully within its rights to do so.
if (error.code === 'ECONNRESET' && session[kState].goawayCode !== null)
return session.destroy();
debug(`Http2Session ${sessionName(session[kType])}: socket error [` +
`${error.message}]`);
session.destroy(error);
Expand Down
33 changes: 20 additions & 13 deletions src/node_http2.cc
Original file line number Diff line number Diff line change
Expand Up @@ -577,26 +577,28 @@ void Http2Session::EmitStatistics() {
void Http2Session::Close(uint32_t code, bool socket_closed) {
DEBUG_HTTP2SESSION(this, "closing session");

if (flags_ & SESSION_STATE_CLOSED)
if (flags_ & SESSION_STATE_CLOSING)
return;
flags_ |= SESSION_STATE_CLOSED;
flags_ |= SESSION_STATE_CLOSING;

// Stop reading on the i/o stream
if (stream_ != nullptr)
stream_->ReadStop();

// If the socket is not closed, then attempt to send a closing GOAWAY
// frame. There is no guarantee that this GOAWAY will be received by
// the peer but the HTTP/2 spec recommends sendinng it anyway. We'll
// the peer but the HTTP/2 spec recommends sending it anyway. We'll
// make a best effort.
if (!socket_closed) {
Http2Scope h2scope(this);
DEBUG_HTTP2SESSION2(this, "terminating session with code %d", code);
CHECK_EQ(nghttp2_session_terminate_session(session_, code), 0);
SendPendingData();
} else if (stream_ != nullptr) {
stream_->RemoveStreamListener(this);
}

flags_ |= SESSION_STATE_CLOSED;

// If there are outstanding pings, those will need to be canceled, do
// so on the next iteration of the event loop to avoid calling out into
// javascript since this may be called during garbage collection.
Expand Down Expand Up @@ -1355,25 +1357,32 @@ void Http2Session::MaybeScheduleWrite() {
}
}

void Http2Session::MaybeStopReading() {
int want_read = nghttp2_session_want_read(session_);
DEBUG_HTTP2SESSION2(this, "wants read? %d", want_read);
if (want_read == 0)
stream_->ReadStop();
}

// Unset the sending state, finish up all current writes, and reset
// storage for data and metadata that was associated with these writes.
void Http2Session::ClearOutgoing(int status) {
CHECK_NE(flags_ & SESSION_STATE_SENDING, 0);

flags_ &= ~SESSION_STATE_SENDING;

if (outgoing_buffers_.size() > 0) {
outgoing_storage_.clear();

for (const nghttp2_stream_write& wr : outgoing_buffers_) {
std::vector<nghttp2_stream_write> current_outgoing_buffers_;
current_outgoing_buffers_.swap(outgoing_buffers_);
for (const nghttp2_stream_write& wr : current_outgoing_buffers_) {
WriteWrap* wrap = wr.req_wrap;
if (wrap != nullptr)
wrap->Done(status);
}

outgoing_buffers_.clear();
}

flags_ &= ~SESSION_STATE_SENDING;

// Now that we've finished sending queued data, if there are any pending
// RstStreams we should try sending again and then flush them one by one.
if (pending_rst_streams_.size() > 0) {
Expand Down Expand Up @@ -1484,8 +1493,7 @@ uint8_t Http2Session::SendPendingData() {
ClearOutgoing(res.err);
}

DEBUG_HTTP2SESSION2(this, "wants data in return? %d",
nghttp2_session_want_read(session_));
MaybeStopReading();

return 0;
}
Expand Down Expand Up @@ -1618,8 +1626,7 @@ void Http2Session::OnStreamRead(ssize_t nread, const uv_buf_t& buf) {
};
MakeCallback(env()->error_string(), arraysize(argv), argv);
} else {
DEBUG_HTTP2SESSION2(this, "processed %d bytes. wants more? %d", ret,
nghttp2_session_want_read(session_));
MaybeStopReading();
}
}

Expand Down
8 changes: 6 additions & 2 deletions src/node_http2.h
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,8 @@ enum session_state_flags {
SESSION_STATE_HAS_SCOPE = 0x1,
SESSION_STATE_WRITE_SCHEDULED = 0x2,
SESSION_STATE_CLOSED = 0x4,
SESSION_STATE_SENDING = 0x8,
SESSION_STATE_CLOSING = 0x8,
SESSION_STATE_SENDING = 0x10,
};

// This allows for 4 default-sized frames with their frame headers
Expand Down Expand Up @@ -619,7 +620,7 @@ class Http2Stream : public AsyncWrap,

inline bool IsClosed() const {
return flags_ & NGHTTP2_STREAM_FLAG_CLOSED;
}
}

inline bool HasTrailers() const {
return flags_ & NGHTTP2_STREAM_FLAG_TRAILERS;
Expand Down Expand Up @@ -827,6 +828,9 @@ class Http2Session : public AsyncWrap, public StreamListener {
// Schedule a write if nghttp2 indicates it wants to write to the socket.
void MaybeScheduleWrite();

// Stop reading if nghttp2 doesn't want to anymore.
void MaybeStopReading();

// Returns pointer to the stream, or nullptr if stream does not exist
inline Http2Stream* FindStream(int32_t id);

Expand Down
6 changes: 0 additions & 6 deletions test/parallel/parallel.status
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,6 @@ prefix parallel
# Postmortem debugging data is prone to accidental removal during V8 updates.
test-postmortem-metadata: PASS,FLAKY

# http2 has a few bugs that make these tests flaky and that are currently worked
# on.
test-http2-client-upload-reject: PASS,FLAKY
test-http2-pipe: PASS,FLAKY
test-http2-client-upload: PASS,FLAKY

[$system==win32]

[$system==linux]
Expand Down
8 changes: 1 addition & 7 deletions test/parallel/test-http2-client-destroy.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,20 +109,14 @@ const Countdown = require('../common/countdown');

server.listen(0, common.mustCall(() => {
const client = h2.connect(`http://localhost:${server.address().port}`);
// On some platforms (e.g. windows), an ECONNRESET may occur at this
// point -- or it may not. Do not make this a mustCall
client.on('error', () => {});

client.on('close', () => {
server.close();
// calling destroy in here should not matter
client.destroy();
});

const req = client.request();
// On some platforms (e.g. windows), an ECONNRESET may occur at this
// point -- or it may not. Do not make this a mustCall
req.on('error', () => {});
client.request();
}));
}

Expand Down
6 changes: 1 addition & 5 deletions test/parallel/test-http2-server-close-callback.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ const http2 = require('http2');
const server = http2.createServer();

server.listen(0, common.mustCall(() => {
const client = http2.connect(`http://localhost:${server.address().port}`);
client.on('error', (err) => {
if (err.code !== 'ECONNRESET')
throw err;
});
http2.connect(`http://localhost:${server.address().port}`);
}));

server.on('session', common.mustCall((s) => {
Expand Down
13 changes: 8 additions & 5 deletions test/parallel/test-http2-server-sessionerror.js
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,17 @@ server.on('session', common.mustCall((session) => {
server.listen(0, common.mustCall(() => {
const url = `http://localhost:${server.address().port}`;
http2.connect(url)
// An ECONNRESET error may occur depending on the platform (due largely
// to differences in the timing of socket closing). Do not wrap this in
// a common must call.
.on('error', () => {})
.on('error', common.expectsError({
code: 'ERR_HTTP2_SESSION_ERROR',
message: 'Session closed with error code 2',
}))
.on('close', () => {
server.removeAllListeners('error');
http2.connect(url)
.on('error', () => {})
.on('error', common.expectsError({
code: 'ERR_HTTP2_SESSION_ERROR',
message: 'Session closed with error code 2',
}))
.on('close', () => server.close());
});
}));
Loading

0 comments on commit 6cfca8a

Please sign in to comment.