Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revamp - Update libp2p-utp to follow latest interface-connection and interface-transport #74

Merged
merged 6 commits into from
Mar 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 21 additions & 16 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,15 @@
"description": "Node.js implementation of the µTP module that libp2p uses, which implements the abstract-connection interface",
"main": "src/index.js",
"scripts": {
"lint": "aegir-lint",
"test": "aegir-test --env node",
"release": "aegir-release --env no-build",
"release-minor": "aegir-release --type minor --env no-build",
"release-major": "aegir-release --type major --env no-build",
"coverage": "aegir-coverage",
"coverage-publish": "aegir-coverage publish"
"lint": "aegir lint",
"test": "aegir test -t node",
"release": "aegir release -t node --no-build",
"release-minor": "aegir release --type minor -t node --no-build",
"release-major": "aegir release --type major -t node --no-build",
"coverage": "aegir coverage",
"coverage-publish": "aegir coverage publish --providers coveralls"
},
"pre-commit": [
"pre-push": [
"lint",
"test"
],
Expand All @@ -29,17 +29,22 @@
},
"homepage": "https://github.com/libp2p/js-libp2p-utp",
"devDependencies": {
"aegir": "^11.0.2",
"aegir": "^13.0.1",
"chai": "^4.1.2",
"dirty-chai": "^2.0.1",
"interface-connection": "~0.3.2",
"interface-transport": "~0.3.5",
"pre-commit": "^1.2.2",
"tape": "^4.8.0"
"pre-commit": "^1.2.2"
},
"dependencies": {
"multiaddr": "^3.0.0",
"utp-native": "^1.5.4"
"debug": "^3.1.0",
"interface-connection": "~0.3.2",
"ip-address": "^5.8.9",
"lodash.includes": "^4.3.0",
"lodash.isfunction": "^3.0.9",
"mafmt": "^4.0.0",
"multiaddr": "^3.0.2",
"once": "^1.4.0",
"stream-to-pull-stream": "^1.7.2",
"utp-native": "^1.6.2"
},
"contributors": [
"David Dias <[email protected]>",
Expand All @@ -48,4 +53,4 @@
"dignifiedquire <[email protected]>",
"greenkeeperio-bot <[email protected]>"
]
}
}
146 changes: 146 additions & 0 deletions src/create-listener.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
'use strict'

const multiaddr = require('multiaddr')
const Connection = require('interface-connection').Connection
// const os = require('os')
const includes = require('lodash.includes')
const utp = require('utp-native')
const toPull = require('stream-to-pull-stream')
const EventEmitter = require('events').EventEmitter
const debug = require('debug')
const log = debug('libp2p:utp:listen')

const getMultiaddr = require('./get-multiaddr')

const IPFS_CODE = 421
const CLOSE_TIMEOUT = 2000

function noop () {}

module.exports = (handler) => {
const listener = new EventEmitter()

const server = utp.createServer((socket) => {
// Avoid uncaught errors cause by unstable connections
socket.on('error', noop)

const addr = getMultiaddr(socket)

const s = toPull.duplex(socket)

s.getObservedAddrs = (cb) => cb(null, [addr])

trackSocket(server, socket)

const conn = new Connection(s)
handler(conn)
listener.emit('connection', conn)
})

server.on('listening', () => listener.emit('listening'))
server.on('error', (err) => listener.emit('error', err))
server.on('close', () => listener.emit('close'))

// Keep track of open connections to destroy in case of timeout
server.__connections = {}

listener.close = (options, callback) => {
if (typeof options === 'function') {
callback = options
options = {}
}
callback = callback || noop
options = options || {}

const timeout = setTimeout(() => {
log('unable to close graciously, destroying conns')
Object.keys(server.__connections).forEach((key) => {
log('destroying %s', key)
server.__connections[key].destroy()
})
}, options.timeout || CLOSE_TIMEOUT)

server.close(callback)

server.once('close', () => {
clearTimeout(timeout)
})
}

let ipfsId
let listeningAddr

listener.listen = (ma, callback) => {
listeningAddr = ma
if (includes(ma.protoNames(), 'ipfs')) {
ipfsId = getIpfsId(ma)
listeningAddr = ma.decapsulate('ipfs')
}

const lOpts = listeningAddr.toOptions()
log('Listening on %s %s', lOpts.port, lOpts.host)
server.listen(lOpts.port, lOpts.host, callback)
}

listener.getAddrs = (callback) => {
const multiaddrs = []
const address = server.address()

if (!address) {
return callback(new Error('Listener is not ready yet'))
}

// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
/*
if (listeningAddr.toString().indexOf('ip4') !== -1) {
let m = listeningAddr.decapsulate('utp')
m = m.encapsulate('/tcp/' + address.port)
if (ipfsId) {
m = m.encapsulate('/ipfs/' + ipfsId)
}

if (m.toString().indexOf('0.0.0.0') !== -1) {
const netInterfaces = os.networkInterfaces()
Object.keys(netInterfaces).forEach((niKey) => {
netInterfaces[niKey].forEach((ni) => {
if (ni.family === 'IPv4') {
multiaddrs.push(multiaddr(m.toString().replace('0.0.0.0', ni.address)))
}
})
})
} else {
multiaddrs.push(m)
}
}
*/

if (address.family === 'IPv6') {
let ma = multiaddr('/ip6/' + address.address + '/tcp/' + address.port)
if (ipfsId) {
ma = ma.encapsulate('/ipfs/' + ipfsId)
}

multiaddrs.push(ma)
}

callback(null, multiaddrs)
}

return listener
}

