Skip to content

Commit

Permalink
perf: optimize
Browse files Browse the repository at this point in the history
  • Loading branch information
dryajov authored and jacobheun committed Feb 7, 2019
1 parent 61f82f8 commit f22379f
Show file tree
Hide file tree
Showing 5 changed files with 65 additions and 56 deletions.
1 change: 0 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
"url": "git+https://github.com/libp2p/pull-mplex.git"
},
"dependencies": {
"bl": "^1.2.2",
"buffer-reuse-pool": "^1.0.0",
"chai": "^4.1.2",
"debug": "^3.1.0",
Expand Down
46 changes: 23 additions & 23 deletions src/channel.js
Original file line number Diff line number Diff line change
Expand Up @@ -38,22 +38,22 @@ class Channel extends EE {
? consts.type.OUT_RESET
: consts.type.IN_RESET

this._log = (name, data) => {
log({
op: name,
name: this._name,
id: this._id,
endedLocal: this._endedLocal,
endedRemote: this._endedRemote,
initiator: this._initiator,
data: (data && data.toString()) || ''
})
}

this._log('new channel', this._name)
// this._log = (name, data) => {
// log({
// op: name,
// name: this._name,
// id: this._id,
// endedLocal: this._endedLocal,
// endedRemote: this._endedRemote,
// initiator: this._initiator,
// data: (data && data.toString()) || ''
// })
// }

// this._log('new channel', this._name)

this._msgs = pushable((err) => {
this._log('source closed', err)
// this._log('source closed', err)
if (err && typeof err !== 'boolean') {
setImmediate(() => this.emit('error', err))
}
Expand All @@ -64,7 +64,7 @@ class Channel extends EE {

this.sink = (read) => {
const next = (end, data) => {
this._log('sink', data)
// this._log('sink', data)

// stream already ended
if (this._endedLocal) { return }
Expand All @@ -73,7 +73,7 @@ class Channel extends EE {

// source ended, close the stream
if (end === true) {
return this.endChan()
return this.endChan(null)
}

// source errored, reset stream
Expand Down Expand Up @@ -118,13 +118,13 @@ class Channel extends EE {
}

push (data) {
this._log('push', data)
// this._log('push', data)
this._msgs.push(data)
}

// close for reading
close (err) {
this._log('close', err)
// this._log('close', err)
if (!this._endedRemote) {
this._endedRemote = err || true
this._msgs.end(this._endedRemote)
Expand All @@ -133,13 +133,13 @@ class Channel extends EE {
}

reset (err) {
this._log('reset', err)
// this._log('reset', err)
this._reset = err || 'channel reset!'
this.close(this._reset)
}

openChan () {
this._log('openChan')
// this._log('openChan')

if (this.open) { return } // chan already open

Expand All @@ -153,7 +153,7 @@ class Channel extends EE {
}

sendMsg (data) {
this._log('sendMsg', data)
// this._log('sendMsg', data)

if (!this.open) {
this.openChan()
Expand All @@ -167,7 +167,7 @@ class Channel extends EE {
}

endChan () {
this._log('endChan')
// this._log('endChan')

if (!this.open) {
return
Expand All @@ -180,7 +180,7 @@ class Channel extends EE {
}

resetChan () {
this._log('endChan')
// this._log('endChan')

if (!this.open) {
return
Expand Down
42 changes: 26 additions & 16 deletions src/coder.js
Original file line number Diff line number Diff line change
@@ -1,37 +1,37 @@
'use strict'

const pull = require('pull-stream')
const varint = require('varint')
const through = require('pull-through')
const BufferList = require('bl')

const debug = require('debug')

const log = debug('pull-plex:coder')
log.err = debug('pull-plex:coder:err')

let pool = Buffer.allocUnsafe(100 * 1024)
let used = 0
const PULL_LENGTH = 10 * 1024
const empty = Buffer.alloc(0)
exports.encode = () => {
let pool = Buffer.alloc(PULL_LENGTH)
let used = 0

return through(function (msg) {
const oldUsed = used
varint.encode(msg[0] << 3 | msg[1], pool, used)
used += varint.encode.bytes
varint.encode(varint.encode(msg[2] ? Buffer.byteLength(msg[2]) : 0), pool, used)
varint.encode(varint.encode(msg[2] ? msg[2].length : 0), pool, used)
used += varint.encode.bytes
this.queue(pool.slice(oldUsed, used)) // send header

if (pool.length - used < 100) {
pool = Buffer.allocUnsafe(10 * 1024)
if (PULL_LENGTH - used < 100) {
pool = Buffer.alloc(PULL_LENGTH)
used = 0
}

this.queue(msg[2] || empty)
})
}

let States = {
const States = {
PARSING: 0,
READING: 1
}
Expand All @@ -42,14 +42,24 @@ exports.decode = () => {
let length = 0
let buffer = null

const decode = (msg) => {
const tryDecode = (msg) => {
let offset = 0
let length = 0
try {
let offset = 0
let length = 0
const h = varint.decode(msg)
let h = varint.decode(msg)
offset += varint.decode.bytes
length = varint.decode(msg, offset)
offset += varint.decode.bytes
return [h, offset, length]
} catch (err) {
log.err(err) // ignore if data is empty
}
return []
}

const decode = (msg) => {
const [h, offset, length] = tryDecode(msg)
if (h !== void 0) {
const message = {
id: h >> 3,
type: h & 7,
Expand All @@ -58,10 +68,9 @@ exports.decode = () => {

state = States.READING
return [msg.slice(offset), message, length]
} catch (err) {
log.err(err) // ignore if data is empty
return [msg, undefined, undefined]
}

return [msg]
}

const read = (msg, data, length) => {
Expand All @@ -80,7 +89,8 @@ exports.decode = () => {
return [left, msg.slice(length - left), data]
}

return through(function (msg) {
return through(function (msg_) {
let msg = msg_
while (msg && msg.length) {
if (States.PARSING === state) {
if (!buffer) {
Expand Down
30 changes: 15 additions & 15 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class Mplex extends EE {
}

this._chandata = pushable((err) => {
this._log('chandata ended')
// this._log('chandata ended')
this._endedRemote = true
this.close(err)
})
Expand All @@ -74,23 +74,23 @@ class Mplex extends EE {
const self = this
this.sink = pull(
through(function (data) {
if (Buffer.byteLength(data) > self._maxMsgSize) {
if (data && data.length >= self._maxMsgSize) {
setImmediate(() => self.emit('error', new Error('message too large!')))
return this.queue(null)
}
this.queue(data)
}),
coder.decode(),
(read) => {
const next = (end, data) => {
if (this._endedLocal) { return }
if (end === true) { return this.close() }
if (end) { return this.reset(end) }
this._handle(data)
function next (end, data) {
if (self._endedLocal) { return }
if (end === true) { return self.close() }
if (end) { return self.reset(end) }
self._handle(data)
return read(null, next)
}

read(null, next)
return read(null, next)
})
}

Expand All @@ -99,7 +99,7 @@ class Mplex extends EE {
}

close (err) {
this._log('close', err)
// this._log('close', err)

if (this.destroyed) { return }

Expand Down Expand Up @@ -130,7 +130,7 @@ class Mplex extends EE {
}

push (data) {
this._log('push', data)
// this._log('push', data)
if (data.data &&
Buffer.byteLength(data.data) > this._maxMsgSize) {
this._chandata.end(new Error('message too large!'))
Expand Down Expand Up @@ -208,13 +208,13 @@ class Mplex extends EE {
}

_handle (msg) {
this._log('_handle', msg)
// this._log('_handle', msg)
const { id, type, data } = msg
switch (type) {
case consts.type.NEW: {
const chan = this._newStream(id, false, true, data.toString(), this._inChannels)
setImmediate(() => this.emit('stream', chan, id))
return
break
}

case consts.type.OUT_MESSAGE:
Expand All @@ -224,7 +224,7 @@ class Mplex extends EE {
if (chan) {
chan.push(data)
}
return
break
}

case consts.type.OUT_CLOSE:
Expand All @@ -234,7 +234,7 @@ class Mplex extends EE {
if (chan) {
chan.close()
}
return
break
}

case consts.type.OUT_RESET:
Expand All @@ -244,7 +244,7 @@ class Mplex extends EE {
if (chan) {
chan.reset()
}
return
break
}

default:
Expand Down
2 changes: 1 addition & 1 deletion test/plex.spec.js
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ describe('plex', () => {
})

pull(
pull.values([Array(1048576 + 1).join('\xff')]), // 1mb
pull.values([Array(1048576 + 2).join('\xff')]), // 1mb
plex
)
})
Expand Down

0 comments on commit f22379f

Please sign in to comment.