This repository has been archived by the owner on Aug 23, 2019. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 37
/
manager.js
261 lines (229 loc) · 7.27 KB
/
manager.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
'use strict'
const identify = require('libp2p-identify')
const multistream = require('multistream-select')
const waterfall = require('async/waterfall')
const debug = require('debug')
const log = debug('libp2p:switch:conn-manager')
const once = require('once')
const ConnectionFSM = require('../connection')
const Circuit = require('libp2p-circuit')
const plaintext = require('../plaintext')
/**
* Contains methods for binding handlers to the Switch
* in order to better manage its connections.
*/
class ConnectionManager {
  /**
   * @param {Switch} _switch The Switch instance this manager is bound to
   */
  constructor (_switch) {
    this.switch = _switch
    // Tracked connections: `theirB58Id` -> array of connections for that peer
    this.connections = {}
  }

  /**
   * Adds the connection for tracking if it's not already added
   * @private
   * @param {ConnectionFSM} connection
   * @returns {void}
   */
  add (connection) {
    const peerId = connection.theirB58Id
    this.connections[peerId] = this.connections[peerId] || []

    // Only add it if it's not there
    if (!this.get(connection)) {
      this.connections[peerId].push(connection)
    }
  }

  /**
   * Gets the connection from the list if it exists
   * @private
   * @param {ConnectionFSM} connection
   * @returns {ConnectionFSM|null} The found connection or null
   */
  get (connection) {
    const tracked = this.connections[connection.theirB58Id]
    if (!tracked) return null

    // Connections are matched by identity, not by peer id
    return tracked.indexOf(connection) !== -1 ? connection : null
  }

  /**
   * Gets a connection associated with the given peer
   * @private
   * @param {string} peerId The peers id
   * @returns {ConnectionFSM|null} The found connection or null
   */
  getOne (peerId) {
    const tracked = this.connections[peerId]

    // `remove` leaves an empty array behind once a peer's last connection is
    // gone, so an existing entry does not guarantee a first element.
    // TODO: Maybe select the best?
    return (tracked && tracked[0]) || null
  }

  /**
   * Removes the connection from tracking
   * @private
   * @param {ConnectionFSM} connection The connection to remove
   * @returns {void}
   */
  remove (connection) {
    const tracked = this.connections[connection.theirB58Id]
    if (!tracked) return

    const index = tracked.indexOf(connection)
    if (index !== -1) {
      tracked.splice(index, 1)
    }
  }

  /**
   * Returns all connections being tracked
   * @private
   * @returns {ConnectionFSM[]} A new array; mutating it does not affect tracking
   */
  getAll () {
    const all = []
    for (const conns of Object.values(this.connections)) {
      // Append in place rather than re-spreading the accumulator each round
      all.push(...conns)
    }
    return all
  }

  /**
   * Returns all connections being tracked for a given peer id
   * @private
   * @param {string} peerId Stringified peer id
   * @returns {ConnectionFSM[]} The tracked list itself, or a fresh empty
   *   array when the peer is unknown
   */
  getAllById (peerId) {
    return this.connections[peerId] || []
  }

