From 3ded576d3f33182aa2726297605aa613fb94fac8 Mon Sep 17 00:00:00 2001 From: Alan Shaw Date: Tue, 11 Dec 2018 23:01:42 +0000 Subject: [PATCH] fix: streaming cat over http api (#1760) `/api/v0/cat` calls `ipfs.cat`, but `ipfs.cat` returns a buffer of file output. This changes `/api/v0/cat` to actually stream output by calling `ipfs.catPullStream` instead. It uses `pull-pushable` in order to catch an initial error in the stream before it starts flowing and instead return a plain JSON response with an appropriate HTTP status. This isn't ideal as it means there's no backpressure - if the consumer doesn't consume fast enough then data will start to get buffered into memory. However this is significantly better than buffering _everything_ into memory before replying. License: MIT Signed-off-by: Alan Shaw --- src/core/components/files-regular.js | 21 ++++++---- src/http/api/resources/files-regular.js | 55 ++++++++++++++++--------- 2 files changed, 49 insertions(+), 27 deletions(-) diff --git a/src/core/components/files-regular.js b/src/core/components/files-regular.js index 01ba7b30a7..ed41c1eaaa 100644 --- a/src/core/components/files-regular.js +++ b/src/core/components/files-regular.js @@ -232,21 +232,28 @@ module.exports = function (self) { pull( exporter(ipfsPath, self._ipld, options), + pull.filter(filterFile), + pull.take(1), pull.collect((err, files) => { - if (err) { return d.abort(err) } - if (files && files.length > 1) { - files = files.filter(filterFile) + if (err) { + return d.abort(err) } - if (!files || !files.length) { + + if (!files.length) { return d.abort(new Error('No such file')) } const file = files[0] - const content = file.content - if (!content && file.type === 'dir') { + + if (!file.content && file.type === 'dir') { return d.abort(new Error('this dag node is a directory')) } - d.resolve(content) + + if (!file.content) { + return d.abort(new Error('this dag node has no content')) + } + + d.resolve(file.content) }) ) diff --git a/src/http/api/resources/files-regular.js b/src/http/api/resources/files-regular.js index 1a02fab3d1..c8fae5112f 100644 --- a/src/http/api/resources/files-regular.js +++ b/src/http/api/resources/files-regular.js @@ -14,6 +14,7 @@ const toStream = require('pull-stream-to-stream') const abortable = require('pull-abortable') const Joi = require('joi') const ndjson = require('pull-ndjson') +const { PassThrough } = require('readable-stream') exports = module.exports @@ -79,27 +80,41 @@ exports.cat = { const options = request.pre.args.options const ipfs = request.server.app.ipfs - ipfs.cat(key, options, (err, stream) => { - if (err) { - log.error(err) - if (err.message === 'No such file') { - reply({ Message: 'No such file', Code: 0, Type: 'error' }).code(500) - } else { - reply({ Message: 'Failed to cat file: ' + err, Code: 0, Type: 'error' }).code(500) - } - return - } + let pusher + let started = false - // hapi is not very clever and throws if no - // - _read method - // - _readableState object - // are there :( - if (!stream._read) { - stream._read = () => {} - stream._readableState = {} - } - return reply(stream).header('X-Stream-Output', '1') - }) + pull( + ipfs.catPullStream(key, options), + pull.drain( + chunk => { + if (!started) { + started = true + pusher = pushable() + reply(toStream.source(pusher).pipe(new PassThrough())) + .header('X-Stream-Output', '1') + } + pusher.push(chunk) + }, + err => { + if (err) { + log.error(err) + + // We already started flowing, abort the stream + if (started) { + return pusher.end(err) + } + + const msg = err.message === 'No such file' + ? err.message + : 'Failed to cat file: ' + err + + return reply({ Message: msg, Code: 0, Type: 'error' }).code(500) + } + + pusher.end() + } + ) + ) } }