Skip to content

Commit

Permalink
quic: stream fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Jun 16, 2020
1 parent 5536044 commit 364c85e
Showing 1 changed file with 36 additions and 74 deletions.
110 changes: 36 additions & 74 deletions lib/internal/quic/core.js
Original file line number Diff line number Diff line change
Expand Up @@ -2601,59 +2601,45 @@ class QuicStream extends Duplex {
this._readableState.readingMore = true;
this.on('pause', streamOnPause);

// See src/node_quic_stream.h for an explanation
// of the initial states for unidirectional streams.
if (this.unidirectional) {
if (session instanceof QuicServerSession) {
if (this.serverInitiated) {
// Close the readable side
this.push(null);
this.read();
} else {
// Close the writable side
this.end();
}
} else if (this.serverInitiated) {
// Close the writable side
this.end();
} else {
this.push(null);
this.read();
}
}

// The QuicStream writes are corked until kSetHandle
// is set, ensuring that writes are buffered in JavaScript
// until we have somewhere to send them.
this.cork();
}

// Set handle is called once the QuicSession has been able
// to complete creation of the internal QuicStream handle.
// This will happen only after the QuicSession's own
// internal handle has been created. The QuicStream object
// is still minimally usable before this but any data
// written will be buffered until kSetHandle is called.
[kSetHandle](handle) {
this[kHandle] = handle;
if (handle !== undefined) {
handle.onread = onStreamRead;
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this.#id = handle.id();
this.#dataRateHistogram = new Histogram(handle.rate);
this.#dataSizeHistogram = new Histogram(handle.size);
this.#dataAckHistogram = new Histogram(handle.ack);
this.uncork();
this.emit('ready');
} else {
if (this.#dataRateHistogram)
this.#dataRateHistogram[kDestroyHistogram]();
if (this.#dataSizeHistogram)
this.#dataSizeHistogram[kDestroyHistogram]();
if (this.#dataAckHistogram)
this.#dataAckHistogram[kDestroyHistogram]();
}
_construct(cb) {
// Set handle is called once the QuicSession has been able
// to complete creation of the internal QuicStream handle.
// This will happen only after the QuicSession's own
// internal handle has been created. The QuicStream object
// is still minimally usable before this but any data
// written will be buffered until kSetHandle is called.
this[kSetHandle] = (handle) => {
this[kHandle] = handle;
if (handle !== undefined) {
handle.onread = onStreamRead;
handle[owner_symbol] = this;
this[async_id_symbol] = handle.getAsyncId();
this.#id = handle.id();
this.#dataRateHistogram = new Histogram(handle.rate);
this.#dataSizeHistogram = new Histogram(handle.size);
this.#dataAckHistogram = new Histogram(handle.ack);
this.uncork();

this.emit('ready');
// TODO (fix): What happens on failure? We still need to
// invoke the callback somehow? Change signature of kSetHandle
// to (err, handle)?
cb();
} else {
if (this.#dataRateHistogram)
this.#dataRateHistogram[kDestroyHistogram]();
if (this.#dataSizeHistogram)
this.#dataSizeHistogram[kDestroyHistogram]();
if (this.#dataAckHistogram)
this.#dataAckHistogram[kDestroyHistogram]();
}
};
}

[kStreamReset](code) {
Expand Down Expand Up @@ -2700,6 +2686,7 @@ class QuicStream extends Duplex {
if (this.destroyed || this.#closed)
return;

// TODO: This will deadlock on failure?
if (this.pending)
return this.once('ready', () => this[kClose](family, code));

Expand Down Expand Up @@ -2785,19 +2772,6 @@ class QuicStream extends Duplex {
}

#writeGeneric = function(writev, data, encoding, cb) {
if (this.destroyed)
return; // TODO(addaleax): Can this happen?

// The stream should be corked while still pending
// but just in case uncork
// was called early, defer the actual write until the
// ready event is emitted.
if (this.pending) {
return this.once('ready', () => {
this.#writeGeneric(writev, data, encoding, cb);
});
}

this[kUpdateTimer]();
const req = (writev) ?
writevGeneric(this, data, cb) :
Expand All @@ -2821,13 +2795,6 @@ class QuicStream extends Duplex {
// coming so that a fin stream packet can be
// sent.
_final(cb) {
// The QuicStream should be corked while pending
// so this shouldn't be called, but just in case
// the stream was prematurely uncorked, defer the
// operation until the ready event is emitted.
if (this.pending)
return this.once('ready', () => this._final(cb));

const handle = this[kHandle];
if (handle === undefined) {
cb();
Expand All @@ -2840,16 +2807,10 @@ class QuicStream extends Duplex {
const err = handle.shutdown(req);
if (err === 1)
return cb();
// TODO (fix): else if (err !== 0)?
}

_read(nread) {
if (this.pending)
return this.once('ready', () => this._read(nread));

if (this.destroyed) { // TODO(addaleax): Can this happen?
this.push(null);
return;
}
if (!this.#didRead) {
this._readableState.readingMore = false;
this.#didRead = true;
Expand Down Expand Up @@ -2895,6 +2856,7 @@ class QuicStream extends Duplex {
throw new ERR_INVALID_ARG_TYPE('fd', ['number', 'FileHandle'], fd);

if (this.pending) {
// TODO: This will deadlock on failure?
return this.once('ready', () => {
this.sendFD(fd, { offset, length }, ownsFd);
});
Expand Down

0 comments on commit 364c85e

Please sign in to comment.