Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

fix: improve dial queue and parallel dials #315

Merged
merged 7 commits into from
Mar 28, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ const sw = new switch(peerInfo , peerBook [, options])
If defined, `options` should be an object with the following keys and respective values:

- `blacklistTTL`: - number of ms a peer should not be dialable to after it errors. Defaults to `120000`(120 seconds)
- `maxParallelDials` - number of concurrent dials the switch should allow. Defaults to `50`
- `maxParallelDials`: - number of concurrent dials the switch should allow. Defaults to `50`
- `dialTimeout`: - number of ms a dial to a peer should be allowed to run. Defaults to `30000` (30 seconds)
- `stats`: an object with the following keys and respective values:
- `maxOldPeersRetention`: maximum old peers retention. For when peers disconnect and keeping the stats around in case they reconnect. Defaults to `100`.
- `computeThrottleMaxQueueSize`: maximum queue size to perform stats computation throttling. Defaults to `1000`.
Expand Down
1 change: 1 addition & 0 deletions src/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@

module.exports = {
BLACK_LIST_TTL: 120e3, // How long before an errored peer can be dialed again
DIAL_TIMEOUT: 30e3, // How long in ms a dial attempt is allowed to take
MAX_PARALLEL_DIALS: 50 // Maximum allowed concurrent dials
}
10 changes: 3 additions & 7 deletions src/dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ class Queue {
* @constructor
* @param {string} peerId
* @param {Switch} _switch
* @param {function} onStopped Called when the queue stops
* @param {function(string)} onStopped Called when the queue stops
*/
constructor (peerId, _switch, onStopped) {
this.id = peerId
Expand All @@ -78,20 +78,16 @@ class Queue {
}

/**
* Adds the dial request to the queue and starts the
* queue if it is stopped
* Adds the dial request to the queue. The queue is not automatically started
* @param {string} protocol
* @param {boolean} useFSM If callback should use a ConnectionFSM instead
* @param {function(Error, Connection)} callback
* @returns {boolean} whether or not the queue has been started
*/
add (protocol, useFSM, callback) {
if (!this.isDialAllowed()) {
nextTick(callback, ERR_BLACKLISTED())
return false
}
this._queue.push({ protocol, useFSM, callback })
return this.start()
}

/**
Expand Down Expand Up @@ -133,7 +129,7 @@ class Queue {
if (this.isRunning) {
log('stopping dial queue to %s', this.id)
this.isRunning = false
this.onStopped()
this.onStopped(this.id)
}
}

Expand Down
51 changes: 29 additions & 22 deletions src/dialer/queueManager.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

const once = require('once')
const Queue = require('./queue')
const { DIAL_ABORTED } = require('../errors')
const noop = () => {}

class DialQueueManager {
Expand All @@ -11,10 +10,10 @@ class DialQueueManager {
* @param {Switch} _switch
*/
constructor (_switch) {
this._queue = []
this._queue = new Set()
this._dialingQueues = new Set()
this._queues = {}
this.switch = _switch
this.dials = 0
}

/**
Expand All @@ -24,11 +23,8 @@ class DialQueueManager {
* This causes the entire DialerQueue to be drained
*/
abort () {
// Abort items in the general queue
while (this._queue.length > 0) {
let dial = this._queue.shift()
dial.callback(DIAL_ABORTED())
}
// Clear the general queue
this._queue.clear()

// Abort the individual peer queues
const queues = Object.values(this._queues)
Expand All @@ -46,29 +42,39 @@ class DialQueueManager {
add ({ peerInfo, protocol, useFSM, callback }) {
callback = callback ? once(callback) : noop

// If the target queue is currently running, just add the dial
// directly to it. This acts as a crude priority lane for multiple
// calls to a peer.
// Add the dial to its respective queue
const targetQueue = this.getQueue(peerInfo)
if (targetQueue.isRunning) {
targetQueue.add(protocol, useFSM, callback)
targetQueue.add(protocol, useFSM, callback)

// If we're already connected to the peer, start the queue now
// While it might cause queues to go over the max parallel amount,
// it avoids blocking peers we're already connected to
if (peerInfo.isConnected()) {
targetQueue.start()
return
}

this._queue.push({ peerInfo, protocol, useFSM, callback })
// Add the id to the general queue set if the queue isn't running
// and if the queue is allowed to dial
if (!targetQueue.isRunning && targetQueue.isDialAllowed()) {
this._queue.add(targetQueue.id)
}

this.run()
}

/**
* Will execute up to `MAX_PARALLEL_DIALS` dials
*/
run () {
if (this.dials < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.length > 0) {
let { peerInfo, protocol, useFSM, callback } = this._queue.shift()
let dialQueue = this.getQueue(peerInfo)
if (dialQueue.add(protocol, useFSM, callback)) {
this.dials++
}
if (this._dialingQueues.size < this.switch.dialer.MAX_PARALLEL_DIALS && this._queue.size > 0) {
let nextQueue = this._queue.values().next()
if (nextQueue.done) return

this._queue.delete(nextQueue.value)
let targetQueue = this._queues[nextQueue.value]
this._dialingQueues.add(targetQueue.id)
targetQueue.start()
}
}

Expand All @@ -84,9 +90,10 @@ class DialQueueManager {
* A handler for when dialing queues stop. This will trigger
* `run()` in order to keep the queue processing.
* @private
* @param {string} id peer id of the queue that stopped
*/
_onQueueStopped () {
this.dials--
_onQueueStopped (id) {
this._dialingQueues.delete(id)
this.run()
}

Expand Down
21 changes: 12 additions & 9 deletions src/limit-dialer/index.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
'use strict'

const tryEach = require('async/tryEach')
const race = require('async/race')
const debug = require('debug')
const once = require('once')

const log = debug('libp2p:switch:dialer')

const DialQueue = require('./queue')
const { CONNECTION_FAILED } = require('../errors')

/**
* Track dials per peer and limited them.
Expand Down Expand Up @@ -42,19 +43,21 @@ class LimitDialer {

let errors = []
const tasks = addrs.map((m) => {
return (cb) => this.dialSingle(peer, transport, m, token, (err, result) => {
if (err) {
errors.push(err)
return cb(err)
return (cb) => this.dialSingle(peer, transport, m, token, (err, res) => {
if (res) return cb(null, res)

errors.push(err || CONNECTION_FAILED())

if (errors.length === tasks.length) {
cb(errors)
}
return cb(null, result)
})
})

tryEach(tasks, (_, result) => {
if (result && result.conn) {
race(tasks, (_, successfulDial) => {
if (successfulDial) {
log('dialMany:success')
return callback(null, result)
return callback(null, successfulDial)
}

log('dialMany:error')
Expand Down
4 changes: 2 additions & 2 deletions src/limit-dialer/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,9 @@ class DialQueue {
pull(empty(), conn)
// If we can close the connection, do it
if (typeof conn.close === 'function') {
return conn.close((_) => callback(null, { cancel: true }))
return conn.close((_) => callback(null))
}
return callback(null, { cancel: true })
return callback(null)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does removing these objects do?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe these were left over from some old code, only valid connection results are being used now, so they were deleted. The intent is that it creates a cancel token that causes other successfully dialed connections to be destroyed. Now the token is created when we receive the first valid connection.

}

// one is enough
Expand Down
7 changes: 2 additions & 5 deletions src/transport.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,19 @@ const debug = require('debug')
const log = debug('libp2p:switch:transport')

const LimitDialer = require('./limit-dialer')
const { DIAL_TIMEOUT } = require('./constants')

// number of concurrent outbound dials to make per peer, same as go-libp2p-swtch
const defaultPerPeerRateLimit = 8

// the amount of time a single dial has to succeed
// TODO this should be exposed as a option
const dialTimeout = 30 * 1000

/**
* Manages the transports for the switch. This simplifies dialing and listening across
* multiple transports.
*/
class TransportManager {
constructor (_switch) {
this.switch = _switch
this.dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout)
this.dialer = new LimitDialer(defaultPerPeerRateLimit, this.switch._options.dialTimeout || DIAL_TIMEOUT)
}

/**
Expand Down
13 changes: 9 additions & 4 deletions test/limit-dialer.node.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

const chai = require('chai')
chai.use(require('dirty-chai'))
chai.use(require('chai-checkmark'))
const expect = chai.expect
const multiaddr = require('multiaddr')
const pull = require('pull-stream')
Expand Down Expand Up @@ -52,13 +53,15 @@ describe('LimitDialer', () => {
it('two success', (done) => {
const dialer = new LimitDialer(2, 10)

expect(2).checks(done)

// mock transport
const t1 = {
dial (addr, cb) {
const as = addr.toString()
if (as.match(/191/)) {
setImmediate(() => cb(new Error('fail')))
return {}
return null
} else if (as.match(/192/)) {
setTimeout(cb, 2)
return {
Expand All @@ -69,7 +72,10 @@ describe('LimitDialer', () => {
setTimeout(cb, 8)
return {
source: pull.values([2]),
sink: pull.drain()
sink: pull.onEnd((err) => {
// Verify the unused connection gets closed
expect(err).to.not.exist().mark()
})
}
}
}
Expand All @@ -83,8 +89,7 @@ describe('LimitDialer', () => {
conn,
pull.collect((err, res) => {
expect(err).to.not.exist()
expect(res).to.be.eql([1])
done()
expect(res).to.be.eql([1]).mark()
})
)
})
Expand Down