Skip to content
This repository has been archived by the owner on Aug 23, 2019. It is now read-only.

Commit

Permalink
feat(dial): exposing connection handle flow
Browse files Browse the repository at this point in the history
  • Loading branch information
dignifiedquire authored and dryajov committed Apr 7, 2017
1 parent 5274e87 commit 514d480
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 138 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -87,4 +87,4 @@
"greenkeeper[bot] <greenkeeper[bot]@users.noreply.github.com>",
"ᴠɪᴄᴛᴏʀ ʙᴊᴇʟᴋʜᴏʟᴍ <[email protected]>"
]
}
}
142 changes: 5 additions & 137 deletions src/dial.js
Original file line number Diff line number Diff line change
@@ -1,12 +1,8 @@
'use strict'

const multistream = require('multistream-select')
const Connection = require('interface-connection').Connection
const debug = require('debug')
const log = debug('libp2p:swarm:dial')
const setImmediate = require('async/setImmediate')

const protocolMuxer = require('./protocol-muxer')

module.exports = function dial (swarm) {
return (pi, protocol, callback) => {
Expand All @@ -18,6 +14,7 @@ module.exports = function dial (swarm) {
callback = callback || function noop () {}

const proxyConn = new Connection()
const connHandler = swarm.connHandler(pi, protocol, proxyConn)

const b58Id = pi.id.toB58String()
log('dialing %s', b58Id)
Expand All @@ -28,54 +25,22 @@ module.exports = function dial (swarm) {
if (err) {
return callback(err)
}
gotWarmedUpConn(conn)
connHandler.handleNew(conn, callback)
})
} else {
const conn = swarm.conns[b58Id]
swarm.conns[b58Id] = undefined
gotWarmedUpConn(conn)
connHandler.handleWarmedUp(conn, callback)
}
} else {
if (!protocol) {
return callback()
}
gotMuxer(swarm.muxedConns[b58Id].muxer)
connHandler.gotMuxer(swarm.muxedConns[b58Id].muxer, callback)
}

return proxyConn

function gotWarmedUpConn (conn) {
conn.setPeerInfo(pi)
attemptMuxerUpgrade(conn, (err, muxer) => {
if (!protocol) {
if (err) {
swarm.conns[b58Id] = conn
}
return callback()
}

if (err) {
// couldn't upgrade to Muxer, it is ok
protocolHandshake(conn, protocol, callback)
} else {
gotMuxer(muxer)
}
})
}

function gotMuxer (muxer) {
if (swarm.identify) {
// TODO: Consider:
// 1. overload getPeerInfo
// 2. exec identify (through getPeerInfo)
// 3. update the peerInfo that is already stored in the conn
}

openConnInMuxedConn(muxer, (conn) => {
protocolHandshake(conn, protocol, callback)
})
}

function attemptDial (pi, cb) {
const tKeys = swarm.availableTransports(pi)

Expand All @@ -94,106 +59,9 @@ module.exports = function dial (swarm) {
return nextTransport(tKeys.shift())
}

cryptoDial()

function cryptoDial () {
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return cb(err)
}

const id = swarm._peerInfo.id
log('selecting crypto: %s', swarm.crypto.tag)
ms.select(swarm.crypto.tag, (err, conn) => {
if (err) {
return cb(err)
}

const wrapped = swarm.crypto.encrypt(id, id.privKey, conn)
cb(null, wrapped)
})
})
}
cb(null, conn)
})
}
}

function attemptMuxerUpgrade (conn, cb) {
const muxers = Object.keys(swarm.muxers)
if (muxers.length === 0) {
return cb(new Error('no muxers available'))
}

// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler

const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return callback(new Error('multistream not supported'))
}

nextMuxer(muxers.shift())
})

function nextMuxer (key) {
log('selecting %s', key)
ms.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
cb(new Error('could not upgrade to stream muxing'))
} else {
nextMuxer(muxers.shift())
}
return
}

const muxedConn = swarm.muxers[key].dialer(conn)
swarm.muxedConns[b58Id] = {}
swarm.muxedConns[b58Id].muxer = muxedConn
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn

muxedConn.once('close', () => {
const b58Str = pi.id.toB58String()
delete swarm.muxedConns[b58Str]
pi.disconnect()
swarm._peerBook.get(b58Str).disconnect()
setImmediate(() => swarm.emit('peer-mux-closed', pi))
})

// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
})

setImmediate(() => swarm.emit('peer-mux-established', pi))

cb(null, muxedConn)
})
}
}

