This repository has been archived by the owner on Aug 12, 2020. It is now read-only.

Adds begin/end byte slices to exporter #207

Merged: 1 commit, Mar 22, 2018
73 changes: 12 additions & 61 deletions README.md
@@ -165,9 +165,9 @@ filesStream.on('data', (file) => file.content.pipe(process.stdout))
const Exporter = require('ipfs-unixfs-engine').Exporter
```

### new Exporter(<cid or ipfsPath>, <dag or ipld-resolver>)
### new Exporter(<cid or ipfsPath>, <dag or ipld-resolver>, <options>)

Uses the given [dag API] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their multiaddress.
Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object(s) by their multiaddress.

Creates a new readable stream in object mode that outputs objects of the form

@@ -178,57 +178,6 @@ Creates a new readable stream in object mode that outputs objects of the form
}
```
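
Judging from `src/exporter/file.js` later in this PR, each exported entry looks roughly like the sketch below; every concrete value shown is hypothetical.

```js
// Rough shape of one exported entry, assembled from src/exporter/file.js in this PR;
// the concrete values are made up for illustration.
{
  depth: 1,                 // depth of the entry within the exported tree
  name: 'hello.txt',        // hypothetical file name
  path: 'QmFoo/hello.txt',  // hypothetical CID plus path remainder
  hash: multihash,          // multihash (Buffer) of the backing DAG node
  size: 12,                 // file size in bytes
  type: 'file',
  content: content          // a pull-stream of Buffers carrying the file bytes
}
```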

Errors are received as with a normal stream, by listening for the `'error'` event.


[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs

## Reader

The `reader` allows you to receive part or all of a file as a [pull-stream].

#### Reader example

```js
const readable = require('ipfs-unixfs-engine').readable
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/collect')

pull(
readable(cid, ipldResolver)
collect((error, chunks) => {
// do something with the file chunks and/or handle errors
})
)
```

#### Reader API

```js
const reader = require('ipfs-unixfs-engine').reader
```

### reader(<cid or ipfsPath>, <dag or ipld-resolver>, <begin>, <end>)

Uses the given [dag API][] or an [ipld-resolver instance][] to fetch an IPFS [UnixFS][] object by their multiaddress.

Creates a new [pull-stream][] that sends the requested chunks of data as a series of [Buffer][] objects.

```js
const readable = require('ipfs-unixfs-engine').readable
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
readable(cid, ipldResolver),
drain((chunk) => {
// do something with the file chunk
})
)
```

#### `begin` and `end`

`begin` and `end` arguments can optionally be passed to the exporter function. These follow the same semantics as the JavaScript [`Array.slice(begin, end)`][] method.
Expand All @@ -240,14 +189,17 @@ A negative `begin` starts the slice from the end of the stream and a negative `e
See [the tests](test/reader.js) for examples of using these arguments.

```js
const readable = require('ipfs-unixfs-engine').readable
const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
readable(cid, ipldResolver, 0, 10)
drain((chunk) => {
// chunk is a Buffer containing only the first 10 bytes of the stream
exporter(cid, ipldResolver, {
begin: 0,
end: 10
}),
drain((file) => {
// file.content is a pull stream containing only the first 10 bytes of the file
})
)
```
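
To illustrate the negative-offset semantics described above, here is a sketch in the same style as the example just shown; it assumes the same hypothetical `cid` and `ipldResolver`, and uses `begin: -10` to read only the tail of the file.

```js
const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream')
const drain = require('pull-stream/sinks/drain')

pull(
  // begin: -10 counts back from the end of the file, as with Array.slice,
  // so only the last 10 bytes are exported
  exporter(cid, ipldResolver, {
    begin: -10
  }),
  drain((file) => {
    // file.content is a pull stream containing the last 10 bytes of the file
  })
)
```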
@@ -257,23 +209,22 @@ pull(
Errors are received by [pull-stream][] sinks.

```js
const readable = require('ipfs-unixfs-engine').readable
const exporter = require('ipfs-unixfs-engine').exporter
const pull = require('pull-stream')
const collect = require('pull-stream/sinks/collect')

pull(
readable(cid, ipldResolver, 0, 10)
exporter(cid, ipldResolver),
collect((error, chunks) => {
// handle the error
})
)
```

[pull-stream]: https://www.npmjs.com/package/pull-stream
[Buffer]: https://www.npmjs.com/package/buffer
[dag API]: https://github.com/ipfs/interface-ipfs-core/blob/master/SPEC/DAG.md
[ipld-resolver instance]: https://github.com/ipld/js-ipld-resolver
[UnixFS]: https://github.com/ipfs/specs/tree/master/unixfs
[pull-stream]: https://www.npmjs.com/package/pull-stream
[`Array.slice(begin, end)`]: https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/Array/slice

## Contribute
23 changes: 16 additions & 7 deletions src/exporter/file.js
@@ -1,13 +1,14 @@
'use strict'

const traverse = require('pull-traverse')
const traverseSlice = require('./traverse-slice')
const UnixFS = require('ipfs-unixfs')
const CID = require('cids')
const pull = require('pull-stream')
const paramap = require('pull-paramap')

// Logic to export a single (possibly chunked) unixfs file.
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth) => {
module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth, begin, end) => {
function getData (node) {
try {
const file = UnixFS.unmarshal(node.data)
@@ -31,19 +32,27 @@ module.exports = (node, name, path, pathRest, resolve, size, dag, parent, depth)
return pull.empty()
}

let content = pull(
traverse.depthFirst(node, visitor),
pull.map(getData)
)

const file = UnixFS.unmarshal(node.data)
const fileSize = size || file.fileSize()

let content

if (!isNaN(begin)) {
content = traverseSlice(node, dag, begin, end)
} else {
content = pull(
traverse.depthFirst(node, visitor),
pull.map(getData)
)
}

return pull.values([{
depth: depth,
content: content,
name: name,
path: path,
hash: node.multihash,
size: size || file.fileSize(),
size: fileSize,
type: 'file'
}])
}
8 changes: 5 additions & 3 deletions src/exporter/index.js
@@ -36,11 +36,13 @@ function pathBaseAndRest (path) {
}

const defaultOptions = {
maxDepth: Infinity
maxDepth: Infinity,
begin: undefined,
end: undefined
}

module.exports = (path, dag, _options) => {
const options = Object.assign({}, defaultOptions, _options)
module.exports = (path, dag, options) => {
options = Object.assign({}, defaultOptions, options)

let dPath
try {
12 changes: 6 additions & 6 deletions src/exporter/resolve.js
@@ -32,33 +32,33 @@ function createResolver (dag, options, depth, parent) {
return pull.error(new Error('no depth'))
}
if (item.object) {
return cb(null, resolveItem(item.object, item))
return cb(null, resolveItem(item.object, item, options.begin, options.end))
}
dag.get(new CID(item.multihash), (err, node) => {
if (err) {
return cb(err)
}
// const name = item.fromPathRest ? item.name : item.path
cb(null, resolveItem(node.value, item))
cb(null, resolveItem(node.value, item, options.begin, options.end))
})
}),
pull.flatten(),
pull.filter(Boolean),
pull.filter((node) => node.depth <= options.maxDepth)
)

function resolveItem (node, item) {
return resolve(node, item.name, item.path, item.pathRest, item.size, dag, item.parent || parent, item.depth)
function resolveItem (node, item, begin, end) {
return resolve(node, item.name, item.path, item.pathRest, item.size, dag, item.parent || parent, item.depth, begin, end)
}

function resolve (node, name, path, pathRest, size, dag, parentNode, depth) {
function resolve (node, name, path, pathRest, size, dag, parentNode, depth, begin, end) {
const type = typeOf(node)
const nodeResolver = resolvers[type]
if (!nodeResolver) {
return pull.error(new Error('Unkown node type ' + type))
}
const resolveDeep = createResolver(dag, options, depth, node)
return nodeResolver(node, name, path, pathRest, resolveDeep, size, dag, parentNode, depth)
return nodeResolver(node, name, path, pathRest, resolveDeep, size, dag, parentNode, depth, begin, end)
}
}

104 changes: 104 additions & 0 deletions src/exporter/traverse-slice.js
@@ -0,0 +1,104 @@
'use strict'

const CID = require('cids')
const pull = require('pull-stream')
const asyncValues = require('pull-async-values')
const asyncMap = require('pull-stream/throughs/async-map')
const map = require('pull-stream/throughs/map')
const UnixFS = require('ipfs-unixfs')
const waterfall = require('async/waterfall')

module.exports = (fileNode, dag, begin = 0, end) => {
let streamPosition = 0

return pull(
asyncValues((cb) => {
const meta = UnixFS.unmarshal(fileNode.data)

if (meta.type !== 'file') {
return cb(new Error(`Node ${fileNode} was not a file (was ${meta.type}), can only read files`))
}

const fileSize = meta.fileSize()

if (!end || end > fileSize) {
end = fileSize
}

if (begin < 0) {
begin = fileSize + begin
}

if (end < 0) {
end = fileSize + end
}

const links = fileNode.links

if (!links || !links.length) {
if (meta.data && meta.data.length) {
// file was small enough to fit in one DAGNode so has no links
return cb(null, [(done) => done(null, meta.data)])
}

return cb(new Error(`Path ${fileNode} had no links or data`))
}

const linkedDataSize = links.reduce((acc, curr) => acc + curr.size, 0)
const overhead = (linkedDataSize - meta.fileSize()) / links.length

// create an array of functions to fetch link data
cb(null, links.map((link) => (done) => {
// DAGNode Links report unixfs object data sizes $overhead bytes (typically 14)
// larger than they actually are due to the protobuf wrapper
const bytesInLinkedObjectData = link.size - overhead

if (begin > (streamPosition + bytesInLinkedObjectData)) {
// Start byte is after this block so skip it
streamPosition += bytesInLinkedObjectData

return done()
}

if (end < streamPosition) {
// End byte was before this block so skip it
streamPosition += bytesInLinkedObjectData

return done()
}

// transform the multihash to a cid, the cid to a node and the node to some data
waterfall([
(next) => dag.get(new CID(link.multihash), next),
(node, next) => next(null, node.value.data),
(data, next) => next(null, UnixFS.unmarshal(data).data)
], done)
}))
}),
asyncMap((loadLinkData, cb) => loadLinkData(cb)),
pull.filter(Boolean),
map((data) => {
const block = extractDataFromBlock(data, streamPosition, begin, end)

streamPosition += data.length

return block
})
)
}

function extractDataFromBlock (block, streamPosition, begin, end) {
const blockLength = block.length

if (end - streamPosition < blockLength) {
// If the end byte is in the current block, truncate the block to the end byte
block = block.slice(0, end - streamPosition)
}

if (begin > streamPosition && begin < (streamPosition + blockLength)) {
// If the start byte is in the current block, skip to the start byte
block = block.slice(begin - streamPosition)
}

return block
}
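
For intuition about the slicing arithmetic, the sketch below copies `extractDataFromBlock` as written above and runs it over two hypothetical 10-byte blocks with `begin = 3` and `end = 15`; the buffers and offsets are made up for illustration and are not part of the diff.

```js
// extractDataFromBlock, copied from the new file above, exercised standalone
function extractDataFromBlock (block, streamPosition, begin, end) {
  const blockLength = block.length

  if (end - streamPosition < blockLength) {
    // the end byte falls inside this block: truncate to it
    block = block.slice(0, end - streamPosition)
  }

  if (begin > streamPosition && begin < (streamPosition + blockLength)) {
    // the begin byte falls inside this block: skip forward to it
    block = block.slice(begin - streamPosition)
  }

  return block
}

// Two consecutive 10-byte blocks, slicing bytes [3, 15) of the stream
const blockA = Buffer.alloc(10, 'a') // stream positions 0..9
const blockB = Buffer.alloc(10, 'b') // stream positions 10..19

console.log(extractDataFromBlock(blockA, 0, 3, 15).length)  // 7 -> bytes 3..9 of the stream
console.log(extractDataFromBlock(blockB, 10, 3, 15).length) // 5 -> bytes 10..14 of the stream
```
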
1 change: 0 additions & 1 deletion src/index.js
@@ -2,4 +2,3 @@

exports.importer = exports.Importer = require('./importer')
exports.exporter = exports.Exporter = require('./exporter')
exports.reader = exports.Reader = require('./reader')