Skip to content

Commit

Permalink
Feat: Arbitrary-depth recursive pin levels.
Browse files Browse the repository at this point in the history
This implements #5133 introducing an option to limit how deep we fetch and store
the DAG associated to a recursive pin ("--max-depth"). This feature
comes motivated by the need to fetch and pin partial DAGs in order to do
DAG sharding with IPFS Cluster.

This means that, when pinning something to --max-depth, the DAG will be
fetched only to that depth and not more.

In order to get this, the PR introduces new recursive pin types: "recursive1"
means: the given CID is pinned along with its direct children (maxDepth=1)

"recursive2" means: the given CID is pinned along with its direct children
and its grandchildren.

And so on...

This required introducing "maxDepth" limits to all the functions walking down
DAGs (in merkledag, pin, core/commands, core/coreapi, exchange/reprovide modules).

maxDepth == -1 effectively acts as no-limit, and all these functions behave like
they did before.

In order to facilitate the task, a new CID Set type has been added:
thirdparty/recpinset. This set carries the MaxDepth associated to every Cid.
This allows to shortcut exploring already explored branches just like the original
cid.Set does. It also allows to store the Recursive pinset (and replaces cid.Set).
recpinset should be moved outside to a different repo eventually.

TODO: tests
TODO: refs -r with --max-depth

License: MIT
Signed-off-by: Hector Sanjuan <[email protected]>
  • Loading branch information
