This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

feat: exporter - support slicing streams stored in deeply nested DAGs
achingbrain authored and daviddias committed Mar 27, 2018
1 parent 34d08a6 commit 8568cd5
Showing 4 changed files with 227 additions and 190 deletions.
2 changes: 1 addition & 1 deletion package.json
@@ -66,7 +66,6 @@
"lodash": "^4.17.5",
"multihashes": "~0.4.13",
"multihashing-async": "~0.4.8",
"pull-async-values": "^1.0.3",
"pull-batch": "^1.0.0",
"pull-block": "^1.4.0",
"pull-cat": "^1.1.11",
@@ -75,6 +74,7 @@
"pull-pause": "0.0.2",
"pull-pushable": "^2.2.0",
"pull-stream": "^3.6.2",
"pull-through": "^1.0.18",
"pull-traverse": "^1.0.3",
"pull-write": "^1.1.4",
"sparse-array": "^1.3.1"
145 changes: 116 additions & 29 deletions src/exporter/file.js
@@ -1,31 +1,13 @@
 'use strict'
 
 const traverse = require('pull-traverse')
-const traverseSlice = require('./traverse-slice')
 const UnixFS = require('ipfs-unixfs')
 const CID = require('cids')
 const pull = require('pull-stream')
 const paramap = require('pull-paramap')
 
 // Logic to export a single (possibly chunked) unixfs file.
 module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, begin, end) => {
-  function getData (node) {
-    try {
-      const file = UnixFS.unmarshal(node.data)
-      return file.data || Buffer.alloc(0)
-    } catch (err) {
-      throw new Error('Failed to unmarshal node')
-    }
-  }
-
-  function visitor (node) {
-    return pull(
-      pull.values(node.links),
-      paramap((link, cb) => dag.get(new CID(link.multihash), cb)),
-      pull.map((result) => result.value)
-    )
-  }
-
   const accepts = pathRest[0]
 
   if (accepts !== undefined && accepts !== path) {
@@ -34,17 +16,7 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,

   const file = UnixFS.unmarshal(node.data)
   const fileSize = size || file.fileSize()
-
-  let content
-
-  if (!isNaN(begin)) {
-    content = traverseSlice(node, dag, begin, end)
-  } else {
-    content = pull(
-      traverse.depthFirst(node, visitor),
-      pull.map(getData)
-    )
-  }
+  const content = streamBytes(dag, node, fileSize, findByteRange(fileSize, begin, end))
 
   return pull.values([{
     depth: depth,
@@ -56,3 +28,118 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth,
     type: 'file'
   }])
 }
+
+function findByteRange (fileSize, begin, end) {
+  if (!begin) {
+    begin = 0
+  }
+
+  if (!end || end > fileSize) {
+    end = fileSize
+  }
+
+  if (begin < 0) {
+    begin = fileSize + begin
+  }
+
+  if (end < 0) {
+    end = fileSize + end
+  }
+
+  return {
+    begin, end
+  }
+}
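
A quick sketch (not part of the commit) of how the findByteRange logic above normalizes a requested range, using a hypothetical 100-byte file:

// Assumes findByteRange from the diff above is in scope; sizes and offsets are made up.
findByteRange(100)           // => { begin: 0, end: 100 }  no range requested: the whole file
findByteRange(100, 25, 50)   // => { begin: 25, end: 50 }  explicit slice
findByteRange(100, -10)      // => { begin: 90, end: 100 } negative begin counts back from the end
findByteRange(100, 0, -10)   // => { begin: 0, end: 90 }   negative end counts back from the end
findByteRange(100, 0, 250)   // => { begin: 0, end: 100 }  end is clamped to the file size
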

+function streamBytes (dag, node, fileSize, { begin, end }) {
+  if (begin === end) {
+    return pull.empty()
+  }
+
+  let streamPosition = 0
+
+  function getData ({ node, start }) {
+    if (!node || !node.data) {
+      return
+    }
+
+    try {
+      const file = UnixFS.unmarshal(node.data)
+
+      if (!file.data) {
+        return
+      }
+
+      const block = extractDataFromBlock(file.data, start, begin, end)
+
+      return block
+    } catch (err) {
+      throw new Error('Failed to unmarshal node')
+    }
+  }
+
+  function visitor ({ node }) {
+    const file = UnixFS.unmarshal(node.data)
+
+    // work out which child nodes contain the requested data
+    const filteredLinks = node.links
+      .map((link, index) => {
+        const child = {
+          link: link,
+          start: streamPosition,
+          end: streamPosition + file.blockSizes[index]
+        }
+
+        streamPosition = child.end
+
+        return child
+      })
+      .filter((child, index) => {
+        return (begin >= child.start && begin < child.end) || // child has begin byte
+          (end > child.start && end <= child.end) || // child has end byte
+          (begin < child.start && end > child.end) // child is between begin and end bytes
+      })
+
+    if (filteredLinks.length) {
+      // move stream position to the first node we're going to return data from
+      streamPosition = filteredLinks[0].start
+    }
+
+    return pull(
+      pull.values(filteredLinks),
+      paramap((child, cb) => {
+        dag.get(new CID(child.link.multihash), (error, result) => cb(error, {
+          start: child.start,
+          end: child.end,
+          node: result && result.value
+        }))
+      })
+    )
+  }
+
+  return pull(
+    traverse.depthFirst({
+      node,
+      start: 0,
+      end: fileSize
+    }, visitor),
+    pull.map(getData),
+    pull.filter(Boolean)
+  )
+}
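
To see what the overlap test in visitor keeps, here is a small runnable sketch (not part of the commit) applying the same three conditions to a hypothetical file chunked into three 5-byte leaves, with begin = 4 and end = 11:

// The children cover [0, 5), [5, 10) and [10, 15) of the file.
const begin = 4
const end = 11

const children = [
  { start: 0, end: 5 },   // holds the begin byte (4)
  { start: 5, end: 10 },  // lies entirely between begin and end
  { start: 10, end: 15 }  // holds the end byte (end is exclusive, so byte 10)
]

const wanted = children.filter((child) => {
  return (begin >= child.start && begin < child.end) || // child has begin byte
    (end > child.start && end <= child.end) || // child has end byte
    (begin < child.start && end > child.end) // child is between begin and end bytes
})

console.log(wanted.length) // 3 -- all three leaves overlap [4, 11), so all are fetched

A leaf covering, say, [15, 20) would fail all three tests and be dropped, which is the point of the commit: subtrees that cannot contain any requested bytes are never fetched from the DAG, however deeply the file is nested.
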

+function extractDataFromBlock (block, streamPosition, begin, end) {
+  const blockLength = block.length
+
+  if (end - streamPosition < blockLength) {
+    // If the end byte is in the current block, truncate the block to the end byte
+    block = block.slice(0, end - streamPosition)
+  }
+
+  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
+    // If the start byte is in the current block, skip to the start byte
+    block = block.slice(begin - streamPosition)
+  }
+
+  return block
+}
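
And a worked example (values made up) for extractDataFromBlock: a 10-byte leaf block that starts at stream position 0, sliced for begin = 3, end = 7:

// Assumes extractDataFromBlock from the diff above is in scope.
const block = Buffer.from('0123456789')

// end (7) - streamPosition (0) = 7 < 10, so the block is first truncated to [0, 7);
// begin (3) falls inside the block, so the first 3 bytes are then skipped.
console.log(extractDataFromBlock(block, 0, 3, 7).toString()) // => '3456'

// A block that sits entirely inside the requested range passes through untouched.
console.log(extractDataFromBlock(block, 0, 0, 100).toString()) // => '0123456789'

Blocks wholly outside the range never reach this function — the visitor filter in streamBytes has already dropped them.
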
104 changes: 0 additions & 104 deletions src/exporter/traverse-slice.js

This file was deleted.
