Skip to content

Commit

Permalink
fix: request body edge cases
Browse files Browse the repository at this point in the history
Readable can emit 'end' after .destroy() as well
as legacy streams might not implement the whole
Readable contract properly. Try to handle these
edge cases better.

Fixes: nodejs#66
  • Loading branch information
ronag committed May 5, 2020
1 parent e5f1d70 commit d94df77
Show file tree
Hide file tree
Showing 2 changed files with 106 additions and 30 deletions.
74 changes: 44 additions & 30 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ const kLastBody = Symbol('lastBody')
const kStream = Symbol('kStream')
const kClosed = Symbol('kClosed')

function nop () {}

function connect (client) {
var socket = null
var url = client.url
Expand Down Expand Up @@ -74,7 +76,7 @@ function reconnect (client, err) {
client.socket.removeAllListeners('end')
client.socket.removeAllListeners('finish')
client.socket.removeAllListeners('error')
client.socket.on('error', () => {})
client.socket.on('error', nop)
client.socket = null

// we reset the callbacks
Expand Down Expand Up @@ -160,19 +162,14 @@ class Client extends EventEmitter {
this.socket.write(body)
endRequest()
} else if (body && typeof body.pipe === 'function') {
const cleanup = this[kStream].finished(this.socket, (err) => {
if (err) {
body.destroy(err)
}
})

if (chunked) {
this.socket.write('transfer-encoding: chunked\r\n', 'ascii')
} else {
this.socket.write('\r\n', 'ascii')
}

// TODO we should pause the queue while we are piping
// TODO: Pause the queue while piping.

const onData = (chunk) => {
if (chunked) {
this.socket.write('\r\n' + Buffer.byteLength(chunk).toString(16) + '\r\n')
Expand All @@ -181,39 +178,56 @@ class Client extends EventEmitter {
body.pause()
}
}
const onEnd = () => {
this.socket.removeListener('drain', onDrain)

if (chunked) {
this.socket.cork()
this.socket.write('\r\n0\r\n', 'ascii')
}

endRequest()
}
const onDrain = () => {
body.resume()
}
const onFinished = (err) => {
freeSocketFinished()
freeBodyFinished()

body.on('data', onData)
this.socket.on('drain', onDrain)

this.socket.uncork()
this[kStream].finished(body, (err) => {
cleanup()
if (err) {
// TODO we might want to wait before previous in-flight
// requests are finished before destroying
if (this.socket) {
finishedSocket(this, cb)
this.socket.destroy(err)
} else {
callback(err, null)
}
if (!err) {
return
}

assert(this.socket)
body
.removeListener('data', onData)
.removeListener('end', onEnd)

this.socket.removeListener('drain', onDrain)
if (typeof body.destroy === 'function') {
body.destroy(err)
}

if (chunked) {
this.socket.cork()
this.socket.write('\r\n0\r\n', 'ascii')
// TODO we might want to wait before previous in-flight
// requests are finished before destroying
if (this.socket) {
finishedSocket(this, cb)
this.socket.destroy(err)
} else {
callback(err, null)
}
}

endRequest()
})
body
.on('data', onData)
.on('end', onEnd)
.on('error', nop)

this.socket
.on('drain', onDrain)
.uncork()

const freeSocketFinished = this[kStream].finished(this.socket, onFinished)
const freeBodyFinished = this[kStream].finished(body, onFinished)
} else {
assert(!body)
endRequest()
Expand Down
62 changes: 62 additions & 0 deletions test/client-errors.js
Original file line number Diff line number Diff line change
Expand Up @@ -440,3 +440,65 @@ test('fail invalid body regexp', (t) => {
})
})
})

test('socket fail while writing request body', (t) => {
t.plan(1)

const server = createServer()
server.once('request', (req, res) => {
res.write('asd')
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))
const body = new Readable({ read () {} })
body.push('asd')

client.on('connect', () => {
client.socket.destroy('kaboom')
})

client.request({
path: '/',
method: 'POST',
body
}, (err) => {
t.ok(err)
})
})
})

test('socket fail while ending request body', (t) => {
t.plan(2)

const server = createServer()
server.once('request', (req, res) => {
res.end()
})
t.tearDown(server.close.bind(server))

server.listen(0, () => {
const client = new Client(`http://localhost:${server.address().port}`)
t.tearDown(client.close.bind(client))

client.request({
path: '/',
method: 'POST',
body: 'asd'
}, (err) => {
t.error(err)
const body = new Readable({ read () {} })
body.push(null)
client.request({
path: '/',
method: 'POST',
body
}, (err) => {
t.ok(err)
})
client.socket.destroy('kaboom')
})
})
})

0 comments on commit d94df77

Please sign in to comment.