function openConnInMuxedConn (muxer, cb) {
cb(muxer.newStream())
}

function protocolHandshake (conn, protocol, cb) {
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return callback(err)
}
ms.select(protocol, (err, conn) => {
if (err) {
return callback(err)
}
proxyConn.setInnerConn(conn)
callback(null, proxyConn)
})
})
}
}
}
162 changes: 162 additions & 0 deletions src/handler.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
'use strict'

const multistream = require('multistream-select')
const protocolMuxer = require('./protocol-muxer')

const debug = require('debug')
const log = debug('libp2p:swarm:handle')

module.exports = function process (swarm) {
return (pi, protocol, proxyConn) => {
const b58Id = pi.id.toB58String()

function cryptoDial (conn, cb) {
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return cb(err)
}

const id = swarm._peerInfo.id
log('selecting crypto: %s', swarm.crypto.tag)
ms.select(swarm.crypto.tag, (err, conn) => {
if (err) {
return cb(err)
}

const wrapped = swarm.crypto.encrypt(id, id.privKey, conn)
cb(null, wrapped)
})
})
}

function gotWarmedUpConn (conn, cb) {
conn.setPeerInfo(pi)
attemptMuxerUpgrade(conn, (err, muxer) => {
if (!protocol) {
if (err) {
swarm.conns[b58Id] = conn
}
return cb()
}

if (err) {
// couldn't upgrade to Muxer, it is ok
protocolHandshake(conn, protocol, cb)
} else {
gotMuxer(muxer, cb)
}
})
}

function attemptMuxerUpgrade (conn, cb) {
const muxers = Object.keys(swarm.muxers)
if (muxers.length === 0) {
return cb(new Error('no muxers available'))
}

// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler

const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return cb(new Error('multistream not supported'))
}

nextMuxer(muxers.shift())
})

function nextMuxer (key) {
log('selecting %s', key)
ms.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
cb(new Error('could not upgrade to stream muxing'))
} else {
nextMuxer(muxers.shift())
}
return
}

const muxedConn = swarm.muxers[key].dialer(conn)
swarm.muxedConns[b58Id] = {}
swarm.muxedConns[b58Id].muxer = muxedConn
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn

swarm.emit('peer-mux-established', pi)

muxedConn.once('close', () => {
const b58Str = pi.id.toB58String()
delete swarm.muxedConns[b58Str]
pi.disconnect()
swarm._peerBook.get(b58Str).disconnect()
swarm.emit('peer-mux-closed', pi)
})

// For incoming streams, in case identify is on
muxedConn.on('stream', (conn) => {
protocolMuxer(swarm.protocols, conn)
})

cb(null, muxedConn)
})
}
}

function openConnInMuxedConn (muxer, cb) {
cb(muxer.newStream())
}

function protocolHandshake (conn, protocol, cb) {
const ms = new multistream.Dialer()
ms.handle(conn, (err) => {
if (err) {
return cb(err)
}
ms.select(protocol, (err, conn) => {
if (err) {
return cb(err)
}
proxyConn.setInnerConn(conn)
cb(null, proxyConn)
})
})
}

function handleNew (conn, cb) {
cryptoDial(conn, (err, conn) => {
if (err) {
log(err)
return cb(err)
}
gotWarmedUpConn(conn, cb)
})
}

function handleWarmedUp (conn, cb) {
gotWarmedUpConn(conn, cb)
}

function gotMuxer (muxer, cb) {
if (swarm.identify) {
// TODO: Consider:
// 1. overload getPeerInfo
// 2. exec identify (through getPeerInfo)
// 3. update the peerInfo that is already stored in the conn
}

openConnInMuxedConn(muxer, (conn) => {
protocolHandshake(conn, protocol, cb)
})
}

return {
handleNew: handleNew,
handleWarmedUp: handleWarmedUp,
gotMuxer: gotMuxer
}
}
}
2 changes: 2 additions & 0 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ const each = require('async/each')
const series = require('async/series')
const transport = require('./transport')
const connection = require('./connection')
const handler = require('./handler')
const dial = require('./dial')
const protocolMuxer = require('./protocol-muxer')
const plaintext = require('./plaintext')
Expand Down Expand Up @@ -56,6 +57,7 @@ function Swarm (peerInfo, peerBook) {

this.transport = transport(this)
this.connection = connection(this)
this.connHandler = handler(this)

this.availableTransports = (pi) => {
const myAddrs = pi.multiaddrs.toArray()
Expand Down
Empty file added test/01-transport-tcp.node.js
Empty file.

0 comments on commit 514d480

Please sign in to comment.