Skip to content

Commit

Permalink
@gabrielf's patches to make it work on node 0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
pkrumins committed Mar 23, 2013
1 parent 1ccf46c commit 7bf31b8
Showing 1 changed file with 82 additions and 57 deletions.
139 changes: 82 additions & 57 deletions lazy.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
var EventEmitter = require('events').EventEmitter;
var util = require('util');
var stream = require('stream');

Lazy.prototype = new EventEmitter;
module.exports = Lazy;
function Lazy (em, opts) {
function Lazy(em, opts) {
if (!(this instanceof Lazy)) return new Lazy(em, opts);
EventEmitter.call(this);
var self = this;
if (em) {
if (!em._events) em._events = {};
self._events = em._events;
}



self.once = function (name, f) {
self.on(name, function g () {
self.removeListener(name, g);
Expand Down Expand Up @@ -38,18 +36,46 @@ function Lazy (em, opts) {
self.emit(endName);
}

function newLazy (g, h) {
if (!g) g = function () { return true };
if (!h) h = function (x) { return x };
var lazy = new Lazy(null, opts);
self.on(dataName, function (x) {
if (g.call(lazy, x)) lazy.emit(dataName, h(x));
if (em && em.on) {
em.on(endName, function () {
self.emit(endName);
});
self.once(pipeName, function () {
lazy.emit(pipeName)
self.on(pipeName, function () {
em.emit(pipeName);
});
self.once(endName, function () {
lazy.emit(endName)
// Check for v0.10 or Greater (Stream2 has Duplex type)
if (stream.Duplex && em instanceof(stream)) {
em.on('readable', function () {
var x = em.read();
self.emit(dataName, x);
});
} else {
// Old Stream1 or Event support
em.on(dataName, function (x) {
self.emit(dataName, x);
});
}
}

function newLazy (g, h, l) {
if (!g) {
g = function () {
return true;
};
}
if (!h) {
h = function (x) {
return x;
};
}
var lazy = new Lazy(null, opts, l);
self.on(dataName, function (x, y) {
if (g.call(lazy, x)) {
lazy.emit(dataName, h(x), y);
}
});
self.once(pipeName, function () {
lazy.emit(pipeName);
});
return lazy;
}
Expand Down Expand Up @@ -148,68 +174,63 @@ function Lazy (em, opts) {
lazy.once(pipeName, function () { f(data) });
return self;
}

self.bucket = function (init, f) {
var lazy = new Lazy(null, opts);
var yield = function (x) {
var yieldTo = function (x) {
lazy.emit(dataName, x);
};

var acc = init;

self.on(dataName, function (x) {
acc = f.call(yield, acc, x);
acc = f.call(yieldTo, acc, x);
});

self.once(pipeName, function () {
lazy.emit(pipeName)
lazy.emit(pipeName);
});

// flush on end event
self.once(endName, function () {
var finalBuffer = mergeBuffers(acc);
if(finalBuffer) yield(finalBuffer);
lazy.emit(endName)
if (finalBuffer) {
yieldTo(finalBuffer);
}
});

return lazy;
}

// Streams that use this should emit strings or buffers only
self.__defineGetter__('lines', function () {
return self.bucket([], function (chunkArray, chunk) {
var newline = ['\r'.charCodeAt(0), '\n'.charCodeAt(0)], lastNewLineIndex = 0;
var newline = '\n'.charCodeAt(0), lastNewLineIndex = 0;
if (typeof chunk === 'string') chunk = new Buffer(chunk);

for (var i = 0; i < chunk.length; i++) {
// Match line separator characters
if (newline.indexOf(chunk[i]) !== -1) {
// If we have content from the current chunk to append to our buffers, do it.
if(i>0){
if(i === lastNewLineIndex){
lastNewLineIndex = i-1;
}
chunkArray.push(chunk.slice(lastNewLineIndex, i));
}

// Skip second separator byte on \r\n terminated lines ( yeah, stupid DOS / Windows, I know... )
if ((i + 1 < chunk.length) && (chunk[i] === newline[0]) && (chunk[i + 1] === newline[1])) {
i++;
}

// Wrap all our buffers and emit it.
this(mergeBuffers(chunkArray));
for (var i = 0; i < chunk.length; i++) {
if (chunk[i] === newline) {
// If we have content from the current chunk to append to our buffers, do it.
if (i > 0) {
chunkArray.push(chunk.slice(lastNewLineIndex, i));
}

lastNewLineIndex = i + 1;
}
// Wrap all our buffers and emit it.
this(mergeBuffers(chunkArray));
lastNewLineIndex = i + 1;
}
}

if(lastNewLineIndex>0) {
// New line found in the chunk, push the remaining part of the buffer.
if(lastNewLineIndex < chunk.length) chunkArray.push(chunk.slice(lastNewLineIndex));

if (lastNewLineIndex > 0) {
// New line found in the chunk, push the remaining part of the buffer.
if (lastNewLineIndex < chunk.length) {
chunkArray.push(chunk.slice(lastNewLineIndex));
}
} else {
// No new line found, push the whole buffer.
if(chunk.length) chunkArray.push(chunk);
// No new line found, push the whole buffer.
if (chunk.length) {
chunkArray.push(chunk);
}
}
return chunkArray;
});
Expand Down Expand Up @@ -321,3 +342,7 @@ var mergeBuffers = function mergeBuffers(buffers) {

return finalBuffer;
}


util.inherits(Lazy, EventEmitter);
module.exports = Lazy;

0 comments on commit 7bf31b8

Please sign in to comment.