  /**
   * Adds a listener for the given `muxer` and creates a handler for it
   * leveraging the Switch.protocolMuxer handler factory
   *
   * When `switch.identify` is set, every incoming muxed connection also runs
   * the identify protocol over a new stream, and on success the resulting
   * connection is tracked and `peer-mux-established` is emitted.
   *
   * @param {Muxer} muxer
   * @returns {void}
   */
  addStreamMuxer (muxer) {
    // for dialing
    this.switch.muxers[muxer.multicodec] = muxer

    // for listening
    this.switch.handle(muxer.multicodec, (protocol, conn) => {
      const muxedConn = muxer.listener(conn)

      muxedConn.on('stream', this.switch.protocolMuxer(null))

      // If identify is enabled
      //   1. overload getPeerInfo
      //   2. call getPeerInfo
      //   3. add this conn to the pool
      if (this.switch.identify) {
        // Get the peer info from the crypto exchange
        conn.getPeerInfo((err, cryptoPI) => {
          if (err || !cryptoPI) {
            // Best effort: identify still runs, just without crypto peer info
            log('crypto peerInfo wasnt found')
          }

          // overload peerInfo to use Identify instead
          conn.getPeerInfo = (callback) => {
            // NOTE(review): the original named this `conn`, shadowing the
            // handler's `conn`; `setPeerInfo` below therefore targets this
            // new stream. Behaviour kept as-is - confirm that is intended.
            const identifyStream = muxedConn.newStream()
            const ms = new multistream.Dialer()
            callback = once(callback)

            waterfall([
              (cb) => ms.handle(identifyStream, cb),
              (cb) => ms.select(identify.multicodec, cb),
              // run identify and verify the peer has the same info from crypto
              (selectedConn, cb) => identify.dialer(selectedConn, cryptoPI, cb),
              (peerInfo, observedAddrs, cb) => {
                observedAddrs.forEach((oa) => {
                  this.switch._peerInfo.multiaddrs.addSafe(oa)
                })
                cb(null, peerInfo)
              }
            ], (err, peerInfo) => {
              if (err) {
                // Tear the muxed connection down before reporting the failure
                return muxedConn.end(() => {
                  callback(err, null)
                })
              }

              if (peerInfo) {
                identifyStream.setPeerInfo(peerInfo)
              }
              callback(err, peerInfo)
            })
          }

          conn.getPeerInfo((err, peerInfo) => {
            /* eslint no-warning-comments: off */
            // Also bail out when identify yielded no peer info, instead of
            // throwing on `peerInfo.id` below
            if (err || !peerInfo) {
              return log('identify not successful')
            }
            const b58Str = peerInfo.id.toB58String()

            const connection = new ConnectionFSM({
              _switch: this.switch,
              peerInfo,
              muxer: muxedConn,
              conn: conn,
              type: 'inc'
            })
            this.switch.connection.add(connection)

            if (peerInfo.multiaddrs.size > 0) {
              // with incoming conn and through identify, going to pick one
              // of the available multiaddrs from the other peer as the one
              // I'm connected to as we really can't be sure at the moment
              // TODO add this consideration to the connection abstraction!
              peerInfo.connect(peerInfo.multiaddrs.toArray()[0])
            } else {
              // for the case of websockets in the browser, where peers have
              // no addr, use just their IPFS id
              peerInfo.connect(`/ipfs/${b58Str}`)
            }
            // From here on use whatever the peer book hands back
            peerInfo = this.switch._peerBook.put(peerInfo)

            muxedConn.once('close', () => {
              connection.close()
            })

            this.switch.emit('peer-mux-established', peerInfo)
          })
        })
      }

      return conn
    })
  }

  /**
   * Adds the `encrypt` handler for the given `tag` and also sets the
   * Switch's crypto to passed `encrypt` function
   *
   * Falls back to the plaintext tag and encrypt only when BOTH arguments
   * are missing; a single missing argument is stored as given.
   *
   * @param {String} tag
   * @param {function(PeerID, Connection, PeerId, Callback)} encrypt
   * @returns {void}
   */
  crypto (tag, encrypt) {
    if (!tag && !encrypt) {
      tag = plaintext.tag
      encrypt = plaintext.encrypt
    }

    this.switch.crypto = { tag, encrypt }
  }

  /**
   * If config.enabled is true, a Circuit relay will be added to the
   * available Switch transports.
   *
   * Note: when enabled and `config.hop` is not set, a disabled default `hop`
   * is written onto the passed `config` object itself.
   *
   * @param {any} config
   * @returns {void}
   */
  enableCircuitRelay (config) {
    config = config || {}

    if (config.enabled) {
      if (!config.hop) {
        Object.assign(config, { hop: { enabled: false, active: false } })
      }

      this.switch.transport.add(Circuit.tag, new Circuit(this.switch, config))
    }
  }

  /**
   * Sets identify to true on the Switch and performs handshakes
   * for libp2p-identify leveraging the Switch's muxer.
   *
   * @returns {void}
   */
  reuse () {
    this.switch.identify = true
    this.switch.handle(identify.multicodec, (protocol, conn) => {
      identify.listener(conn, this.switch._peerInfo)
    })
  }
}
module.exports = ConnectionManager