Skip to content

Commit

Permalink
Checksum files as they are being streamed in. Speeds up load times si…
Browse files Browse the repository at this point in the history
…gnificantly.
  • Loading branch information
mlogan committed Sep 9, 2016
1 parent 0e944de commit ddb3b48
Show file tree
Hide file tree
Showing 2 changed files with 101 additions and 54 deletions.
80 changes: 63 additions & 17 deletions index.js
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ var urllib = require('url');
var debug = require('debug')('http-disk-cache');
var glob = require('glob');

var util = require('util');
var stream = require('stream');

/////////////// CacheEntry ///////////////

function canonicalUrl(url) {
Expand Down Expand Up @@ -178,6 +181,33 @@ CacheWriter.prototype.pipeFrom = function pipeFrom(readable) {
});
};

function ChecksumStream(expectedChecksum, options) {
if (!(this instanceof ChecksumStream)) {
return new ChecksumStream(expectedChecksum, options);
}
stream.Transform.call(this, options);
this.hash = crypto.createHash('md5');
this.expectedChecksum = expectedChecksum;
}
util.inherits(ChecksumStream, stream.Transform);

ChecksumStream.prototype._transform = function (chunk, enc, cb) {
var buffer = Buffer.isBuffer(chunk) ? chunk : new Buffer(chunk, enc);
this.hash.update(buffer); // update hash
this.push(chunk, enc);
cb();
};

ChecksumStream.prototype._flush = function (cb) {
console.log('flushing checksum stream');
var checksum = this.hash.digest('hex');
if (checksum != this.expectedChecksum) {
return cb(new Error('invalid checksum'));
}
cb();
};


/////////////// HTTPCache ///////////////

// HTTPCache handles HTTP requests, and caches them to disk if allowed by the Cache-Control
Expand Down Expand Up @@ -247,7 +277,7 @@ function deleteEntry(metaPath, cb) {
// 'notcached' - the cache entry is missing, invalid, or expired, but ready to be cached anew.
// 'error' - the cache entry is corrupted, and could not be deleted. This indicates that
// we shouldn't try to cache any responses right now.
HTTPCache.prototype._checkCache = function(cacheEntry, callback) {
HTTPCache.prototype._checkCache = function(cacheEntry, skipVerify, callback) {
var _this = this;
function loadMetadata(cb) {
debug('loading metadata from', cacheEntry.metaPath);
Expand Down Expand Up @@ -339,8 +369,12 @@ HTTPCache.prototype._checkCache = function(cacheEntry, callback) {
});
}

// We now have valid metadata for an un-expired cache entry. Next, we checksum the contents.
validateContents(metadata);
if (skipVerify) {
return callback(null, CACHE_STATE_CACHED, metadata);
} else {
// We now have valid metadata for an un-expired cache entry. Next, we checksum the contents.
validateContents(metadata);
}
});
};

Expand Down Expand Up @@ -404,20 +438,22 @@ HTTPCache.prototype.assertCached = function(url, onProgress, cb) {
options = { url: url };
}

options._skipReadStream = true;

var entry = new CacheEntry(url, options.etagFormat);

