Skip to content
This repository has been archived by the owner on Jun 24, 2024. It is now read-only.

Commit

Permalink
feat(reader): Add implementation with tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Walker authored and digitalsadhu committed Oct 20, 2017
1 parent aba27e7 commit 088bd18
Show file tree
Hide file tree
Showing 13 changed files with 786 additions and 137 deletions.
1 change: 1 addition & 0 deletions .eslintignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
coverage
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ tmp/**/*
.idea/**/*
*.iml
*.log
coverage
1 change: 0 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ language: node_js

node_js:
- "8"
- "6"

script:
- npm test
Expand Down
75 changes: 75 additions & 0 deletions lib/reader.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
const { Readable, PassThrough, Transform } = require('stream');
const { stringify, parse } = require('JSONStream');
const mergeStream = require('merge-stream');
const { dedupe, sort, setOrder } = require('./util');
const assert = require('assert');

module.exports = class Reader extends Readable {
constructor(streams) {
super();

assert(
streams,
`Expected first argument to new Reader() to be a stream or array of streams.
Instead got ${typeof stream}`
);

if (!Array.isArray(streams)) {
streams = [streams];
}

assert(
streams.length,
`Expected at least one stream to be provided to new Reader(). Got none.`
);

const merged = mergeStream();

let count = 0;
streams.forEach((readStream, index) => {
const tmpStream = new PassThrough({ objectMode: true });
merged.add(tmpStream);

readStream.on('file found', file => {
this.emit('file found', file);
readStream
.pipe(parse('*'))
.on('error', err => {
this.emit('error', err);
})
.pipe(setOrder(index))
.pipe(tmpStream);

count++;
if (count === streams.length) {
this.emit('pipeline ready');
}
});

readStream.on('file not found', file => {
this.emit('file not found', file);
tmpStream.end();

count++;
if (count === streams.length) {
this.emit('pipeline ready');
}
});
});

this.data = merged.pipe(dedupe()).pipe(sort());
this.data.pause();

this.data.on('data', chunk => {
this.push(chunk.content);
});

this.data.on('end', () => {
this.push(null);
});
}

_read(size) {
return this.data.resume();
}
};
85 changes: 85 additions & 0 deletions lib/util.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
const { Transform } = require('stream');

function compareByOrder(a, b) {
if (a.order === b.order) return 0;
return a.order > b.order ? 1 : -1;
}

class Dedupe extends Transform {
constructor() {
super({
objectMode: true
});

this.rows = new Map();
}

_transform(chunk, enc, callback) {
if (chunk && chunk.id) {
this.rows.set(chunk.id, chunk);
}

callback();
}

_flush(callback) {
Array.from(this.rows.values()).forEach(row => this.push(row));
callback();
}
}

class SetOrder extends Transform {
constructor(index) {
super({
objectMode: true
});

this.index = index;
}

_transform(chunk, enc, callback) {
chunk.order = this.index;
callback(null, chunk);
}
}

class Sort extends Transform {
constructor() {
super({
objectMode: true
});

this.rows = new Map();
}

_transform(chunk, enc, callback) {
if (
chunk &&
chunk.order !== null &&
typeof chunk.order !== 'undefined'
) {
this.rows.set(chunk.order, chunk);
}
callback();
}

_flush(callback) {
Array.from(this.rows.values())
.sort(compareByOrder)
.forEach(row => this.push(row));
callback();
}
}

module.exports = {
dedupe() {
return new Dedupe();
},
setOrder(index) {
return new SetOrder(index);
},
sort() {
return new Sort();
},
compareByOrder
};
Loading

0 comments on commit 088bd18

Please sign in to comment.