diff --git a/lazy.js b/lazy.js index 9ff3520..fbdf2a4 100644 --- a/lazy.js +++ b/lazy.js @@ -1,15 +1,13 @@ var EventEmitter = require('events').EventEmitter; +var util = require('util'); +var stream = require('stream'); -Lazy.prototype = new EventEmitter; -module.exports = Lazy; -function Lazy (em, opts) { +function Lazy(em, opts) { if (!(this instanceof Lazy)) return new Lazy(em, opts); + EventEmitter.call(this); var self = this; - if (em) { - if (!em._events) em._events = {}; - self._events = em._events; - } - + + self.once = function (name, f) { self.on(name, function g () { self.removeListener(name, g); @@ -38,18 +36,46 @@ function Lazy (em, opts) { self.emit(endName); } - function newLazy (g, h) { - if (!g) g = function () { return true }; - if (!h) h = function (x) { return x }; - var lazy = new Lazy(null, opts); - self.on(dataName, function (x) { - if (g.call(lazy, x)) lazy.emit(dataName, h(x)); + if (em && em.on) { + em.on(endName, function () { + self.emit(endName); }); - self.once(pipeName, function () { - lazy.emit(pipeName) + self.on(pipeName, function () { + em.emit(pipeName); }); - self.once(endName, function () { - lazy.emit(endName) + // Check for v0.10 or Greater (Stream2 has Duplex type) + if (stream.Duplex && em instanceof(stream)) { + em.on('readable', function () { + var x = em.read(); + self.emit(dataName, x); + }); + } else { + // Old Stream1 or Event support + em.on(dataName, function (x) { + self.emit(dataName, x); + }); + } + } + + function newLazy (g, h, l) { + if (!g) { + g = function () { + return true; + }; + } + if (!h) { + h = function (x) { + return x; + }; + } + var lazy = new Lazy(null, opts, l); + self.on(dataName, function (x, y) { + if (g.call(lazy, x)) { + lazy.emit(dataName, h(x), y); + } + }); + self.once(pipeName, function () { + lazy.emit(pipeName); }); return lazy; } @@ -148,68 +174,63 @@ function Lazy (em, opts) { lazy.once(pipeName, function () { f(data) }); return self; } - + self.bucket = function (init, f) { var lazy = new Lazy(null, opts); - var yield = function (x) { + var yieldTo = function (x) { lazy.emit(dataName, x); }; - + var acc = init; - + self.on(dataName, function (x) { - acc = f.call(yield, acc, x); + acc = f.call(yieldTo, acc, x); }); - + self.once(pipeName, function () { - lazy.emit(pipeName) + lazy.emit(pipeName); }); - + // flush on end event self.once(endName, function () { var finalBuffer = mergeBuffers(acc); - if(finalBuffer) yield(finalBuffer); - lazy.emit(endName) + if (finalBuffer) { + yieldTo(finalBuffer); + } }); - + return lazy; } - + // Streams that use this should emit strings or buffers only self.__defineGetter__('lines', function () { return self.bucket([], function (chunkArray, chunk) { - var newline = ['\r'.charCodeAt(0), '\n'.charCodeAt(0)], lastNewLineIndex = 0; + var newline = '\n'.charCodeAt(0), lastNewLineIndex = 0; if (typeof chunk === 'string') chunk = new Buffer(chunk); - - for (var i = 0; i < chunk.length; i++) { - // Match line separator characters - if (newline.indexOf(chunk[i]) !== -1) { - // If we have content from the current chunk to append to our buffers, do it. - if(i>0){ - if(i === lastNewLineIndex){ - lastNewLineIndex = i-1; - } - chunkArray.push(chunk.slice(lastNewLineIndex, i)); - } - - // Skip second separator byte on \r\n terminated lines ( yeah, stupid DOS / Windows, I know... ) - if ((i + 1 < chunk.length) && (chunk[i] === newline[0]) && (chunk[i + 1] === newline[1])) { - i++; - } - // Wrap all our buffers and emit it. - this(mergeBuffers(chunkArray)); + for (var i = 0; i < chunk.length; i++) { + if (chunk[i] === newline) { + // If we have content from the current chunk to append to our buffers, do it. + if (i > 0) { + chunkArray.push(chunk.slice(lastNewLineIndex, i)); + } - lastNewLineIndex = i + 1; - } + // Wrap all our buffers and emit it. + this(mergeBuffers(chunkArray)); + lastNewLineIndex = i + 1; + } } - - if(lastNewLineIndex>0) { - // New line found in the chunk, push the remaining part of the buffer. - if(lastNewLineIndex < chunk.length) chunkArray.push(chunk.slice(lastNewLineIndex)); + + if (lastNewLineIndex > 0) { + // New line found in the chunk, push the remaining part of the buffer. + if (lastNewLineIndex < chunk.length) { + chunkArray.push(chunk.slice(lastNewLineIndex)); + } } else { - // No new line found, push the whole buffer. - if(chunk.length) chunkArray.push(chunk); + // No new line found, push the whole buffer. + if (chunk.length) { + chunkArray.push(chunk); + } } return chunkArray; }); @@ -321,3 +342,7 @@ var mergeBuffers = function mergeBuffers(buffers) { return finalBuffer; } + + +util.inherits(Lazy, EventEmitter); +module.exports = Lazy;