From 0317ed6f3f08e579020cc4794166774e4a7bd9ae Mon Sep 17 00:00:00 2001 From: Dmitriy Ryajov Date: Sun, 2 Apr 2017 12:39:12 -0700 Subject: [PATCH] feat(dial): exposing connection handle flow --- src/dial.js | 142 ++--------------------------------------- src/handler.js | 162 +++++++++++++++++++++++++++++++++++++++++++++++ src/index.js | 2 + src/transport.js | 17 ----- 4 files changed, 169 insertions(+), 154 deletions(-) create mode 100644 src/handler.js diff --git a/src/dial.js b/src/dial.js index c1de123..0591605 100644 --- a/src/dial.js +++ b/src/dial.js @@ -1,12 +1,8 @@ 'use strict' -const multistream = require('multistream-select') const Connection = require('interface-connection').Connection const debug = require('debug') const log = debug('libp2p:swarm:dial') -const setImmediate = require('async/setImmediate') - -const protocolMuxer = require('./protocol-muxer') module.exports = function dial (swarm) { return (pi, protocol, callback) => { @@ -18,6 +14,7 @@ module.exports = function dial (swarm) { callback = callback || function noop () {} const proxyConn = new Connection() + const connHandler = swarm.connHandler(pi, protocol, proxyConn) const b58Id = pi.id.toB58String() log('dialing %s', b58Id) @@ -28,54 +25,22 @@ module.exports = function dial (swarm) { if (err) { return callback(err) } - gotWarmedUpConn(conn) + connHandler.handleNew(conn, callback) }) } else { const conn = swarm.conns[b58Id] swarm.conns[b58Id] = undefined - gotWarmedUpConn(conn) + connHandler.handleWarmedUp(conn, callback) } } else { if (!protocol) { return callback() } - gotMuxer(swarm.muxedConns[b58Id].muxer) + connHandler.gotMuxer(swarm.muxedConns[b58Id].muxer, callback) } return proxyConn - function gotWarmedUpConn (conn) { - conn.setPeerInfo(pi) - attemptMuxerUpgrade(conn, (err, muxer) => { - if (!protocol) { - if (err) { - swarm.conns[b58Id] = conn - } - return callback() - } - - if (err) { - // couldn't upgrade to Muxer, it is ok - protocolHandshake(conn, protocol, callback) - } else { - gotMuxer(muxer) - } - }) - } - - function gotMuxer (muxer) { - if (swarm.identify) { - // TODO: Consider: - // 1. overload getPeerInfo - // 2. exec identify (through getPeerInfo) - // 3. update the peerInfo that is already stored in the conn - } - - openConnInMuxedConn(muxer, (conn) => { - protocolHandshake(conn, protocol, callback) - }) - } - function attemptDial (pi, cb) { const tKeys = swarm.availableTransports(pi) @@ -94,106 +59,9 @@ module.exports = function dial (swarm) { return nextTransport(tKeys.shift()) } - cryptoDial() - - function cryptoDial () { - const ms = new multistream.Dialer() - ms.handle(conn, (err) => { - if (err) { - return cb(err) - } - - const id = swarm._peerInfo.id - log('selecting crypto: %s', swarm.crypto.tag) - ms.select(swarm.crypto.tag, (err, conn) => { - if (err) { - return cb(err) - } - - const wrapped = swarm.crypto.encrypt(id, id.privKey, conn) - cb(null, wrapped) - }) - }) - } + cb(null, conn) }) } } - - function attemptMuxerUpgrade (conn, cb) { - const muxers = Object.keys(swarm.muxers) - if (muxers.length === 0) { - return cb(new Error('no muxers available')) - } - - // 1. try to handshake in one of the muxers available - // 2. if succeeds - // - add the muxedConn to the list of muxedConns - // - add incomming new streams to connHandler - - const ms = new multistream.Dialer() - ms.handle(conn, (err) => { - if (err) { - return callback(new Error('multistream not supported')) - } - - nextMuxer(muxers.shift()) - }) - - function nextMuxer (key) { - log('selecting %s', key) - ms.select(key, (err, conn) => { - if (err) { - if (muxers.length === 0) { - cb(new Error('could not upgrade to stream muxing')) - } else { - nextMuxer(muxers.shift()) - } - return - } - - const muxedConn = swarm.muxers[key].dialer(conn) - swarm.muxedConns[b58Id] = {} - swarm.muxedConns[b58Id].muxer = muxedConn - // should not be needed anymore - swarm.muxedConns[b58Id].conn = conn - - muxedConn.once('close', () => { - const b58Str = pi.id.toB58String() - delete swarm.muxedConns[b58Str] - pi.disconnect() - swarm._peerBook.get(b58Str).disconnect() - setImmediate(() => swarm.emit('peer-mux-closed', pi)) - }) - - // For incoming streams, in case identify is on - muxedConn.on('stream', (conn) => { - protocolMuxer(swarm.protocols, conn) - }) - - setImmediate(() => swarm.emit('peer-mux-established', pi)) - - cb(null, muxedConn) - }) - } - } - - function openConnInMuxedConn (muxer, cb) { - cb(muxer.newStream()) - } - - function protocolHandshake (conn, protocol, cb) { - const ms = new multistream.Dialer() - ms.handle(conn, (err) => { - if (err) { - return callback(err) - } - ms.select(protocol, (err, conn) => { - if (err) { - return callback(err) - } - proxyConn.setInnerConn(conn) - callback(null, proxyConn) - }) - }) - } } } diff --git a/src/handler.js b/src/handler.js new file mode 100644 index 0000000..23d3276 --- /dev/null +++ b/src/handler.js @@ -0,0 +1,162 @@ +'use strict' + +const multistream = require('multistream-select') +const protocolMuxer = require('./protocol-muxer') + +const debug = require('debug') +const log = debug('libp2p:swarm:handle') + +module.exports = function process (swarm) { + return (pi, protocol, proxyConn) => { + const b58Id = pi.id.toB58String() + + function cryptoDial (conn, cb) { + const ms = new multistream.Dialer() + ms.handle(conn, (err) => { + if (err) { + return cb(err) + } + + const id = swarm._peerInfo.id + log('selecting crypto: %s', swarm.crypto.tag) + ms.select(swarm.crypto.tag, (err, conn) => { + if (err) { + return cb(err) + } + + const wrapped = swarm.crypto.encrypt(id, id.privKey, conn) + cb(null, wrapped) + }) + }) + } + + function gotWarmedUpConn (conn, cb) { + conn.setPeerInfo(pi) + attemptMuxerUpgrade(conn, (err, muxer) => { + if (!protocol) { + if (err) { + swarm.conns[b58Id] = conn + } + return cb() + } + + if (err) { + // couldn't upgrade to Muxer, it is ok + protocolHandshake(conn, protocol, cb) + } else { + gotMuxer(muxer, cb) + } + }) + } + + function attemptMuxerUpgrade (conn, cb) { + const muxers = Object.keys(swarm.muxers) + if (muxers.length === 0) { + return cb(new Error('no muxers available')) + } + + // 1. try to handshake in one of the muxers available + // 2. if succeeds + // - add the muxedConn to the list of muxedConns + // - add incomming new streams to connHandler + + const ms = new multistream.Dialer() + ms.handle(conn, (err) => { + if (err) { + return cb(new Error('multistream not supported')) + } + + nextMuxer(muxers.shift()) + }) + + function nextMuxer (key) { + log('selecting %s', key) + ms.select(key, (err, conn) => { + if (err) { + if (muxers.length === 0) { + cb(new Error('could not upgrade to stream muxing')) + } else { + nextMuxer(muxers.shift()) + } + return + } + + const muxedConn = swarm.muxers[key].dialer(conn) + swarm.muxedConns[b58Id] = {} + swarm.muxedConns[b58Id].muxer = muxedConn + // should not be needed anymore - swarm.muxedConns[b58Id].conn = conn + + swarm.emit('peer-mux-established', pi) + + muxedConn.once('close', () => { + const b58Str = pi.id.toB58String() + delete swarm.muxedConns[b58Str] + pi.disconnect() + swarm._peerBook.get(b58Str).disconnect() + swarm.emit('peer-mux-closed', pi) + }) + + // For incoming streams, in case identify is on + muxedConn.on('stream', (conn) => { + protocolMuxer(swarm.protocols, conn) + }) + + cb(null, muxedConn) + }) + } + } + + function openConnInMuxedConn (muxer, cb) { + cb(muxer.newStream()) + } + + function protocolHandshake (conn, protocol, cb) { + const ms = new multistream.Dialer() + ms.handle(conn, (err) => { + if (err) { + return cb(err) + } + ms.select(protocol, (err, conn) => { + if (err) { + return cb(err) + } + proxyConn.setInnerConn(conn) + cb(null, proxyConn) + }) + }) + } + + function handleNew (conn, cb) { + cryptoDial(conn, (err, conn) => { + if (err) { + log(err) + return cb(err) + } + gotWarmedUpConn(conn, cb) + }) + } + + function handleWarmedUp (conn, cb) { + gotWarmedUpConn(conn, cb) + } + + function gotMuxer (muxer, cb) { + if (swarm.identify) { + // TODO: Consider: + // 1. overload getPeerInfo + // 2. exec identify (through getPeerInfo) + // 3. update the peerInfo that is already stored in the conn + } + + openConnInMuxedConn(muxer, (conn) => { + protocolHandshake(conn, protocol, cb) + }) + } + + return { + handleNew: handleNew, + handleWarmedUp: handleWarmedUp, + gotMuxer: gotMuxer + } + } +} diff --git a/src/index.js b/src/index.js index ac459d1..eeda97f 100644 --- a/src/index.js +++ b/src/index.js @@ -6,6 +6,7 @@ const each = require('async/each') const series = require('async/series') const transport = require('./transport') const connection = require('./connection') +const handler = require('./handler') const dial = require('./dial') const protocolMuxer = require('./protocol-muxer') const plaintext = require('./plaintext') @@ -56,6 +57,7 @@ function Swarm (peerInfo, peerBook) { this.transport = transport(this) this.connection = connection(this) + this.connHandler = handler(this) this.availableTransports = (pi) => { const myAddrs = pi.multiaddrs.toArray() diff --git a/src/transport.js b/src/transport.js index 7bd1a5e..9d38978 100644 --- a/src/transport.js +++ b/src/transport.js @@ -1,8 +1,6 @@ 'use strict' const parallel = require('async/parallel') -const queue = require('async/queue') -const timeout = require('async/timeout') const once = require('once') const debug = require('debug') const log = debug('libp2p:swarm:transport') @@ -16,12 +14,6 @@ const defaultPerPeerRateLimit = 8 // the amount of time a single dial has to succeed const dialTimeout = 10 * 1000 -// number of concurrent outbound dials to make per peer, same as go-libp2p-swarm -const defaultPerPeerRateLimit = 8 - -// the amount of time a single dial has to succeed -const dialTimeout = 10 * 1000 - module.exports = function (swarm) { const dialer = new LimitDialer(defaultPerPeerRateLimit, dialTimeout) @@ -138,13 +130,4 @@ function dialables (tp, multiaddrs) { return tp.filter(multiaddrs) } -function dialWithTimeout (transport, multiaddr, maxTimeout, callback) { - timeout((cb) => { - const conn = transport.dial(multiaddr, (err) => { - log('dialed') - cb(err, conn) - }) - }, maxTimeout)(callback) -} - function noop () {}