this._checkCache(entry, function(err, cacheStatus) {
if (cacheStatus === CACHE_STATE_CACHED) {
debug('assert cache hit', url);
return cb();
} else {
debug('assert cache miss', url);
_this.openReadStream(options, onProgress, function(err, _, path) {
cb(err);
});
this.openReadStream(options, onProgress, function (err, readStream, path) {
if (err != null) {
return cb(err);
}
if (readStream == null) { throw new Error("HAY"); }
readStream.on('error', function(err) {
readStream.removeAllListeners();
cb(err);
});
readStream.on('end', function() {
readStream.removeAllListeners();
cb();
});
readStream.resume();
});
};

Expand Down Expand Up @@ -464,14 +500,21 @@ HTTPCache.prototype.openReadStream = function(url, onProgress, cb) {
var cacheWriter = this._createCacheWriter(entry);

// Check if the entry is available in the cache.
this._checkCache(entry, function(err, cacheStatus) {
this._checkCache(entry, true, function(err, cacheStatus, metadata) {

debug("cache entry", entry.url, "status=", cacheStatus);
if (cacheStatus === CACHE_STATE_CACHED) {
// The cache contents are present and valid, so serve the request from cache.
cacheWriter.end();
var readStream = options._skipReadStream ? null : _this._createContentReadStream(entry);
return cb(null, readStream, _this._absPath(entry.contentPath));
var checksumStream = new ChecksumStream(metadata.contentMD5);
checksumStream.on('error', function (err) {
if (err === 'invalid checksum') {
deleteEntry(_this._absPath(entry.metaPath), function(err) {});
}
});
readStream.pipe(checksumStream);
return cb(null, checksumStream, _this._absPath(entry.contentPath));
} else if (cacheStatus == CACHE_STATE_ERROR) {
// Some kind of error occurred and we can't access the cache.
return cb("Error: There was a problem with the asset cache and we can't write files");
Expand Down Expand Up @@ -582,6 +625,8 @@ HTTPCache.prototype.getContents = function(url, cb) {
}
debug("getContents start", options.url);

options._skipVerify = true;

this.openReadStream(options, function(err, readStream, path) {
if (err) { return cb(err); }

Expand Down Expand Up @@ -754,7 +799,7 @@ HTTPCache.prototype.repair = function(cb) {
return;
}

_this._checkCache(entry, function (err, status) {
_this._checkCache(entry, false, function (err, status) {
if (err != null) {
deleteEntry(metaPath, deleteCb);
return;
Expand Down Expand Up @@ -861,3 +906,4 @@ HTTPCache.prototype.clean = function (shouldClean, cb) {
};

exports.HTTPCache = HTTPCache;
exports.ChecksumStream = ChecksumStream;
75 changes: 38 additions & 37 deletions test.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ var execSync = require('child_process').execSync;
var debug = require('debug')('http-disk-cache');
var async = require('artillery-async');
var glob = require('glob');
var stream = require('stream');

var httpcache = require('./index');

Expand All @@ -36,16 +37,19 @@ function newUrlReply(contents, status, headers, defer) {

function catStream(stream, cb) {
chunks = [];
stream.on('error', function (err) {
cb(err);
});
stream.on('data', function (chunk) {
chunks.push(chunk);
});
stream.on('end', function () {
if (chunks.length === 0) {
cb(null);
cb(null, null);
} else if (typeof chunks[0] === 'string') {
cb(chunks.join(''));
cb(null, chunks.join(''));
} else { // Buffer
cb(Buffer.concat(chunks));
cb(null, Buffer.concat(chunks));
}
});
}
Expand Down Expand Up @@ -166,7 +170,7 @@ exports.tests = {
var _this = this;
this.cache.openReadStream(this.createUrl('/url1'), function(err, stream, path) {
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
test.done();
});
Expand All @@ -182,7 +186,7 @@ exports.tests = {
test.equal(_this.requests.length, 1);
test.equal(_this.requests[0], '/url5');
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url5 contents');
test.done();
});
Expand All @@ -192,7 +196,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url5'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 1); // request is handled from cache.
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url5 contents');
test.done();
});
Expand All @@ -213,7 +217,7 @@ exports.tests = {
test.equal(_this.requests.length, 1);
test.equal(_this.requests[0], '/url7');
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url7 contents');
test.done();
});
Expand All @@ -223,7 +227,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url7'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 1); // request is handled from cache.
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url7 contents');
test.done();
});
Expand Down Expand Up @@ -256,7 +260,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url1'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 1);
test.equal(_this.requests[0], '/url1');
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
cb();
});
Expand All @@ -268,7 +272,7 @@ exports.tests = {
_this.cache.openReadStream({ url: _this.createUrl('/url1'), etagFormat: 'md5' }, function(err, stream, path) {
test.equal(_this.requests.length, 2);
test.equal(_this.requests[1], '/url1');
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
cb();
});
Expand Down Expand Up @@ -358,7 +362,7 @@ exports.tests = {
},

testConcurrentRequests: function(test) {
test.expect(4);
test.expect(2);
var _this = this;
var count = 2;

Expand All @@ -367,8 +371,7 @@ exports.tests = {
if (count === 0) { test.done(); }
};
var cb = function(err, stream, path) {
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
catStream(stream, function (contents) {
catStream(stream, function (err, contents) {
test.equal(contents.toString('utf8'), 'url1 contents');
barrier();
});
Expand All @@ -379,34 +382,34 @@ exports.tests = {


testBasicCaching: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url1', 'url1 contents', false, true, 0, test.done);
},

testExplicitNoCache: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url2', 'url2 contents', false, false, 0, test.done);
},

testUnparseableCacheControl: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url4', 'url4 contents', false, false, 0, test.done);
},

testNoCache: function(test) {
// URLs without a Cache-Control header don't get cached.
test.expect(8);
test.expect(6);
doTest(this, test, '/url3', 'url3 contents', false, false, 0, test.done);
},

testUnexpiredCache: function(test) {
test.expect(8);
test.expect(6);
// 200 is the maximum allowable age.
doTest(this, test, '/url1', 'url1 contents', false, true, 200, test.done);
},

testExpiredCache: function(test) {
test.expect(8);
test.expect(6);
doTest(this, test, '/url1', 'url1 contents', false, false, 201, test.done);
},

Expand Down Expand Up @@ -691,29 +694,27 @@ exports.tests = {
function doTest(_this, test, url, contents, firstCached, secondCached, deltaT, cb) {
var count = 0;
if (!deltaT) { deltaT = 0; }
_this.cache.openReadStream(_this.createUrl(url), function(err, stream, path) {

_this.cache.getContents(_this.createUrl(url), function(err, buffer, path) {
if (!firstCached) { count++; }
if (!stream) {
test.ok(err, "if stream is null there had better be an error");
if (!buffer) {
test.ok(err, "if buffer is null there had better be an error");
return cb();
}
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
test.ok(fs.existsSync(path));
catStream(stream, function (contents) {
test.equal(contents.toString('utf8'), contents);

test.equal(buffer.toString('utf8'), contents);
test.equal(_this.serverUrls[url].fetchCount, count);
_this.nowSeconds += deltaT;
_this.cache.reset();

_this.cache.getContents(_this.createUrl(url), function(err, buffer, path) {
if (!secondCached) { count++; }

test.ok(fs.existsSync(path));
test.equal(_this.serverUrls[url].fetchCount, count);
_this.nowSeconds += deltaT;
_this.cache.reset();
_this.cache.openReadStream(_this.createUrl(url), function(err, stream, path) {
if (!secondCached) { count++; }
test.ok(stream instanceof fs.ReadStream, "stream should be an fs.ReadStream");
test.ok(fs.existsSync(path));
test.equal(_this.serverUrls[url].fetchCount, count);
catStream(stream, function (contents) {
test.equal(contents.toString('utf8'), contents);
cb();
});
});
test.equal(buffer.toString('utf8'), contents);
cb();
});
});

Expand Down

0 comments on commit ddb3b48

Please sign in to comment.