feat: Adds begin/end byte slices to exporter (#207)
achingbrain authored and daviddias committed Mar 22, 2018
1 parent 2b5a81f commit 8e11d77
Showing 11 changed files with 466 additions and 477 deletions.
73 changes: 12 additions & 61 deletions README.md
@@ -165,9 +165,9 @@ filesStream.on('data', (file) => file.content.pipe(process.stdout))
const Exporter = require('ipfs-unixfs-engine').Exporter
```

### new Exporter(<cid or ipfsPath>, <dag or ipld-resolver>)
### new Exporter(<cid or ipfsPath>, <dag or ipld-resolver>, <options>)

Uses the given [dag API] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their multiaddress.
Uses the given [dag API][] or an [ipld-resolver instance][] to fetch IPFS [UnixFS][] object(s) by their multiaddress.

Creates a new readable stream in object mode that outputs objects of the form

@@ -178,57 +178,6 @@ Creates a new readable stream in object mode that outputs objects of the form
}
```

Errors are received as with a normal stream, by listening for the `'error'` event.
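
For example, a minimal sketch of consuming the exporter stream, assuming `cid` and `ipldResolver` are already in scope:

```js
const Exporter = require('ipfs-unixfs-engine').Exporter

const filesStream = new Exporter(cid, ipldResolver)

filesStream.on('data', (file) => {
  // each object has a path and a content stream
  file.content.pipe(process.stdout)
})
filesStream.on('error', (err) => {
  // handle the error
})
```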


[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs

## Reader

The `reader` allows you to receive part or all of a file as a [pull-stream].

#### Reader example

```js
const readable = require('ipfs-unixfs-engine').readable
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
  readable(cid, ipldResolver),
  collect((error, chunks) => {
    // do something with the file chunks and/or handle errors
  })
)
```

#### Reader API

```js
const reader = require('ipfs-unixfs-engine').reader
```

### reader(<cid or ipfsPath>, <dag or ipld-resolver>, <begin>, <end>)

Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object by its multiaddress.

Creates a new [pull-stream][] that sends the requested chunks of data as a series of [Buffer][] objects.

```js
const readable = require('ipfs-unixfs-engine').readable
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  readable(cid, ipldResolver),
  drain((chunk) => {
    // do something with the file chunk
  })
)
```

#### `begin` and `end`

`begin` and `end` arguments can optionally be passed to the reader function. These follow the same semantics as the JavaScript [`Array.slice(begin, end)`][] method.
@@ -240,14 +189,17 @@ A negative `begin` starts the slice from the end of the stream and a negative `e
See [the tests](test/reader.js) for examples of using these arguments.

```js
const readable = require('ipfs-unixfs-engine').readable
const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
readable(cid, ipldResolver, 0, 10)
drain((chunk) => {
// chunk is a Buffer containing only the first 10 bytes of the stream
exporter(cid, ipldResolver, {
begin: 0,
end: 10
}),
drain((file) => {
// file.content is a pull stream containing only the first 10 bytes of the file
})
)
```
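
A negative `begin` can be used in the same way to read a trailing slice of the file; a sketch under the same assumptions (`cid` and `ipldResolver` in scope):

```js
const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  exporter(cid, ipldResolver, {
    begin: -10
  }),
  drain((file) => {
    // file.content is a pull stream containing only the last 10 bytes of the file
  })
)
```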
@@ -257,23 +209,22 @@ pull(
Errors are received by [pull-stream][] sinks.

```js
const readable = require('ipfs-unixfs-engine').readable
const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
readable(cid, ipldResolver, 0, 10)
exporter(cid, ipldResolver),
collect((error, chunks) => {
// handle the error
})
)
```

[pull-stream]: https://www.npmjs.com/package/pull-stream
[Buffer]: https://www.npmjs.com/package/buffer
[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
[pull-stream]: https://www.npmjs.com/package/pull-stream
[`Array.slice(begin, end)`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice

## Contribute
23 changes: 16 additions & 7 deletions src/exporter/file.js
@@ -1,13 +1,14 @@
'use strict'

const traverse = require('pull-traverse')
const traverseSlice = require('./traverse-slice')
const UnixFS = require('ipfs-unixfs')
const CID = require('cids')
const pull = require('pull-stream')
const paramap = require('pull-paramap')

// Logic to export a single (possibly chunked) unixfs file.
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth) => {
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, begin, end) => {
function getData (node) {
try {
const file = UnixFS.unmarshal(node.data)
@@ -31,19 +32,27 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth)
return pull.empty()
}

let content = pull(
traverse.depthFirst(node, visitor),
pull.map(getData)
)

const file = UnixFS.unmarshal(node.data)
const fileSize = size || file.fileSize()

let content

if (!isNaN(begin)) {
content = traverseSlice(node, dag, begin, end)
} else {
content = pull(
traverse.depthFirst(node, visitor),
pull.map(getData)
)
}

return pull.values([{
depth: depth,
content: content,
name: name,
path: path,
hash: node.multihash,
size: size || file.fileSize(),
size: fileSize,
type: 'file'
}])
}
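
The `!isNaN(begin)` guard above means the slicing traversal is used whenever a numeric `begin` was supplied, including `0`, while an omitted `begin` falls back to the full depth-first traversal. A quick illustration of the check (not part of the commit):

```js
// Behaviour of the guard used in file.js
console.log(!isNaN(0))         // true, so begin: 0 selects the slice traversal
console.log(!isNaN(-10))       // true, negative offsets do too
console.log(!isNaN(undefined)) // false, no begin means full depth-first traversal
```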
8 changes: 5 additions & 3 deletions src/exporter/index.js
@@ -36,11 +36,13 @@ function pathBaseAndRest (path) {
}

const defaultOptions = {
maxDepth: Infinity
maxDepth: Infinity,
begin: undefined,
end: undefined
}

module.exports = (path, dag, _options) => {
const options = Object.assign({}, defaultOptions, _options)
module.exports = (path, dag, options) => {
options = Object.assign({}, defaultOptions, options)

let dPath
try {
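
With `begin` and `end` now merged into the exporter options, a caller might use them like this; a brief sketch, assuming `cid` and a `dag` instance are in scope:

```js
const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream')

pull(
  exporter(cid, dag, { begin: 5, end: 15 }),
  pull.collect((err, files) => {
    if (err) throw err
    // each file.content yields only bytes 5..14 of that file
  })
)
```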
12 changes: 6 additions & 6 deletions src/exporter/resolve.js
@@ -32,33 +32,33 @@ function createResolver (dag, options, depth, parent) {
return pull.error(new Error('no depth'))
}
if (item.object) {
return cb(null, resolveItem(item.object, item))
return cb(null, resolveItem(item.object, item, options.begin, options.end))
}
dag.get(new CID(item.multihash), (err, node) => {
if (err) {
return cb(err)
}
// const name = item.fromPathRest ? item.name : item.path
cb(null, resolveItem(node.value, item))
cb(null, resolveItem(node.value, item, options.begin, options.end))
})
}),
pull.flatten(),
pull.filter(Boolean),
pull.filter((node) => node.depth <= options.maxDepth)
)

function resolveItem (node, item) {
return resolve(node, item.name, item.path, item.pathRest, item.size, dag, item.parent || parent, item.depth)
function resolveItem (node, item, begin, end) {
return resolve(node, item.name, item.path, item.pathRest, item.size, dag, item.parent || parent, item.depth, begin, end)
}

function resolve (node, name, path, pathRest, size, dag, parentNode, depth) {
function resolve (node, name, path, pathRest, size, dag, parentNode, depth, begin, end) {
const type = typeOf(node)
const nodeResolver = resolvers[type]
if (!nodeResolver) {
return pull.error(new Error('Unknown node type ' + type))
}
const resolveDeep = createResolver(dag, options, depth, node)
return nodeResolver(node, name, path, pathRest, resolveDeep, size, dag, parentNode, depth)
return nodeResolver(node, name, path, pathRest, resolveDeep, size, dag, parentNode, depth, begin, end)
}
}

104 changes: 104 additions & 0 deletions src/exporter/traverse-slice.js
@@ -0,0 +1,104 @@
'use strict'

const CID = require('cids')
const pull = require('pull-stream')
const asyncValues = require('pull-async-values')
const asyncMap = require('pull-stream/throughs/async-map')
const map = require('pull-stream/throughs/map')
const UnixFS = require('ipfs-unixfs')
const waterfall = require('async/waterfall')

module.exports = (fileNode, dag, begin = 0, end) => {
  let streamPosition = 0

  return pull(
    asyncValues((cb) => {
      const meta = UnixFS.unmarshal(fileNode.data)

      if (meta.type !== 'file') {
        return cb(new Error(`Node ${fileNode} was not a file (was ${meta.type}), can only read files`))
      }

      const fileSize = meta.fileSize()

      if (!end || end > fileSize) {
        end = fileSize
      }

      if (begin < 0) {
        begin = fileSize + begin
      }

      if (end < 0) {
        end = fileSize + end
      }

      const links = fileNode.links

      if (!links || !links.length) {
        if (meta.data && meta.data.length) {
          // file was small enough to fit in one DAGNode so has no links
          return cb(null, [(done) => done(null, meta.data)])
        }

        return cb(new Error(`Path ${fileNode} had no links or data`))
      }

      const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
      const overhead = (linkedDataSize - meta.fileSize()) / links.length

      // create an array of functions to fetch link data
      cb(null, links.map((link) => (done) => {
        // DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
        // larger than they actually are due to the protobuf wrapper
        const bytesInLinkedObjectData = link.size - overhead

        if (begin > (streamPosition + bytesInLinkedObjectData)) {
          // Start byte is after this block so skip it
          streamPosition += bytesInLinkedObjectData

          return done()
        }

        if (end < streamPosition) {
          // End byte was before this block so skip it
          streamPosition += bytesInLinkedObjectData

          return done()
        }

        // transform the multihash to a cid, the cid to a node and the node to some data
        waterfall([
          (next) => dag.get(new CID(link.multihash), next),
          (node, next) => next(null, node.value.data),
          (data, next) => next(null, UnixFS.unmarshal(data).data)
        ], done)
      }))
    }),
    asyncMap((loadLinkData, cb) => loadLinkData(cb)),
    pull.filter(Boolean),
    map((data) => {
      const block = extractDataFromBlock(data, streamPosition, begin, end)

      streamPosition += data.length

      return block
    })
  )
}

function extractDataFromBlock (block, streamPosition, begin, end) {
  const blockLength = block.length

  if (end - streamPosition < blockLength) {
    // If the end byte is in the current block, truncate the block to the end byte
    block = block.slice(0, end - streamPosition)
  }

  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
    // If the start byte is in the current block, skip to the start byte
    block = block.slice(begin - streamPosition)
  }

  return block
}
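
To make the slicing arithmetic concrete, here is a small self-contained sketch (illustrative only, not part of the commit) that mirrors what `extractDataFromBlock` does for a block covering stream positions 10..19 when bytes 12..17 of the file are requested:

```js
// Illustrative only: a 10-byte block that starts at stream position 10
const block = Buffer.from('0123456789')
const streamPosition = 10
const begin = 12
const end = 17 // exclusive, like Array.slice

let out = block

if (end - streamPosition < out.length) {
  // the end byte falls inside this block, so truncate to it
  out = out.slice(0, end - streamPosition)
}

if (begin > streamPosition && begin < (streamPosition + block.length)) {
  // the begin byte falls inside this block, so skip up to it
  out = out.slice(begin - streamPosition)
}

console.log(out.toString()) // '23456', i.e. stream bytes 12 to 16
```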
1 change: 0 additions & 1 deletion src/index.js
@@ -2,4 +2,3 @@

exports.importer = exports.Importer = require('./importer')
exports.exporter = exports.Exporter = require('./exporter')
exports.reader = exports.Reader = require('./reader')