hsanjuan committed Jun 20, 2018
1 parent 2a9de81 commit 55d980a
Show file tree
Hide file tree
Showing 10 changed files with 740 additions and 139 deletions.
106 changes: 77 additions & 29 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package commands
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"time"
Expand All @@ -16,6 +17,7 @@ import (
path "github.com/ipfs/go-ipfs/path"
resolver "github.com/ipfs/go-ipfs/path/resolver"
pin "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/thirdparty/recpinset"
"github.com/ipfs/go-ipfs/thirdparty/verifcid"
uio "github.com/ipfs/go-ipfs/unixfs/io"

Expand Down Expand Up @@ -60,6 +62,7 @@ var addPinCmd = &cmds.Command{
Options: []cmdkit.Option{
cmdkit.BoolOption("recursive", "r", "Recursively pin the object linked to by the specified object(s).").WithDefault(true),
cmdkit.BoolOption("progress", "Show progress"),
cmdkit.IntOption("depth-limit", "For recursive pins, only pin until the given DAG depth").WithDefault(0),
},
Type: AddPinOutput{},
Run: func(req cmds.Request, res cmds.Response) {
Expand All @@ -77,10 +80,28 @@ var addPinCmd = &cmds.Command{
res.SetError(err, cmdkit.ErrNormal)
return
}
depthLimit, _, err := req.Option("depth-limit").Int()
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
}

if !recursive && depthLimit != 0 {
res.SetError(
errors.New("wrong depth-limit option. Non recursive pin"),
cmdkit.ErrNormal,
)
return
}

if recursive && depthLimit <= 0 {
depthLimit = -1
}

showProgress, _, _ := req.Option("progress").Bool()

if !showProgress {
added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive)
added, err := corerepo.Pin(n, req.Context(), req.Arguments(), depthLimit)
if err != nil {
res.SetError(err, cmdkit.ErrNormal)
return
Expand All @@ -100,7 +121,7 @@ var addPinCmd = &cmds.Command{
}
ch := make(chan pinResult, 1)
go func() {
added, err := corerepo.Pin(n, ctx, req.Arguments(), recursive)
added, err := corerepo.Pin(n, ctx, req.Arguments(), depthLimit)
ch <- pinResult{pins: added, err: err}
}()

Expand Down Expand Up @@ -526,10 +547,13 @@ func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsN
return nil, fmt.Errorf("path '%s' is not pinned", p)
}

switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
mode, _ = pin.StringToMode(pinType)
if mode < pin.RecursiveN {
switch pinType {
case "direct", "indirect", "recursive", "internal":
default:
pinType = "indirect through " + pinType
}
}
keys[c.String()] = RefKeyObject{
Type: pinType,
Expand All @@ -555,17 +579,28 @@ func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string
AddToResultKeys(n.Pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range n.Pinning.RecursiveKeys() {
err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit)
set := recpinset.New()
for _, recPin := range n.Pinning.RecursivePins() {
err := dag.EnumerateChildrenMaxDepth(
ctx,
dag.GetLinksWithDAG(n.DAG),
recPin.Cid,
recPin.MaxDepth,
set.Visit,
)
if err != nil {
return nil, err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive")
for _, recPin := range n.Pinning.RecursivePins() {
mode, _ := pin.ModeToString(pin.MaxDepthToMode(recPin.MaxDepth))
keys[recPin.Cid.String()] = RefKeyObject{
Type: mode,
}
}
}

return keys, nil
Expand Down Expand Up @@ -595,60 +630,73 @@ type pinVerifyOpts struct {
}

func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} {
visited := make(map[string]PinStatus)
statuses := make(map[string]PinStatus)
visited := recpinset.New()

bs := n.Blocks.Blockstore()
DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := dag.GetLinksWithDAG(DAG)
recPins := n.Pinning.RecursiveKeys()
recPins := n.Pinning.RecursivePins()

var checkPin func(root *cid.Cid) PinStatus
checkPin = func(root *cid.Cid) PinStatus {
key := root.String()
if status, ok := visited[key]; ok {
return status
}

if err := verifcid.ValidateCid(root); err != nil {
validateCid := func(c *cid.Cid) PinStatus {
if err := verifcid.ValidateCid(c); err != nil {
status := PinStatus{Ok: false}
if opts.explain {
status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}}
status.BadNodes = []BadNode{BadNode{Cid: c.String(), Err: err.Error()}}
}
visited[key] = status
return status
}
return PinStatus{Ok: true}
}

var checkPinMaxDepth func(root *cid.Cid, maxDepth int) PinStatus
checkPinMaxDepth = func(root *cid.Cid, maxDepth int) PinStatus {
key := root.String()
// it was visited already, return last status
if !visited.Visit(root, maxDepth) {
return statuses[key]
}

status := validateCid(root)
if maxDepth == 0 || !status.Ok {
statuses[key] = status
return status
}

if maxDepth > 0 {
maxDepth--
}

links, err := getLinks(ctx, root)
if err != nil {
status := PinStatus{Ok: false}
if opts.explain {
status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}}
}
visited[key] = status
statuses[key] = status
return status
}

status := PinStatus{Ok: true}
for _, lnk := range links {
res := checkPin(lnk.Cid)
res := checkPinMaxDepth(lnk.Cid, maxDepth)
if !res.Ok {
status.Ok = false
status.BadNodes = append(status.BadNodes, res.BadNodes...)
}
}

visited[key] = status
statuses[key] = status
return status
}

out := make(chan interface{})
go func() {
defer close(out)
for _, cid := range recPins {
pinStatus := checkPin(cid)
for _, recPin := range recPins {
pinStatus := checkPinMaxDepth(recPin.Cid, recPin.MaxDepth)
if !pinStatus.Ok || opts.includeOk {
select {
case out <- &PinVerifyRes{cid.String(), pinStatus}:
case out <- &PinVerifyRes{recPin.Cid.String(), pinStatus}:
case <-ctx.Done():
return
}
Expand Down
6 changes: 4 additions & 2 deletions core/coreapi/interface/options/pin.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
package options

type PinAddSettings struct {
Recursive bool
Recursive bool
DepthLimit int
}

type PinLsSettings struct {
Expand All @@ -18,7 +19,8 @@ type PinUpdateOption func(*PinUpdateSettings) error

func PinAddOptions(opts ...PinAddOption) (*PinAddSettings, error) {
options := &PinAddSettings{
Recursive: true,
Recursive: true,
DepthLimit: -1,
}

for _, opt := range opts {
Expand Down
62 changes: 47 additions & 15 deletions core/coreapi/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package coreapi

import (
"context"
"errors"
"fmt"

bserv "github.com/ipfs/go-ipfs/blockservice"
Expand All @@ -10,6 +11,7 @@ import (
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
merkledag "github.com/ipfs/go-ipfs/merkledag"
pin "github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/thirdparty/recpinset"

offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline"
ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format"
Expand All @@ -26,7 +28,15 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin

defer api.node.Blockstore.PinLock().Unlock()

_, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.Recursive)
if !settings.Recursive && settings.DepthLimit != 0 {
return errors.New("Bad DepthLimit. Pin is not Recursive")
}

if settings.Recursive && settings.DepthLimit <= 0 {
settings.DepthLimit = -1
}

_, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.DepthLimit)
if err != nil {
return err
}
Expand Down Expand Up @@ -96,45 +106,55 @@ func (n *badNode) Err() error {
}

func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) {
visited := make(map[string]*pinStatus)
statuses := make(map[string]*pinStatus)
visited := recpinset.New()
bs := api.node.Blocks.Blockstore()
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG)
recPins := api.node.Pinning.RecursiveKeys()
recPins := api.node.Pinning.RecursivePins()

var checkPin func(root *cid.Cid) *pinStatus
checkPin = func(root *cid.Cid) *pinStatus {
var checkPinMaxDepth func(root *cid.Cid, maxDepth int) *pinStatus
checkPinMaxDepth = func(root *cid.Cid, maxDepth int) *pinStatus {
key := root.String()
if status, ok := visited[key]; ok {
return status
// it was visited already, return last status
if !visited.Visit(root, maxDepth) {
return statuses[key]
}

if maxDepth == 0 {
return &pinStatus{ok: true, cid: root}
}

if maxDepth > 0 {
maxDepth--
}

links, err := getLinks(ctx, root)
if err != nil {
status := &pinStatus{ok: false, cid: root}
status.badNodes = []coreiface.BadPinNode{&badNode{cid: root, err: err}}
visited[key] = status
statuses[key] = status
return status
}

status := &pinStatus{ok: true, cid: root}
for _, lnk := range links {
res := checkPin(lnk.Cid)
res := checkPinMaxDepth(lnk.Cid, maxDepth)
if !res.ok {
status.ok = false
status.badNodes = append(status.badNodes, res.badNodes...)
}
}

visited[key] = status
statuses[key] = status
return status
}

out := make(chan coreiface.PinStatus)
go func() {
defer close(out)
for _, c := range recPins {
out <- checkPin(c)
out <- checkPinMaxDepth(c.Cid, c.MaxDepth)
}
}()

Expand Down Expand Up @@ -171,17 +191,29 @@ func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag ipld.
AddToResultKeys(pinning.DirectKeys(), "direct")
}
if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet()
for _, k := range pinning.RecursiveKeys() {
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), k, set.Visit)
set := recpinset.New()
for _, recPin := range pinning.RecursivePins() {
err := merkledag.EnumerateChildrenMaxDepth(
ctx,
merkledag.GetLinksWithDAG(dag),
recPin.Cid,
recPin.MaxDepth,
set.Visit,
)
if err != nil {
return nil, err
}
}
AddToResultKeys(set.Keys(), "indirect")
}
if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(pinning.RecursiveKeys(), "recursive")
for _, recPin := range pinning.RecursivePins() {
mode, _ := pin.ModeToString(pin.MaxDepthToMode(recPin.MaxDepth))
keys[recPin.Cid.String()] = &pinInfo{
pinType: mode,
object: recPin.Cid,
}
}
}

out := make([]coreiface.Pin, 0, len(keys))
Expand Down
4 changes: 2 additions & 2 deletions core/corerepo/pinning.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid"
)

func Pin(n *core.IpfsNode, ctx context.Context, paths []string, recursive bool) ([]*cid.Cid, error) {
func Pin(n *core.IpfsNode, ctx context.Context, paths []string, depthLimit int) ([]*cid.Cid, error) {
out := make([]*cid.Cid, len(paths))

r := &resolver.Resolver{
Expand All @@ -43,7 +43,7 @@ func Pin(n *core.IpfsNode, ctx context.Context, paths []string, recursive bool)
if err != nil {
return nil, fmt.Errorf("pin: %s", err)
}
err = n.Pinning.Pin(ctx, dagnode, recursive)
err = n.Pinning.PinToDepth(ctx, dagnode, depthLimit)
if err != nil {
return nil, fmt.Errorf("pin: %s", err)
}
Expand Down
Loading

0 comments on commit 55d980a

Please sign in to comment.