diff --git a/package.json b/package.json
index 02b62a5..fb6636e 100644
--- a/package.json
+++ b/package.json
@@ -38,28 +38,30 @@
     "aegir": "^13.0.6",
     "chai": "^4.1.2",
     "dirty-chai": "^2.0.1",
-    "ipfs": "^0.28.2",
+    "ipfs": "~0.28.2",
     "pre-commit": "^1.2.2",
     "safe-buffer": "^5.1.1",
-    "tmp": "0.0.33"
+    "sign-commit": "~0.1.0",
+    "tmp": "~0.0.33"
   },
   "dependencies": {
     "async": "^2.6.0",
-    "blob": "0.0.4",
+    "blob": "~0.0.4",
     "bs58": "^4.0.1",
     "cids": "~0.5.3",
     "debug": "^3.1.0",
     "detect-node": "^2.0.3",
-    "file-api": "^0.10.4",
+    "file-api": "~0.10.4",
     "filereader-stream": "^1.0.0",
-    "interface-datastore": "^0.4.2",
-    "ipfs-unixfs": "^0.1.14",
-    "ipfs-unixfs-engine": "^0.29.0",
-    "is-pull-stream": "0.0.0",
+    "interface-datastore": "~0.4.2",
+    "ipfs-unixfs": "~0.1.14",
+    "ipfs-unixfs-engine": "~0.29.0",
+    "is-pull-stream": "~0.0.0",
     "is-stream": "^1.1.0",
     "promisify-es6": "^1.0.3",
     "pull-cat": "^1.1.11",
     "pull-paramap": "^1.2.2",
+    "pull-pushable": "^2.2.0",
     "pull-stream": "^3.6.7",
     "pull-traverse": "^1.0.3",
     "stream-to-pull-stream": "^1.7.2"
diff --git a/src/core/stat.js b/src/core/stat.js
index a1fc5bf..ed6c3f9 100644
--- a/src/core/stat.js
+++ b/src/core/stat.js
@@ -45,19 +45,9 @@ module.exports = function mfsStat (ipfs) {
 
       const meta = unmarshal(node.data)
 
-      let size = 0
-
-      if (meta.data && meta.data.length) {
-        size = meta.data.length
-      }
-
-      if (meta.blockSizes && meta.blockSizes.length) {
-        size = meta.blockSizes.reduce((acc, curr) => acc + curr, 0)
-      }
-
       done(null, {
         hash: node.multihash,
-        size: size,
+        size: meta.fileSize(),
         cumulativeSize: node.size,
         childBlocks: meta.blockSizes.length,
         type: meta.type
diff --git a/src/core/utils/add-link.js b/src/core/utils/add-link.js
index c2af10f..2811e2a 100644
--- a/src/core/utils/add-link.js
+++ b/src/core/utils/add-link.js
@@ -12,7 +12,7 @@ const addLink = (ipfs, options, callback) => {
   options = Object.assign({}, {
     parent: undefined,
     child: undefined,
-    name: undefined,
+    name: '',
     flush: true
   }, options)
 
@@ -24,14 +24,14 @@ const addLink = (ipfs, options, callback) => {
     return callback(new Error('No child passed to addLink'))
   }
 
-  if (!options.name) {
-    return callback(new Error('No name passed to addLink'))
-  }
-
   waterfall([
     (done) => {
-      // Remove the old link if necessary
-      DAGNode.rmLink(options.parent, options.name, done)
+      if (options.name) {
+        // Remove the old link if necessary
+        return DAGNode.rmLink(options.parent, options.name, done)
+      }
+
+      done(null, options.parent)
     },
     (parent, done) => {
       // Add the new link to the parent
diff --git a/src/core/utils/constants.js b/src/core/utils/constants.js
index c82b1f7..2dd40e7 100644
--- a/src/core/utils/constants.js
+++ b/src/core/utils/constants.js
@@ -4,5 +4,7 @@ const Key = require('interface-datastore').Key
 
 module.exports = {
   FILE_SEPARATOR: '/',
-  MFS_ROOT_KEY: new Key('/local/filesroot')
+  MFS_ROOT_KEY: new Key('/local/filesroot'),
+  MAX_CHUNK_SIZE: 262144,
+  MAX_LINKS: 174
 }
diff --git a/src/core/utils/create-node.js b/src/core/utils/create-node.js
new file mode 100644
index 0000000..a8d7521
--- /dev/null
+++ b/src/core/utils/create-node.js
@@ -0,0 +1,22 @@
+'use strict'
+
+const waterfall = require('async/waterfall')
+const {
+  DAGNode
+} = require('ipld-dag-pb')
+
+const createNode = (ipfs, data, links, options, callback) => {
+  waterfall([
+    // Create a DAGNode with the new data
+    (cb) => DAGNode.create(data, links, cb),
+    (newNode, cb) => {
+      // Persist it
+      ipfs.dag.put(newNode, {
+        format: options.format,
+        hashAlg: options.hashAlg
+      }, (error) => cb(error, newNode))
+    }
+  ], callback)
+}
+
+module.exports = createNode
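For orientation, a minimal sketch of how the new createNode helper might be invoked. The ipfs instance and the format/hashAlg values here are assumptions for illustration, not part of the change:

    const createNode = require('./create-node')

    // persist an empty dag-pb node and get the in-memory DAGNode back
    createNode(ipfs, Buffer.alloc(0), [], {
      format: 'dag-pb',
      hashAlg: 'sha2-256'
    }, (error, node) => {
      if (error) throw error
      console.log('stored node', node.multihash)
    })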
diff --git a/src/core/utils/end-pull-stream.js b/src/core/utils/end-pull-stream.js
new file mode 100644
index 0000000..1e10d41
--- /dev/null
+++ b/src/core/utils/end-pull-stream.js
@@ -0,0 +1,9 @@
+'use strict'
+
+const endPullStream = (callback) => {
+  // Ugh. https://github.com/standard/standard/issues/623
+  const foo = true
+  return callback(foo)
+}
+
+module.exports = endPullStream
diff --git a/src/core/utils/index.js b/src/core/utils/index.js
index 7faa2c8..33e064d 100644
--- a/src/core/utils/index.js
+++ b/src/core/utils/index.js
@@ -3,12 +3,18 @@
 const constants = require('./constants')
 
 module.exports = {
+  endPullStream: require('./end-pull-stream'),
   validatePath: require('./validate-path'),
   withMfsRoot: require('./with-mfs-root'),
   updateMfsRoot: require('./update-mfs-root'),
   traverseTo: require('./traverse-to'),
   addLink: require('./add-link'),
   updateTree: require('./update-tree'),
+  createNode: require('./create-node'),
   limitStreamBytes: require('./limit-stream-bytes'),
-  FILE_SEPARATOR: constants.FILE_SEPARATOR
+  loadNode: require('./load-node'),
+  zeros: require('./zeros'),
+  FILE_SEPARATOR: constants.FILE_SEPARATOR,
+  MAX_CHUNK_SIZE: constants.MAX_CHUNK_SIZE,
+  MAX_LINKS: constants.MAX_LINKS
 }
diff --git a/src/core/utils/limit-stream-bytes.js b/src/core/utils/limit-stream-bytes.js
index 1365336..54cf8be 100644
--- a/src/core/utils/limit-stream-bytes.js
+++ b/src/core/utils/limit-stream-bytes.js
@@ -1,15 +1,14 @@
 'use strict'
 
 const asyncMap = require('pull-stream/throughs/async-map')
+const endPullStream = require('./end-pull-stream')
 
 const limitStreamBytes = (limit) => {
   let bytesRead = 0
 
   return asyncMap((buffer, cb) => {
     if (bytesRead > limit) {
-      // Ugh. https://github.com/standard/standard/issues/623
-      const foo = true
-      return cb(foo)
+      endPullStream(cb)
     }
 
     // If we only need to return part of this buffer, slice it to make it smaller
diff --git a/src/core/utils/load-node.js b/src/core/utils/load-node.js
new file mode 100644
index 0000000..17af041
--- /dev/null
+++ b/src/core/utils/load-node.js
@@ -0,0 +1,25 @@
+'use strict'
+
+const waterfall = require('async/waterfall')
+const CID = require('cids')
+const log = require('debug')('mfs:utils:load-node')
+const bs58 = require('bs58')
+
+const loadNode = (ipfs, cid, callback) => {
+  const multihash = cid && (cid.multihash || cid.hash)
+
+  if (!multihash) {
+    log(`No multihash passed so cannot load DAGNode`)
+
+    return callback()
+  }
+
+  log(`Loading DAGNode for child ${bs58.encode(multihash)}`)
+
+  waterfall([
+    (cb) => ipfs.dag.get(new CID(multihash), cb),
+    (result, cb) => cb(null, result.value)
+  ], callback)
+}
+
+module.exports = loadNode
diff --git a/src/core/utils/traverse-to.js b/src/core/utils/traverse-to.js
index c89db7e..9fc991f 100644
--- a/src/core/utils/traverse-to.js
+++ b/src/core/utils/traverse-to.js
@@ -48,7 +48,7 @@ const traverseTo = (ipfs, path, options, callback) => {
     node: rootNode,
     parent: null
   }, (parent, {pathSegment, index}, done) => {
-    log(`Looking for ${pathSegment} in ${parent.name} ${bs58.encode(parent.node.multihash)}s`)
+    log(`Looking for ${pathSegment} in ${parent.name} ${bs58.encode(parent.node.multihash)}`)
 
     parent.node.links.forEach(link => {
       log(`${bs58.encode(link.multihash)} ${link.name}`)
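Purely as illustration, the new loadNode helper accepts either a CID or a DAGLink-style object carrying a multihash; a sketch might look like this (someLink is a placeholder, not part of the change):

    const loadNode = require('./load-node')

    // resolve a { multihash } link (or a CID) to its DAGNode
    loadNode(ipfs, { multihash: someLink.multihash }, (error, node) => {
      if (error) throw error
      console.log('loaded node with', node.links.length, 'links')
    })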
diff --git a/src/core/utils/zeros.js b/src/core/utils/zeros.js
new file mode 100644
index 0000000..6a27aeb
--- /dev/null
+++ b/src/core/utils/zeros.js
@@ -0,0 +1,31 @@
+'use strict'
+
+// A pull stream source that will emit buffers full of zeros up to the specified length
+const zeros = (max = Infinity, increment = 4096) => {
+  let i = 0
+
+  return (end, cb) => {
+    if (end) {
+      return cb && cb(end)
+    }
+
+    if (i >= max) {
+      // Ugh. https://github.com/standard/standard/issues/623
+      const foo = true
+      return cb(foo)
+    }
+
+    let nextLength = increment
+
+    if ((i + nextLength) > max) {
+      // final chunk doesn't divide neatly into increment
+      nextLength = max - i
+    }
+
+    i += nextLength
+
+    cb(null, Buffer.alloc(nextLength, 0))
+  }
+}
+
+module.exports = zeros
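A rough sketch of how this zero-filled source could be consumed with pull-stream (the byte counts are illustrative only):

    const pull = require('pull-stream')
    const zeros = require('./zeros')

    // collect 10000 zero bytes, emitted in 4096-byte chunks
    pull(
      zeros(10000),
      pull.collect((error, buffers) => {
        if (error) throw error
        const total = buffers.reduce((acc, buf) => acc + buf.length, 0)
        console.log(total) // 10000
      })
    )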
diff --git a/src/core/write/index.js b/src/core/write/index.js
index 1bc9ba2..9c63569 100644
--- a/src/core/write/index.js
+++ b/src/core/write/index.js
@@ -199,6 +199,6 @@ module.exports = function mfsWrite (ipfs) {
           (newRoot, next) => updateMfsRoot(ipfs, newRoot.node.multihash, next)
         ], done)
       }
-    ], (error, result) => callback(error, result))
+    ], (error) => callback(error))
   })
 }
diff --git a/src/core/write/update-node.js b/src/core/write/update-node.js
index e5a7a8b..dd3dab8 100644
--- a/src/core/write/update-node.js
+++ b/src/core/write/update-node.js
@@ -5,8 +5,11 @@ const {
   unmarshal
 } = UnixFs
 const pull = require('pull-stream/pull')
+const cat = require('pull-cat')
 const values = require('pull-stream/sources/values')
 const collect = require('pull-stream/sinks/collect')
+const pushable = require('pull-pushable')
+const map = require('pull-stream/throughs/map')
 const asyncMap = require('pull-stream/throughs/async-map')
 const filter = require('pull-stream/throughs/filter')
 const paramap = require('pull-paramap')
@@ -14,7 +17,7 @@ const {
   leafFirst
 } = require('pull-traverse')
 const waterfall = require('async/waterfall')
-const CID = require('cids')
+const parallel = require('async/parallel')
 const findFileSize = require('./find-file-size')
 const {
   DAGNode,
@@ -23,8 +26,14 @@ const {
 const log = require('debug')('mfs:write:update-node')
 const bs58 = require('bs58')
 const {
-  limitStreamBytes
+  limitStreamBytes,
+  addLink,
+  createNode,
+  zeros,
+  loadNode,
+  MAX_CHUNK_SIZE
 } = require('../utils')
+const importer = require('ipfs-unixfs-engine').importer
 
 const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
   let offset = options.offset || 0
@@ -35,118 +44,233 @@ const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
   // Where we want to stop writing in the stream
   const streamEnd = offset + options.length
 
-  // Where we currently are in the stream
-  let streamPosition = 0
+  // Where we currently are in the file
+  let destinationStreamPosition = streamStart
 
   waterfall([
-    (done) => ipfs.dag.get(cidToUpdate, done),
-    (result, done) => {
-      const node = result.value
+    (done) => loadNode(ipfs, cidToUpdate, done),
+    (node, done) => {
       const fileSize = findFileSize(node)
 
       log(`Updating bytes ${streamStart}-${streamEnd} of ${fileSize} bytes from ${cidToUpdate.toBaseEncodedString()} with source`)
 
-      if (streamEnd > fileSize) {
-        // We are going to expand the file. this can lead to the DAG structure
-        // changing so reimport the whole file instead
-        //
-        // Create a stream from the existing node, then switch to the
-        // passed buffer when required
-        log(`New bytes would expand the file and potentially reorder the DAG, reimporting instead`)
-
-        return done(new Error('Expanding files is not implemented yet'))
-      } else {
-        // We are going to replace bytes in existing leaf nodes and rewrite
-        // their CIDs and the CIDs of the parent nodes
-        log(`Updating one or more more leaf nodes`)
-
-        pull(
-          source,
-          limitStreamBytes(options.length),
-          asyncMap((buffer, done) => {
-            // Find the DAGNodes that contain the data at the specified offset/length
-            // Merge the data and create new DAGNodes with the merged data
-            // Keep a record of the new CIDs and update the tree
-            pull(
-              leafFirst({
-                parent: null,
-                link: null,
-                index: null,
-                node,
-                nodeStart: streamPosition,
-                nodeEnd: fileSize
-              }, findDAGNodesWithRequestedData),
-              paramap(updateNodeData(buffer)),
-              filter(Boolean),
-              asyncMap((link, next) => {
-                if (!link.parent || link.index === undefined) {
-                  return next(null, link)
-                }
+      // We are going to replace bytes in existing leaf nodes and rewrite
+      // their CIDs and the CIDs of the parent nodes
 
-                // Create a new list of links
-                const links = link.parent.node.links.map((existingLink, index) => {
-                  if (index === link.index) {
-                    return new DAGLink('', link.size, link.multihash)
-                  }
-
-                  return new DAGLink('', existingLink.size, existingLink.multihash)
-                })
-
-                // Update node's parent
-                waterfall([
-                  // Create a DAGNode with the new data
-                  (cb) => DAGNode.create(link.parent.node.data, links, cb),
-                  (newNode, cb) => {
-                    log(`Persisting new parent DAGNode ${bs58.encode(newNode.multihash)} with links:`)
-
-                    // Persist it
-                    ipfs.dag.put(newNode, {
-                      format: options.format,
-                      hashAlg: options.hashAlg
-                    }, (error, cid) => {
-                      log(`New parent CID ${cid.toBaseEncodedString()}`)
-
-                      link.parent.node = newNode
-
-                      cb(error, newNode)
-                    })
-                  },
-                  (newNode, cb) => {
-                    link.parent.node = newNode
-
-                    cb(null, link)
-                  }
-                ], next)
-              }),
-              collect((error, results) => {
-                let updatedRoot
-
-                if (!error) {
-                  updatedRoot = results[0]
-
-                  while (updatedRoot.parent) {
-                    updatedRoot = updatedRoot.parent
-                  }
-
-                  if (updatedRoot.node) {
-                    updatedRoot = updatedRoot.node
-                  }
-                }
+      // if we start writing past the end of the file we'll need to pad it with zeros,
+      // remember how many zeros we need to let us still write the correct number of
+      // bytes when --count has been specified
+      let paddingBytesLength = 0
 
-                offset += buffer.length
+      if (streamStart > fileSize) {
+        // we will start writing past the end of the file, pad the gap with empty bytes
+        paddingBytesLength = streamStart - fileSize
 
-                log(`Updated root is ${bs58.encode(updatedRoot.multihash)}`)
+        log(`Adding ${paddingBytesLength} 0s to the start of the block`)
 
-                done(error, updatedRoot)
-              })
-            )
-          }),
-          collect((error, results) => done(error, results && results[0]))
-        )
+        source = cat([
+          zeros(paddingBytesLength),
+          source
+        ])
       }
+
+      // create two pushable streams, one for updating existing DAGNode data
+      // and one for creating new DAGNodes
+      const updateSource = pushable()
+      const appendSource = pushable()
+
+      // receive bytes from source
+      pull(
+        source,
+        filter(Boolean),
+        limitStreamBytes(options.length + paddingBytesLength),
+        map((buffer) => {
+          log(`Writing ${buffer.length} at ${destinationStreamPosition} of ${fileSize}`)
+
+          // write to either the updating stream or appending stream depending on
+          // where we are in the stream
+          if (destinationStreamPosition < fileSize) {
+            if (destinationStreamPosition + buffer.length > fileSize) {
+              // this buffer starts inside the file but ends outside of it.
+              // split the buffer into two pieces, update one and append the other
+              updateSource.push(buffer.slice(0, fileSize - destinationStreamPosition))
+              appendSource.push(buffer.slice(fileSize - destinationStreamPosition))
+            } else {
+              // this buffer starts and ends inside the file
+              updateSource.push(buffer)
+            }
+          } else {
+            // this buffer starts outside the file
+            appendSource.push(buffer)
+          }
+
+          // the next buffer will start after this one has finished
+          destinationStreamPosition += buffer.length
+        }),
+        collect((error) => {
+          updateSource.end(error)
+          appendSource.end()
+        })
+      )
+
+      waterfall([
+        (next) => {
+          // wait for both streams to end
+          parallel([
+            // set up pull stream for replacing bytes
+            (cb) => updateNodeBytes(node, fileSize, updateSource, cb),
+
+            // setup pull stream for appending bytes
+            (cb) => appendNodeBytes(appendSource, cb)
+          ], next)
+        },
+        ([updatedNode, appendedNode], next) => {
+          updatedNode = updatedNode || node
+
+          const updatedMeta = unmarshal(updatedNode.data)
+          const appendedMeta = unmarshal(appendedNode.data)
+
+          if (appendedMeta.fileSize()) {
+            // both nodes are small
+            if (!updatedNode.links.length && !appendedNode.links.length) {
+              const totalDataLength = updatedMeta.data.length + appendedMeta.data.length
+
+              if (totalDataLength < MAX_CHUNK_SIZE) {
+                // Our data should fit into one DAGNode so merge the data from both nodes..
+                const newMeta = new UnixFs(updatedMeta.type, Buffer.concat([updatedMeta.data, appendedMeta.data]))
+
+                log('combined two nodes')
+                return createNode(ipfs, newMeta.marshal(), [], options, next)
+              } else {
+                // We expanded one DAGNode into two so create a tree
+                const link1 = new DAGLink('', updatedMeta.fileSize(), updatedNode.multihash)
+                const link2 = new DAGLink('', appendedMeta.fileSize(), appendedNode.multihash)
+
+                const newMeta = new UnixFs(updatedMeta.type)
+                newMeta.addBlockSize(updatedMeta.fileSize())
+                newMeta.addBlockSize(appendedMeta.fileSize())
+
+                log('created one new node from two small nodes')
+                return createNode(ipfs, newMeta.marshal(), [link1, link2], options, next)
+              }
+            }
+
+            // if we added new bytes, add them to the root node of the original file
+            // this is consistent with the go implementation but probably broken
+
+            // update UnixFs metadata on the root node
+            updatedMeta.addBlockSize(appendedMeta.fileSize())
+
+            return waterfall([
+              (cb) => DAGNode.create(updatedMeta.marshal(), updatedNode.links, cb),
+              (newNode, cb) => addLink(ipfs, {
+                parent: newNode,
+                child: appendedNode
+              }, cb)
+            ], next)
+          }
+
+          next(null, updatedNode)
+        }
+      ], done)
     }
   ], callback)
+  function appendNodeBytes (source, callback) {
+    waterfall([
+      (cb) => pull(
+        values([{
+          content: pull(source)
+        }]),
+        importer(ipfs._ipld, {
+          progress: options.progress,
+          hashAlg: options.hash,
+          cidVersion: options.cidVersion,
+          strategy: options.strategy
+        }),
+        collect(cb)
+      ),
+      (results, cb) => loadNode(ipfs, results[0], cb)
+    ], callback)
+  }
+
+  function updateNodeBytes (node, fileSize, source, callback) {
+    waterfall([
+      (cb) => pull(
+        source,
+        asyncMap((buffer, done) => {
+          // Find the DAGNodes that contain the data at the specified offset/length
+          // Merge the data and create new DAGNodes with the merged data
+          // Keep a record of the new CIDs and update the tree
+
+          pull(
+            leafFirst({
+              parent: null,
+              link: null,
+              index: null,
+              node,
+              nodeStart: streamPosition,
+              nodeEnd: fileSize
+            }, findDAGNodesWithRequestedData),
+            paramap(updateNodeData(buffer)),
+            filter(Boolean),
+            asyncMap((link, next) => {
+              if (!link.parent || link.index === undefined) {
+                return next(null, link)
+              }
+
+              // Create a new list of links
+              const links = link.parent.node.links.map((existingLink, index) => {
+                if (index === link.index) {
+                  return new DAGLink('', link.size, link.multihash)
+                }
+
+                return existingLink
+              })
+
+              // Update node's parent
+              waterfall([
+                // Create a DAGNode with the new data
+                (cb) => createNode(ipfs, link.parent.node.data, links, options, cb),
+                (newNode, cb) => {
+                  link.parent.node = newNode
+
+                  cb(null, link)
+                }
+              ], next)
+            }),
+            collect((error, results) => {
+              let updatedRoot
+
+              if (!error) {
+                updatedRoot = results[0]
+
+                while (updatedRoot.parent) {
+                  updatedRoot = updatedRoot.parent
+                }
+
+                if (updatedRoot.node) {
+                  updatedRoot = updatedRoot.node
+                }
+              }
+
+              offset += buffer.length
+
+              log(`Updated root is ${bs58.encode(updatedRoot.multihash)}`)
+
+              done(error, updatedRoot)
+            })
+          )
+        }),
+        collect((error, results) => cb(error, results && results[0]))
+      ),
+      (updatedNodeCID, cb) => loadNode(ipfs, updatedNodeCID, cb)
+    ], callback)
+  }
+
+  // Where we currently are in the existing file
+  let streamPosition = 0
+
   // Returns a pull stream that will load the data from the children of the passed node
   function findDAGNodesWithRequestedData ({ node }) {
     const meta = unmarshal(node.data)
@@ -192,13 +316,11 @@ const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
     return pull(
       values(filteredLinks),
       paramap((child, cb) => {
-        const cid = new CID(child.link.multihash)
-
-        log(`Loading DAGNode for child ${cid.toBaseEncodedString()}, index ${child.index}`)
-
-        ipfs.dag.get(cid, (error, result) => cb(error, Object.assign({}, child, {
-          node: result && result.value
-        })))
+        loadNode(ipfs, child.link, (error, node) => {
+          cb(error, Object.assign({}, child, {
+            node
+          }))
+        })
       })
     )
   }
@@ -232,13 +354,7 @@ const updateNode = (ipfs, cidToUpdate, source, options, callback) => {
 
     waterfall([
      // Create a DAGNode with the new data
-      (cb) => DAGNode.create(nodeData, cb),
-      (newNode, cb) => {
-        // Persist it
-        ipfs.dag.put(newNode, {
-          cid: new CID(newNode.multihash)
-        }, (error) => cb(error, newNode))
-      },
+      (cb) => createNode(ipfs, nodeData, [], options, cb),
      (newNode, cb) => {
        log(`Created DAGNode with new data with hash ${bs58.encode(newNode.multihash)} to replace ${bs58.encode(node.multihash)}`)
 
diff --git a/test/fixtures/buffer-stream.js b/test/fixtures/buffer-stream.js
new file mode 100644
index 0000000..c9eaaab
--- /dev/null
+++ b/test/fixtures/buffer-stream.js
@@ -0,0 +1,43 @@
+'use strict'
+
+const endPullStream = require('../../src/core/utils/end-pull-stream')
+const crypto = require('crypto')
+
+const defaultOptions = {
+  chunkSize: 4096,
+  collector: () => {}
+}
+
+const bufferStream = (limit, options = {}) => {
+  options = Object.assign({}, defaultOptions, options)
+  let emitted = 0
+
+  return (error, cb) => {
+    if (error) {
+      return cb(error)
+    }
+
+    const nextLength = emitted + options.chunkSize
+    let nextChunkSize = options.chunkSize
+
+    if (nextLength > limit) {
+      // emit the final chunk
+      nextChunkSize = limit - emitted
+    }
+
+    if (nextChunkSize < 1) {
+      // we've emitted all requested data, end the stream
+      return endPullStream(cb)
+    }
+
+    emitted += nextChunkSize
+
+    const bytes = crypto.randomBytes(nextChunkSize)
+
+    options.collector(bytes)
+
+    return cb(null, bytes)
+  }
+}
+
+module.exports = bufferStream
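For illustration only, the bufferStream fixture might be exercised like this in a test; the mfs handle, path and byte counts are assumptions, mirroring how the specs below use it:

    const bufferStream = require('./fixtures/buffer-stream')

    const written = []

    // write ~8KB of random data, keeping a copy of every chunk that was emitted
    return mfs.write('/random.txt', bufferStream(8192, {
      collector: (bytes) => written.push(bytes)
    }), {
      create: true
    })
      .then(() => mfs.read('/random.txt'))
      .then((buffer) => expect(buffer).to.deep.equal(Buffer.concat(written)))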
diff --git a/test/stat.spec.js b/test/stat.spec.js
index 12fa67b..78dfe18 100644
--- a/test/stat.spec.js
+++ b/test/stat.spec.js
@@ -60,7 +60,7 @@ describe('stat', function () {
     return mfs.mkdir('/empty-directory')
       .then(() => mfs.stat(path))
       .then(stats => {
-        expect(stats.size).to.equal(0)
+        expect(stats.size).to.equal(undefined)
         expect(stats.cumulativeSize).to.equal(4)
         expect(stats.childBlocks).to.equal(0)
         expect(stats.type).to.equal('directory')
diff --git a/test/write.spec.js b/test/write.spec.js
index 9d1da46..c37007e 100644
--- a/test/write.spec.js
+++ b/test/write.spec.js
@@ -8,6 +8,12 @@
 const path = require('path')
 const loadFixture = require('aegir/fixtures')
 const isNode = require('detect-node')
 const values = require('pull-stream/sources/values')
+const bufferStream = require('./fixtures/buffer-stream')
+const CID = require('cids')
+const UnixFs = require('ipfs-unixfs')
+const {
+  MAX_CHUNK_SIZE
+} = require('../src/core/utils')
 
 let fs
@@ -212,7 +218,7 @@ describe('write', function () {
         create: true
       })
        .then(() => mfs.write(path, newContent))
-        .then((result) => mfs.stat(path))
+        .then(() => mfs.stat(path))
        .then((stats) => expect(stats.size).to.equal(contentSize))
        .then(() => mfs.read(path, {
          offset: 0,
@@ -244,8 +250,101 @@ describe('write', function () {
     })
   })
 
-  it.skip('truncates a file when requested', () => {
+  runTest(({type, path, content, contentSize}) => {
+    it(`expands a file when an offset is specified (${type})`, () => {
+      const offset = contentSize - 1
+      const newContent = Buffer.from('Oh hai!')
+      return mfs.write(path, content, {
+        create: true
+      })
+        .then(() => mfs.write(path, newContent, {
+          offset
+        }))
+        .then(() => mfs.stat(path))
+        .then((stats) => expect(stats.size).to.equal(contentSize + newContent.length - 1))
+        .then(() => mfs.read(path, {
+          offset
+        }))
+        .then((buffer) => expect(buffer).to.deep.equal(newContent))
+    })
+  })
+
+  runTest(({type, path, content, contentSize}) => {
+    it(`expands a file when an offset is specified and the offset is longer than the file (${type})`, () => {
+      const offset = contentSize + 5
+      const newContent = Buffer.from('Oh hai!')
+
+      return mfs.write(path, content, {
+        create: true
+      })
+        .then(() => mfs.write(path, newContent, {
+          offset
+        }))
+        .then(() => mfs.stat(path))
+        .then((stats) => expect(stats.size).to.equal(newContent.length + offset))
+        .then(() => mfs.read(path, {
+          offset: offset - 5
+        }))
+        .then((buffer) => expect(buffer).to.deep.equal(Buffer.concat([Buffer.from([0, 0, 0, 0, 0]), newContent])))
+    })
+  })
+
+  it(`expands one DAGNode into a balanced tree`, () => {
+    const path = `/some-file-${Math.random()}.txt`
+    const data = []
+
+    return mfs.write(path, bufferStream(MAX_CHUNK_SIZE - 10, {
+      collector: (bytes) => data.push(bytes)
+    }), {
+      create: true
+    })
+      .then(() => mfs.stat(path))
+      .then((stats) => mfs.node.dag.get(new CID(stats.hash)))
+      .then((result) => result.value)
+      .then((node) => {
+        expect(node.links.length).to.equal(0)
+
+        const meta = UnixFs.unmarshal(node.data)
+
+        expect(meta.fileSize()).to.equal(data.reduce((acc, curr) => acc + curr.length, 0))
+        expect(meta.data).to.deep.equal(data.reduce((acc, curr) => Buffer.concat([acc, curr]), Buffer.alloc(0)))
+      })
+      .then(() => mfs.write(path, bufferStream(20, {
+        collector: (bytes) => data.push(bytes)
+      }), {
+        offset: MAX_CHUNK_SIZE - 10
+      }))
+      .then(() => mfs.stat(path))
+      .then((stats) => mfs.node.dag.get(new CID(stats.hash)))
+      .then((result) => result.value)
+      .then((node) => {
+        expect(node.links.length).to.equal(2)
+
+        const meta = UnixFs.unmarshal(node.data)
+
+        expect(meta.fileSize()).to.equal(data.reduce((acc, curr) => acc + curr.length, 0))
+        expect(meta.data).to.equal(undefined)
+      })
+      .then(() => mfs.read(path))
+      .then((buffer) => expect(buffer).to.deep.equal(data.reduce((acc, curr) => Buffer.concat([acc, curr]), Buffer.alloc(0))))
+  })
+
+  runTest(({type, path, content}) => {
+    it.skip(`truncates a file when requested (${type})`, () => {
+      const newContent = Buffer.from('Oh hai!')
+
+      return mfs.write(path, content, {
+        create: true
+      })
+        .then(() => mfs.write(path, newContent, {
+          truncate: true
+        }))
+        .then(() => mfs.stat(path))
+        .then((stats) => expect(stats.size).to.equal(newContent.length))
+        .then(() => mfs.read(path))
+        .then((buffer) => expect(buffer).to.deep.equal(newContent))
+    })
   })
 
   it.skip('writes a file with raw blocks for newly created leaf nodes', () => {