function getIpfsId (ma) {
return ma.stringTuples().filter((tuple) => {
return tuple[0] === IPFS_CODE
})[0][1]
}

function trackSocket (server, socket) {
const key = `${socket.remoteAddress}:${socket.remotePort}`
server.__connections[key] = socket

socket.on('close', () => {
delete server.__connections[key]
})
}
33 changes: 33 additions & 0 deletions src/get-multiaddr.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict'

// const multiaddr = require('multiaddr')
// const Address6 = require('ip-address').Address6

module.exports = (socket) => {
// utp-native does not pack the remoteAddress
// TODO check if it is possible
return null
/*
let ma

if (socket.remoteFamily === 'IPv6') {
const addr = new Address6(socket.remoteAddress)

if (addr.v4) {
const ip4 = addr.to4().correctForm()
ma = multiaddr('/ip4/' + ip4 +
'/tcp/' + socket.remotePort
)
} else {
ma = multiaddr('/ip6/' + socket.remoteAddress +
'/tcp/' + socket.remotePort
)
}
} else {
ma = multiaddr('/ip4/' + socket.remoteAddress +
'/tcp/' + socket.remotePort)
}

return ma
*/
}
84 changes: 75 additions & 9 deletions src/index.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,83 @@
'use strict'

var utp = require('utp-native')
const utp = require('utp-native')
const toPull = require('stream-to-pull-stream')
const mafmt = require('mafmt')
const includes = require('lodash.includes')
const isFunction = require('lodash.isfunction')
const Connection = require('interface-connection').Connection
const once = require('once')
const createListener = require('./create-listener.js')
const debug = require('debug')
const log = debug('libp2p:utp')

exports = module.exports
function noop () {}

exports.dial = function (multiaddr, options) {
options.ready = options.ready || function noop () {}
var opts = multiaddr.toOptions()
var client = utp.connect(opts.port, opts.host)
class UTP {
dial (ma, options, callback) {
if (isFunction(options)) {
callback = options
options = {}
}

client.once('connect', options.ready)
callback = once(callback || noop)

return client
const cOpts = ma.toOptions()
log('Connecting (UTP) to %s %s', cOpts.port, cOpts.host)

const rawSocket = utp.connect(cOpts)

rawSocket.once('timeout', () => {
log('timeout')
rawSocket.emit('error', new Error('Timeout'))
})

rawSocket.once('error', callback)

rawSocket.once('connect', () => {
rawSocket.removeListener('error', callback)
callback()
})

const socket = toPull.duplex(rawSocket)

const conn = new Connection(socket)

conn.getObservedAddrs = (callback) => {
return callback(null, [ma])
}

return conn
}

createListener (options, handler) {
if (isFunction(options)) {
handler = options
options = {}
}

handler = handler || noop

return createListener(handler)
}

filter (multiaddrs) {
if (!Array.isArray(multiaddrs)) {
multiaddrs = [multiaddrs]
}

return multiaddrs.filter((ma) => {
if (includes(ma.protoNames(), 'p2p-circuit')) {
return false
}

if (includes(ma.protoNames(), 'ipfs')) {
ma = ma.decapsulate('ipfs')
}

return mafmt.UTP.matches(ma)
})
}
}

exports.createListener = utp.createServer
module.exports = UTP
15 changes: 15 additions & 0 deletions test/connection.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const UTP = require('../src')

describe('Connection', () => {
it('create an instance', () => {
const utp = new UTP()
expect(utp).to.exist()
})
})
15 changes: 15 additions & 0 deletions test/constructor.spec.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
/* eslint-env mocha */
'use strict'

const chai = require('chai')
const dirtyChai = require('dirty-chai')
const expect = chai.expect
chai.use(dirtyChai)
const UTP = require('../src')

describe('Constructor', () => {
it('create an instance', () => {
const utp = new UTP()
expect(utp).to.exist()
})
})
Loading