diff --git a/package.json b/package.json index 5b38b6d..c702963 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,6 @@ "url": "git+https://github.com/libp2p/pull-mplex.git" }, "dependencies": { - "bl": "^1.2.2", "buffer-reuse-pool": "^1.0.0", "chai": "^4.1.2", "debug": "^3.1.0", diff --git a/src/channel.js b/src/channel.js index 00fae5d..264f915 100644 --- a/src/channel.js +++ b/src/channel.js @@ -38,22 +38,22 @@ class Channel extends EE { ? consts.type.OUT_RESET : consts.type.IN_RESET - this._log = (name, data) => { - log({ - op: name, - name: this._name, - id: this._id, - endedLocal: this._endedLocal, - endedRemote: this._endedRemote, - initiator: this._initiator, - data: (data && data.toString()) || '' - }) - } - - this._log('new channel', this._name) + // this._log = (name, data) => { + // log({ + // op: name, + // name: this._name, + // id: this._id, + // endedLocal: this._endedLocal, + // endedRemote: this._endedRemote, + // initiator: this._initiator, + // data: (data && data.toString()) || '' + // }) + // } + + // this._log('new channel', this._name) this._msgs = pushable((err) => { - this._log('source closed', err) + // this._log('source closed', err) if (err && typeof err !== 'boolean') { setImmediate(() => this.emit('error', err)) } @@ -64,7 +64,7 @@ class Channel extends EE { this.sink = (read) => { const next = (end, data) => { - this._log('sink', data) + // this._log('sink', data) // stream already ended if (this._endedLocal) { return } @@ -73,7 +73,7 @@ class Channel extends EE { // source ended, close the stream if (end === true) { - return this.endChan() + return this.endChan(null) } // source errored, reset stream @@ -118,13 +118,13 @@ class Channel extends EE { } push (data) { - this._log('push', data) + // this._log('push', data) this._msgs.push(data) } // close for reading close (err) { - this._log('close', err) + // this._log('close', err) if (!this._endedRemote) { this._endedRemote = err || true this._msgs.end(this._endedRemote) @@ -133,13 +133,13 @@ class Channel extends EE { } reset (err) { - this._log('reset', err) + // this._log('reset', err) this._reset = err || 'channel reset!' this.close(this._reset) } openChan () { - this._log('openChan') + // this._log('openChan') if (this.open) { return } // chan already open @@ -153,7 +153,7 @@ class Channel extends EE { } sendMsg (data) { - this._log('sendMsg', data) + // this._log('sendMsg', data) if (!this.open) { this.openChan() @@ -167,7 +167,7 @@ class Channel extends EE { } endChan () { - this._log('endChan') + // this._log('endChan') if (!this.open) { return @@ -180,7 +180,7 @@ class Channel extends EE { } resetChan () { - this._log('endChan') + // this._log('endChan') if (!this.open) { return diff --git a/src/coder.js b/src/coder.js index 72fd3b3..d7efb37 100644 --- a/src/coder.js +++ b/src/coder.js @@ -1,29 +1,29 @@ 'use strict' -const pull = require('pull-stream') const varint = require('varint') const through = require('pull-through') -const BufferList = require('bl') const debug = require('debug') const log = debug('pull-plex:coder') log.err = debug('pull-plex:coder:err') -let pool = Buffer.allocUnsafe(100 * 1024) -let used = 0 +const PULL_LENGTH = 10 * 1024 const empty = Buffer.alloc(0) exports.encode = () => { + let pool = Buffer.alloc(PULL_LENGTH) + let used = 0 + return through(function (msg) { const oldUsed = used varint.encode(msg[0] << 3 | msg[1], pool, used) used += varint.encode.bytes - varint.encode(varint.encode(msg[2] ? Buffer.byteLength(msg[2]) : 0), pool, used) + varint.encode(varint.encode(msg[2] ? msg[2].length : 0), pool, used) used += varint.encode.bytes this.queue(pool.slice(oldUsed, used)) // send header - if (pool.length - used < 100) { - pool = Buffer.allocUnsafe(10 * 1024) + if (PULL_LENGTH - used < 100) { + pool = Buffer.alloc(PULL_LENGTH) used = 0 } @@ -31,7 +31,7 @@ exports.encode = () => { }) } -let States = { +const States = { PARSING: 0, READING: 1 } @@ -42,14 +42,24 @@ exports.decode = () => { let length = 0 let buffer = null - const decode = (msg) => { + const tryDecode = (msg) => { + let offset = 0 + let length = 0 try { - let offset = 0 - let length = 0 - const h = varint.decode(msg) + let h = varint.decode(msg) offset += varint.decode.bytes length = varint.decode(msg, offset) offset += varint.decode.bytes + return [h, offset, length] + } catch (err) { + log.err(err) // ignore if data is empty + } + return [] + } + + const decode = (msg) => { + const [h, offset, length] = tryDecode(msg) + if (h !== void 0) { const message = { id: h >> 3, type: h & 7, @@ -58,10 +68,9 @@ exports.decode = () => { state = States.READING return [msg.slice(offset), message, length] - } catch (err) { - log.err(err) // ignore if data is empty - return [msg, undefined, undefined] } + + return [msg] } const read = (msg, data, length) => { @@ -80,7 +89,8 @@ exports.decode = () => { return [left, msg.slice(length - left), data] } - return through(function (msg) { + return through(function (msg_) { + let msg = msg_ while (msg && msg.length) { if (States.PARSING === state) { if (!buffer) { diff --git a/src/index.js b/src/index.js index 10cdc70..156458e 100644 --- a/src/index.js +++ b/src/index.js @@ -57,7 +57,7 @@ class Mplex extends EE { } this._chandata = pushable((err) => { - this._log('chandata ended') + // this._log('chandata ended') this._endedRemote = true this.close(err) }) @@ -74,7 +74,7 @@ class Mplex extends EE { const self = this this.sink = pull( through(function (data) { - if (Buffer.byteLength(data) > self._maxMsgSize) { + if (data && data.length >= self._maxMsgSize) { setImmediate(() => self.emit('error', new Error('message too large!'))) return this.queue(null) } @@ -82,15 +82,15 @@ class Mplex extends EE { }), coder.decode(), (read) => { - const next = (end, data) => { - if (this._endedLocal) { return } - if (end === true) { return this.close() } - if (end) { return this.reset(end) } - this._handle(data) + function next (end, data) { + if (self._endedLocal) { return } + if (end === true) { return self.close() } + if (end) { return self.reset(end) } + self._handle(data) return read(null, next) } - read(null, next) + return read(null, next) }) } @@ -99,7 +99,7 @@ class Mplex extends EE { } close (err) { - this._log('close', err) + // this._log('close', err) if (this.destroyed) { return } @@ -130,7 +130,7 @@ class Mplex extends EE { } push (data) { - this._log('push', data) + // this._log('push', data) if (data.data && Buffer.byteLength(data.data) > this._maxMsgSize) { this._chandata.end(new Error('message too large!')) @@ -208,13 +208,13 @@ class Mplex extends EE { } _handle (msg) { - this._log('_handle', msg) + // this._log('_handle', msg) const { id, type, data } = msg switch (type) { case consts.type.NEW: { const chan = this._newStream(id, false, true, data.toString(), this._inChannels) setImmediate(() => this.emit('stream', chan, id)) - return + break } case consts.type.OUT_MESSAGE: @@ -224,7 +224,7 @@ class Mplex extends EE { if (chan) { chan.push(data) } - return + break } case consts.type.OUT_CLOSE: @@ -234,7 +234,7 @@ class Mplex extends EE { if (chan) { chan.close() } - return + break } case consts.type.OUT_RESET: @@ -244,7 +244,7 @@ class Mplex extends EE { if (chan) { chan.reset() } - return + break } default: diff --git a/test/plex.spec.js b/test/plex.spec.js index 597f1a6..f62a187 100644 --- a/test/plex.spec.js +++ b/test/plex.spec.js @@ -135,7 +135,7 @@ describe('plex', () => { }) pull( - pull.values([Array(1048576 + 1).join('\xff')]), // 1mb + pull.values([Array(1048576 + 2).join('\xff')]), // 1mb plex ) })