diff --git a/core/commands/pin.go b/core/commands/pin.go index 1829ac042725..327db5c8fca5 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -3,6 +3,7 @@ package commands import ( "bytes" "context" + "errors" "fmt" "io" "time" @@ -16,6 +17,7 @@ import ( path "github.com/ipfs/go-ipfs/path" resolver "github.com/ipfs/go-ipfs/path/resolver" pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" "github.com/ipfs/go-ipfs/thirdparty/verifcid" uio "github.com/ipfs/go-ipfs/unixfs/io" @@ -60,6 +62,7 @@ var addPinCmd = &cmds.Command{ Options: []cmdkit.Option{ cmdkit.BoolOption("recursive", "r", "Recursively pin the object linked to by the specified object(s).").WithDefault(true), cmdkit.BoolOption("progress", "Show progress"), + cmdkit.IntOption("depth-limit", "For recursive pins, only pin until the given DAG depth").WithDefault(0), }, Type: AddPinOutput{}, Run: func(req cmds.Request, res cmds.Response) { @@ -77,10 +80,28 @@ var addPinCmd = &cmds.Command{ res.SetError(err, cmdkit.ErrNormal) return } + depthLimit, _, err := req.Option("depth-limit").Int() + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + if !recursive && depthLimit != 0 { + res.SetError( + errors.New("wrong depth-limit option. 
Non recursive pin"), + cmdkit.ErrNormal, + ) + return + } + + if recursive && depthLimit <= 0 { + depthLimit = -1 + } + showProgress, _, _ := req.Option("progress").Bool() if !showProgress { - added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive) + added, err := corerepo.Pin(n, req.Context(), req.Arguments(), depthLimit) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -100,7 +121,7 @@ var addPinCmd = &cmds.Command{ } ch := make(chan pinResult, 1) go func() { - added, err := corerepo.Pin(n, ctx, req.Arguments(), recursive) + added, err := corerepo.Pin(n, ctx, req.Arguments(), depthLimit) ch <- pinResult{pins: added, err: err} }() @@ -526,10 +547,13 @@ func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsN return nil, fmt.Errorf("path '%s' is not pinned", p) } - switch pinType { - case "direct", "indirect", "recursive", "internal": - default: - pinType = "indirect through " + pinType + mode, _ = pin.StringToMode(pinType) + if mode < pin.RecursiveN { + switch pinType { + case "direct", "indirect", "recursive", "internal": + default: + pinType = "indirect through " + pinType + } } keys[c.String()] = RefKeyObject{ Type: pinType, @@ -555,9 +579,15 @@ func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string AddToResultKeys(n.Pinning.DirectKeys(), "direct") } if typeStr == "indirect" || typeStr == "all" { - set := cid.NewSet() - for _, k := range n.Pinning.RecursiveKeys() { - err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit) + set := recpinset.New() + for _, recPin := range n.Pinning.RecursivePins() { + err := dag.EnumerateChildrenMaxDepth( + ctx, + dag.GetLinksWithDAG(n.DAG), + recPin.Cid, + recPin.MaxDepth, + set.Visit, + ) if err != nil { return nil, err } @@ -565,7 +595,12 @@ func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string AddToResultKeys(set.Keys(), "indirect") } if typeStr == "recursive" || typeStr == "all" { - 
AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive") + for _, recPin := range n.Pinning.RecursivePins() { + mode, _ := pin.ModeToString(pin.MaxDepthToMode(recPin.MaxDepth)) + keys[recPin.Cid.String()] = RefKeyObject{ + Type: mode, + } + } } return keys, nil @@ -595,28 +630,42 @@ type pinVerifyOpts struct { } func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} { - visited := make(map[string]PinStatus) + statuses := make(map[string]PinStatus) + visited := recpinset.New() bs := n.Blocks.Blockstore() DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := dag.GetLinksWithDAG(DAG) - recPins := n.Pinning.RecursiveKeys() + recPins := n.Pinning.RecursivePins() - var checkPin func(root *cid.Cid) PinStatus - checkPin = func(root *cid.Cid) PinStatus { - key := root.String() - if status, ok := visited[key]; ok { - return status - } - - if err := verifcid.ValidateCid(root); err != nil { + validateCid := func(c *cid.Cid) PinStatus { + if err := verifcid.ValidateCid(c); err != nil { status := PinStatus{Ok: false} if opts.explain { - status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}} + status.BadNodes = []BadNode{BadNode{Cid: c.String(), Err: err.Error()}} } - visited[key] = status return status } + return PinStatus{Ok: true} + } + + var checkPinMaxDepth func(root *cid.Cid, maxDepth int) PinStatus + checkPinMaxDepth = func(root *cid.Cid, maxDepth int) PinStatus { + key := root.String() + // it was visited already, return last status + if !visited.Visit(root, maxDepth) { + return statuses[key] + } + + status := validateCid(root) + if maxDepth == 0 || !status.Ok { + statuses[key] = status + return status + } + + if maxDepth > 0 { + maxDepth-- + } links, err := getLinks(ctx, root) if err != nil { @@ -624,31 +673,30 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan if opts.explain { status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}} } - visited[key] = status + 
statuses[key] = status return status } - status := PinStatus{Ok: true} for _, lnk := range links { - res := checkPin(lnk.Cid) + res := checkPinMaxDepth(lnk.Cid, maxDepth) if !res.Ok { status.Ok = false status.BadNodes = append(status.BadNodes, res.BadNodes...) } } - visited[key] = status + statuses[key] = status return status } out := make(chan interface{}) go func() { defer close(out) - for _, cid := range recPins { - pinStatus := checkPin(cid) + for _, recPin := range recPins { + pinStatus := checkPinMaxDepth(recPin.Cid, recPin.MaxDepth) if !pinStatus.Ok || opts.includeOk { select { - case out <- &PinVerifyRes{cid.String(), pinStatus}: + case out <- &PinVerifyRes{recPin.Cid.String(), pinStatus}: case <-ctx.Done(): return } diff --git a/core/coreapi/interface/options/pin.go b/core/coreapi/interface/options/pin.go index 9d1107f927db..29cc374ed9b8 100644 --- a/core/coreapi/interface/options/pin.go +++ b/core/coreapi/interface/options/pin.go @@ -1,7 +1,8 @@ package options type PinAddSettings struct { - Recursive bool + Recursive bool + DepthLimit int } type PinLsSettings struct { @@ -18,7 +19,8 @@ type PinUpdateOption func(*PinUpdateSettings) error func PinAddOptions(opts ...PinAddOption) (*PinAddSettings, error) { options := &PinAddSettings{ - Recursive: true, + Recursive: true, + DepthLimit: -1, } for _, opt := range opts { diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 5f6119108347..89aef9255d5d 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -2,6 +2,7 @@ package coreapi import ( "context" + "errors" "fmt" bserv "github.com/ipfs/go-ipfs/blockservice" @@ -10,6 +11,7 @@ import ( corerepo "github.com/ipfs/go-ipfs/core/corerepo" merkledag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline" ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" @@ -26,7 +28,15 @@ 
func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin defer api.node.Blockstore.PinLock().Unlock() - _, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.Recursive) + if !settings.Recursive && settings.DepthLimit != 0 { + return errors.New("Bad DepthLimit. Pin is not Recursive") + } + + if settings.Recursive && settings.DepthLimit <= 0 { + settings.DepthLimit = -1 + } + + _, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.DepthLimit) if err != nil { return err } @@ -96,37 +106,47 @@ func (n *badNode) Err() error { } func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) { - visited := make(map[string]*pinStatus) + statuses := make(map[string]*pinStatus) + visited := recpinset.New() bs := api.node.Blocks.Blockstore() DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := merkledag.GetLinksWithDAG(DAG) - recPins := api.node.Pinning.RecursiveKeys() + recPins := api.node.Pinning.RecursivePins() - var checkPin func(root *cid.Cid) *pinStatus - checkPin = func(root *cid.Cid) *pinStatus { + var checkPinMaxDepth func(root *cid.Cid, maxDepth int) *pinStatus + checkPinMaxDepth = func(root *cid.Cid, maxDepth int) *pinStatus { key := root.String() - if status, ok := visited[key]; ok { - return status + // it was visited already, return last status + if !visited.Visit(root, maxDepth) { + return statuses[key] + } + + if maxDepth == 0 { + return &pinStatus{ok: true, cid: root} + } + + if maxDepth > 0 { + maxDepth-- } links, err := getLinks(ctx, root) if err != nil { status := &pinStatus{ok: false, cid: root} status.badNodes = []coreiface.BadPinNode{&badNode{cid: root, err: err}} - visited[key] = status + statuses[key] = status return status } status := &pinStatus{ok: true, cid: root} for _, lnk := range links { - res := checkPin(lnk.Cid) + res := checkPinMaxDepth(lnk.Cid, maxDepth) if !res.ok { status.ok = false status.badNodes = append(status.badNodes, res.badNodes...) 
} } - visited[key] = status + statuses[key] = status return status } @@ -134,7 +154,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro go func() { defer close(out) for _, c := range recPins { - out <- checkPin(c) + out <- checkPinMaxDepth(c.Cid, c.MaxDepth) } }() @@ -171,9 +191,15 @@ func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag ipld. AddToResultKeys(pinning.DirectKeys(), "direct") } if typeStr == "indirect" || typeStr == "all" { - set := cid.NewSet() - for _, k := range pinning.RecursiveKeys() { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), k, set.Visit) + set := recpinset.New() + for _, recPin := range pinning.RecursivePins() { + err := merkledag.EnumerateChildrenMaxDepth( + ctx, + merkledag.GetLinksWithDAG(dag), + recPin.Cid, + recPin.MaxDepth, + set.Visit, + ) if err != nil { return nil, err } @@ -181,7 +207,13 @@ func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag ipld. 
AddToResultKeys(set.Keys(), "indirect") } if typeStr == "recursive" || typeStr == "all" { - AddToResultKeys(pinning.RecursiveKeys(), "recursive") + for _, recPin := range pinning.RecursivePins() { + mode, _ := pin.ModeToString(pin.MaxDepthToMode(recPin.MaxDepth)) + keys[recPin.Cid.String()] = &pinInfo{ + pinType: mode, + object: recPin.Cid, + } + } } out := make([]coreiface.Pin, 0, len(keys)) diff --git a/core/corerepo/pinning.go b/core/corerepo/pinning.go index 9c0dccda8f36..c0f64602ec29 100644 --- a/core/corerepo/pinning.go +++ b/core/corerepo/pinning.go @@ -25,7 +25,7 @@ import ( cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" ) -func Pin(n *core.IpfsNode, ctx context.Context, paths []string, recursive bool) ([]*cid.Cid, error) { +func Pin(n *core.IpfsNode, ctx context.Context, paths []string, depthLimit int) ([]*cid.Cid, error) { out := make([]*cid.Cid, len(paths)) r := &resolver.Resolver{ @@ -43,7 +43,7 @@ func Pin(n *core.IpfsNode, ctx context.Context, paths []string, recursive bool) if err != nil { return nil, fmt.Errorf("pin: %s", err) } - err = n.Pinning.Pin(ctx, dagnode, recursive) + err = n.Pinning.PinToDepth(ctx, dagnode, depthLimit) if err != nil { return nil, fmt.Errorf("pin: %s", err) } diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index 51178620c308..b414ba811425 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -53,11 +53,20 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRo set.add(key) } - for _, key := range pinning.RecursiveKeys() { - set.add(key) + for _, recPin := range pinning.RecursivePins() { + set.add(recPin.Cid) if !onlyRoots { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.add) + addF := func(c *cid.Cid, d int) bool { + return set.add(c) + } + err := merkledag.EnumerateChildrenMaxDepth( + ctx, + merkledag.GetLinksWithDAG(dag), + recPin.Cid, + recPin.MaxDepth, + addF, + ) if err != 
nil { log.Errorf("reprovide indirect pins: %s", err) return diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index a59e3469e4bf..e4febf2fd48c 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -7,6 +7,7 @@ import ( "sync" bserv "github.com/ipfs/go-ipfs/blockservice" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" ipldcbor "gx/ipfs/QmSF1Ksgn5d7JCTBt4e1yp4wzs6tpYyweCZ4PcDYp3tNeK/go-ipld-cbor" blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format" @@ -159,25 +160,32 @@ func (n *dagService) Session(ctx context.Context) ipld.NodeGetter { // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error { + return FetchGraphMaxDepth(ctx, root, -1, serv) +} + +// FetchGraphMaxDepth fetches all nodes that are children of the given node +// down to the given depth. maxDepth=0 means "only fetch root", maxDepth=1 means +// fetch root and its direct children and so on... maxDepth=-1 means unlimited. +func FetchGraphMaxDepth(ctx context.Context, root *cid.Cid, maxDepth int, serv ipld.DAGService) error { var ng ipld.NodeGetter = serv ds, ok := serv.(*dagService) if ok { ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)} } + set := recpinset.New() v, _ := ctx.Value(progressContextKey).(*ProgressTracker) if v == nil { - return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit) + return EnumerateChildrenAsyncMaxDepth(ctx, GetLinksDirect(ng), root, maxDepth, set.Visit) } - set := cid.NewSet() - visit := func(c *cid.Cid) bool { - if set.Visit(c) { + visit := func(c *cid.Cid, maxDepth int) bool { + if set.Visit(c, maxDepth) { v.Increment() return true } return false } - return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit) + return EnumerateChildrenAsyncMaxDepth(ctx, GetLinksDirect(ng), root, maxDepth, visit) } // GetMany gets many nodes from the DAG at once. 
@@ -255,14 +263,39 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks { // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error { + visitDepth := func(c *cid.Cid, maxDepth int) bool { + return visit(c) + } + return EnumerateChildrenMaxDepth(ctx, getLinks, root, -1, visitDepth) +} + +// EnumerateChildrenMaxDepth walks the dag below the given root to the given +// depth. The root is at level 0, the children are at level 1 and so on. +// Thus, setting depth to two, will walk the root, the children, and the +// children of the children. +// Setting depth to a negative number will walk the full tree. +func EnumerateChildrenMaxDepth(ctx context.Context, getLinks GetLinks, root *cid.Cid, maxDepth int, visit func(*cid.Cid, int) bool) error { + if maxDepth == 0 { + // Root nodes are not marked as visited in this implementation + // (they are in the async version) + return nil + } + + if maxDepth > 0 { + maxDepth-- + } + links, err := getLinks(ctx, root) if err != nil { return err } for _, lnk := range links { c := lnk.Cid - if visit(c) { - err = EnumerateChildren(ctx, getLinks, c, visit) + if visit(c, maxDepth) { + // Note, visit() returns true when maxDepth + // is > than existing value (meaning we have to + // go deeper than before. + err = EnumerateChildrenMaxDepth(ctx, getLinks, c, maxDepth, visit) if err != nil { return err } @@ -306,8 +339,24 @@ var FetchGraphConcurrency = 8 // // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. 
func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error { - feed := make(chan *cid.Cid) - out := make(chan []*ipld.Link) + visitDepth := func(c *cid.Cid, maxDepth int) bool { + return visit(c) + } + return EnumerateChildrenAsyncMaxDepth(ctx, getLinks, c, -1, visitDepth) +} + +// EnumerateChildrenAsyncMaxDepth is equivalent to EnumerateChildrenMaxDepth *except* that +// it fetches children in parallel (down to a maximum depth in the graph). +// +// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. +func EnumerateChildrenAsyncMaxDepth(ctx context.Context, getLinks GetLinks, c *cid.Cid, maxDepth int, visit func(*cid.Cid, int) bool) error { + type linksDepth struct { + links []*ipld.Link + maxDepth int + } + + feed := make(chan *recpinset.RecPin) + out := make(chan *linksDepth) done := make(chan struct{}) var setlk sync.Mutex @@ -319,20 +368,37 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, for i := 0; i < FetchGraphConcurrency; i++ { go func() { - for ic := range feed { + for recPin := range feed { + maxDepth := recPin.MaxDepth + setlk.Lock() - shouldVisit := visit(ic) + // Note, visit returns true when depth + // is > than existing value (meaning we have to + // go deeper than before. 
+ shouldVisit := visit(recPin.Cid, maxDepth) setlk.Unlock() - if shouldVisit { - links, err := getLinks(ctx, ic) + switch { + case maxDepth == 0: + // done + case shouldVisit: + if maxDepth > 0 { + maxDepth-- + } + + links, err := getLinks(ctx, recPin.Cid) if err != nil { errChan <- err return } + outLinks := &linksDepth{ + links: links, + maxDepth: maxDepth, + } + select { - case out <- links: + case out <- outLinks: case <-fetchersCtx.Done(): return } @@ -347,10 +413,13 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, defer close(feed) send := feed - var todobuffer []*cid.Cid + var todobuffer []*recpinset.RecPin var inProgress int - next := c + next := &recpinset.RecPin{ + Cid: c, + MaxDepth: maxDepth, + } for { select { case send <- next: @@ -367,13 +436,17 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, if inProgress == 0 && next == nil { return nil } - case links := <-out: - for _, lnk := range links { + case outLinks := <-out: + for _, lnk := range outLinks.links { + cd := &recpinset.RecPin{ + Cid: lnk.Cid, + MaxDepth: outLinks.maxDepth, + } if next == nil { - next = lnk.Cid + next = cd send = feed } else { - todobuffer = append(todobuffer, lnk.Cid) + todobuffer = append(todobuffer, cd) } } case err := <-errChan: @@ -383,7 +456,6 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, return ctx.Err() } } - } var _ ipld.LinkGetter = &dagService{} diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 460df81be76e..7f414079ca1e 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -18,6 +18,7 @@ import ( . 
"github.com/ipfs/go-ipfs/merkledag" mdpb "github.com/ipfs/go-ipfs/merkledag/pb" dstest "github.com/ipfs/go-ipfs/merkledag/test" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util" offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline" @@ -126,6 +127,36 @@ func TestBatchFetchDupBlock(t *testing.T) { runBatchFetchTest(t, read) } +// makeDepthTestingGraph makes a small DAG with two levels. The level-two +// nodes are both children of the root and of one of the level 1 nodes. +// This is meant to test the EnumerateChildren*MaxDepth functions. +func makeDepthTestingGraph(t *testing.T, ds ipld.DAGService) ipld.Node { + root := NodeWithData(nil) + l11 := NodeWithData([]byte("leve1_node1")) + l12 := NodeWithData([]byte("leve1_node1")) + l21 := NodeWithData([]byte("leve2_node1")) + l22 := NodeWithData([]byte("leve2_node2")) + l23 := NodeWithData([]byte("leve2_node3")) + + l11.AddNodeLink(l21.Cid().String(), l21) + l11.AddNodeLink(l23.Cid().String(), l22) + l11.AddNodeLink(l23.Cid().String(), l23) + + root.AddNodeLink(l11.Cid().String(), l11) + root.AddNodeLink(l12.Cid().String(), l12) + root.AddNodeLink(l23.Cid().String(), l23) + + ctx := context.Background() + for _, n := range []ipld.Node{l23, l22, l21, l12, l11, root} { + err := ds.Add(ctx, n) + if err != nil { + t.Fatal(err) + } + } + + return root +} + // makeTestDAG creates a simple DAG from the data in a reader. // First, a node is created from each 512 bytes of data from the reader // (like a the Size chunker would do). Then all nodes are added as children @@ -293,6 +324,22 @@ func TestFetchGraph(t *testing.T) { } } +// Check that all children of root are in the given set and in the datastore +func traverseAndCheck(t *testing.T, root ipld.Node, ds ipld.DAGService, set *cid.Set) { + // traverse dag and check + for _, lnk := range root.Links() { + c := lnk.Cid + if !set.Has(c) { + t.Fatal("missing key in set! 
", lnk.Cid.String()) + } + child, err := ds.Get(context.Background(), c) + if err != nil { + t.Fatal(err) + } + traverseAndCheck(t, child, ds, set) + } +} + func TestEnumerateChildren(t *testing.T) { bsi := bstest.Mocks(1) ds := NewDAGService(bsi[0]) @@ -307,23 +354,100 @@ func TestEnumerateChildren(t *testing.T) { t.Fatal(err) } - var traverse func(n ipld.Node) - traverse = func(n ipld.Node) { - // traverse dag and check - for _, lnk := range n.Links() { - c := lnk.Cid - if !set.Has(c) { - t.Fatal("missing key in set! ", lnk.Cid.String()) - } - child, err := ds.Get(context.Background(), c) - if err != nil { - t.Fatal(err) - } - traverse(child) + traverseAndCheck(t, root, ds, set) +} + +func TestEnumerateChildrenMaxDepth(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + root := makeDepthTestingGraph(t, ds) + + type testcase struct { + depth int + expectedLen int + } + + tests := []testcase{ + testcase{1, 3}, + testcase{0, 0}, + testcase{-1, 5}, + } + + testF := func(t *testing.T, tc testcase) { + set := recpinset.New() + err := EnumerateChildrenMaxDepth( + context.Background(), + ds.GetLinks, + root.Cid(), + tc.depth, + set.Visit, + ) + if err != nil { + t.Fatal(err) + } + if l := len(set.Keys()); l != tc.expectedLen { + t.Errorf("expected %d keys and got %d", tc.expectedLen, l) } } - traverse(root) + for _, tc := range tests { + testF(t, tc) + } +} + +func TestEnumerateChildrenAsync(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + + read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024) + root := makeTestDAG(t, read, ds) + + set := cid.NewSet() + + err := EnumerateChildrenAsync(context.Background(), ds.GetLinks, root.Cid(), set.Visit) + if err != nil { + t.Fatal(err) + } + + traverseAndCheck(t, root, ds, set) +} + +func TestEnumerateChildrenAsyncMaxDepth(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + root := makeDepthTestingGraph(t, ds) + + type testcase struct { + depth int + expectedLen 
int + } + + tests := []testcase{ + testcase{1, 4}, + testcase{0, 1}, + testcase{-1, 6}, + } + + testF := func(t *testing.T, tc testcase) { + set := recpinset.New() + err := EnumerateChildrenAsyncMaxDepth( + context.Background(), + ds.GetLinks, + root.Cid(), + tc.depth, + set.Visit, + ) + if err != nil { + t.Fatal(err) + } + if l := len(set.Keys()); l != tc.expectedLen { + t.Errorf("expected %d keys and got %d", tc.expectedLen, l) + } + } + + for _, tc := range tests { + testF(t, tc) + } } func TestFetchFailure(t *testing.T) { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 69ca731d17ac..8ecddbbaef05 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -10,6 +10,7 @@ import ( bserv "github.com/ipfs/go-ipfs/blockservice" dag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" "github.com/ipfs/go-ipfs/thirdparty/verifcid" offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline" @@ -166,6 +167,54 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots return nil } +func DescendantsToDepth(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []*recpinset.RecPin) error { + verifyGetLinks := func(ctx context.Context, c *cid.Cid) ([]*ipld.Link, error) { + err := verifcid.ValidateCid(c) + if err != nil { + return nil, err + } + + return getLinks(ctx, c) + } + + verboseCidError := func(err error) error { + if strings.Contains(err.Error(), verifcid.ErrBelowMinimumHashLength.Error()) || + strings.Contains(err.Error(), verifcid.ErrPossiblyInsecureHashFunction.Error()) { + err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ + " to list insecure hashes. 
If you want to read them,"+ + " please downgrade your go-ipfs to 0.4.13\n", err) + log.Error(err) + } + return err + } + + recSet := recpinset.New() + + for _, recPin := range roots { + set.Add(recPin.Cid) + + // EnumerateChildren recursively walks the dag and adds the keys to the given set + err := dag.EnumerateChildrenMaxDepth( + ctx, + verifyGetLinks, + recPin.Cid, + recPin.MaxDepth, + recSet.Visit, + ) + + if err != nil { + err = verboseCidError(err) + return err + } + } + + for _, k := range recSet.Keys() { + set.Add(k) + } + + return nil +} + // ColoredSet computes the set of nodes in the graph that are pinned by the // pins in the given pinner. func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffortRoots []*cid.Cid, output chan<- Result) (*cid.Set, error) { @@ -181,7 +230,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - err := Descendants(ctx, getLinks, gcs, pn.RecursiveKeys()) + err := DescendantsToDepth(ctx, getLinks, gcs, pn.RecursivePins()) if err != nil { errors = true output <- Result{Error: err} diff --git a/pin/pin.go b/pin/pin.go index 348ead7bf15a..e4cf18e79273 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -6,11 +6,15 @@ import ( "context" "fmt" "os" + "regexp" + "strconv" + "strings" "sync" "time" mdag "github.com/ipfs/go-ipfs/merkledag" dutils "github.com/ipfs/go-ipfs/merkledag/utils" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" @@ -47,6 +51,8 @@ const ( // See the Pin Modes constants for a full list. type Mode int +var recursiveNRegexp *regexp.Regexp = regexp.MustCompile(fmt.Sprintf("%s([0-9]+)", linkRecursive)) + // Pin Modes const ( // Recursive pins pin the target cids along with any reachable children. 
@@ -66,6 +72,10 @@ const ( // Any refers to any pinned cid Any + + // RecursiveN are pins pinned up to the Nth level down the tree. + // RecursiveN == Recursive0. Recursive1 == RecursiveN+1 etc. + RecursiveN Mode = iota + 100 ) // ModeToString returns a human-readable name for the Mode. @@ -79,12 +89,49 @@ func ModeToString(mode Mode) (string, bool) { Any: linkAny, } s, ok := m[mode] + if !ok && mode >= RecursiveN { + s = fmt.Sprintf("%s%d", linkRecursive, ModeToMaxDepth(mode)) + ok = true + } + return s, ok } +// MaxDepthToMode converts a depth limit to the RecursiveN+depth mode. +func MaxDepthToMode(d int) Mode { + if d < 0 { + return Recursive + } + return RecursiveN + Mode(d) +} + +// ModeToMaxDepth converts a mode to the depth limit. +// It is either -1 for recursive or mode - RecursiveN for +// modes >= RecursiveN. For the rest, it's 0. +func ModeToMaxDepth(mode Mode) int { + switch { + case mode == Recursive: + return -1 + case mode >= RecursiveN: + return int(mode - RecursiveN) + default: + return 0 + } +} + // StringToMode parses the result of ModeToString() back to a Mode. // It returns a boolean which is set to false if the mode is unknown. func StringToMode(s string) (Mode, bool) { + // if s is like "recursive33", return RecursiveN+33 + recN := recursiveNRegexp.FindStringSubmatch(s) + if len(recN) == 2 { + m, err := strconv.Atoi(recN[1]) + if err != nil { + return 0, false + } + return MaxDepthToMode(m), true + } + m := map[string]Mode{ linkRecursive: Recursive, linkDirect: Direct, @@ -114,6 +161,9 @@ type Pinner interface { // Pin the given node, optionally recursively. Pin(ctx context.Context, node ipld.Node, recursive bool) error + // PinToDepth pins the given node recursively to the given depth + PinToDepth(ctx context.Context, node ipld.Node, depth int) error + // Unpin the given cid. If recursive is true, removes either a recursive or // a direct pin. If recursive is false, only removes a direct pin. 
Unpin(ctx context.Context, cid *cid.Cid, recursive bool) error @@ -143,8 +193,8 @@ type Pinner interface { // DirectKeys returns all directly pinned cids DirectKeys() []*cid.Cid - // DirectKeys returns all recursively pinned cids - RecursiveKeys() []*cid.Cid + // DirectKeys returns all recursively pinned cids and their MaxDepths + RecursivePins() []*recpinset.RecPin // InternalPins returns all cids kept pinned for the internal state of the // pinner @@ -182,7 +232,7 @@ func (p Pinned) String() string { // pinner implements the Pinner interface type pinner struct { lock sync.RWMutex - recursePin *cid.Set + recursePin *recpinset.Set directPin *cid.Set // Track the keys used for storing the pinning state, so gc does @@ -196,7 +246,7 @@ type pinner struct { // NewPinner creates a new pinner using the given datastore as a backend func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner { - rcset := cid.NewSet() + rcset := recpinset.New() dirset := cid.NewSet() return &pinner{ @@ -211,6 +261,14 @@ func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner { // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { + depth := 0 + if recurse { + depth = -1 + } + return p.PinToDepth(ctx, node, depth) +} + +func (p *pinner) PinToDepth(ctx context.Context, node ipld.Node, depth int) error { p.lock.Lock() defer p.lock.Unlock() err := p.dserv.Add(ctx, node) @@ -220,8 +278,12 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { c := node.Cid() - if recurse { - if p.recursePin.Has(c) { + // Pins with depth == 0 are "direct" + if depth < 0 || depth > 0 { + curDepth, ok := p.recursePin.MaxDepth(c) + + // only pin is something deeper isn't pinned already + if ok && !recpinset.IsDeeper(depth, curDepth) { return nil } @@ -229,13 +291,13 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { p.directPin.Remove(c) } - // fetch 
entire graph - err := mdag.FetchGraph(ctx, c, p.dserv) + // fetch graph to needed depth + err := mdag.FetchGraphMaxDepth(ctx, c, depth, p.dserv) if err != nil { return err } - p.recursePin.Add(c) + p.recursePin.Add(c, depth) } else { if _, err := p.dserv.Get(ctx, c); err != nil { return err @@ -264,6 +326,11 @@ func (p *pinner) Unpin(ctx context.Context, c *cid.Cid, recursive bool) error { if !pinned { return ErrNotPinned } + + if strings.HasPrefix(reason, "recursive") { + reason = "recursive" + } + switch reason { case "recursive": if recursive { @@ -302,17 +369,22 @@ func (p *pinner) IsPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { // isPinnedWithType is the implementation of IsPinnedWithType that does not lock. // intended for use by other pinned methods that already take locks func (p *pinner) isPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { - switch mode { - case Any, Direct, Indirect, Recursive, Internal: - default: - err := fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}", + modeStr, ok := ModeToString(mode) + if !ok { + err := fmt.Errorf( + "invalid Pin Mode '%d', must be one of {%d, %d, %d, RecursiveN, %d, %d}", mode, Direct, Indirect, Recursive, Internal, Any) return "", false, err } - if (mode == Recursive || mode == Any) && p.recursePin.Has(c) { - return linkRecursive, true, nil + + maxDepth, ok := p.recursePin.MaxDepth(c) + isRecursive := mode == Recursive || mode > RecursiveN + if (mode == Any || isRecursive) && ok { // some sort of recursive pin + modeStr, _ = ModeToString(MaxDepthToMode(maxDepth)) + return modeStr, true, nil } - if mode == Recursive { + + if isRecursive { return "", false, nil } @@ -332,13 +404,20 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { // Default is Indirect visitedSet := cid.NewSet() - for _, rc := range p.recursePin.Keys() { - has, err := hasChild(p.dserv, rc, c, visitedSet.Visit) + + for _, recPin := range p.recursePin.RecPins() { + 
has, err := hasChild( + p.dserv, + recPin.Cid, // root + c, // child + recPin.MaxDepth, + visitedSet.Visit, + ) if err != nil { return "", false, err } if has { - return rc.String(), true, nil + return recPin.Cid.String(), true, nil } } return "", false, nil @@ -354,8 +433,13 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { // First check for non-Indirect pins directly for _, c := range cids { - if p.recursePin.Has(c) { - pinned = append(pinned, Pinned{Key: c, Mode: Recursive}) + maxDepth, ok := p.recursePin.MaxDepth(c) + if ok { + if maxDepth < 0 { + pinned = append(pinned, Pinned{Key: c, Mode: Recursive}) + } else { + pinned = append(pinned, Pinned{Key: c, Mode: MaxDepthToMode(maxDepth)}) + } } else if p.directPin.Has(c) { pinned = append(pinned, Pinned{Key: c, Mode: Direct}) } else if p.isInternalPin(c) { @@ -366,8 +450,16 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { } // Now walk all recursive pins to check for indirect pins - var checkChildren func(*cid.Cid, *cid.Cid) error - checkChildren = func(rk, parentKey *cid.Cid) error { + var checkChildren func(*cid.Cid, *cid.Cid, int) error + checkChildren = func(rk, parentKey *cid.Cid, maxDepth int) error { + if maxDepth == 0 { + return nil + } + + if maxDepth > 0 { // ignore depth limit -1 + maxDepth-- + } + links, err := ipld.GetLinks(context.TODO(), p.dserv, parentKey) if err != nil { return err @@ -381,7 +473,7 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { toCheck.Remove(c) } - err := checkChildren(rk, c) + err := checkChildren(rk, c, maxDepth) if err != nil { return err } @@ -393,8 +485,8 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { return nil } - for _, rk := range p.recursePin.Keys() { - err := checkChildren(rk, rk) + for _, recPin := range p.recursePin.RecPins() { + err := checkChildren(recPin.Cid, recPin.Cid, recPin.MaxDepth) if err != nil { return nil, err } @@ -471,20 +563,34 @@ func LoadPinner(d 
ds.Datastore, dserv, internal ipld.DAGService) (Pinner, error) internalset.Add(rootCid) recordInternal := internalset.Add - { // load recursive set - recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal) - if err != nil { - return nil, fmt.Errorf("cannot load recursive pins: %v", err) + p.recursePin = recpinset.New() + + for _, link := range rootpb.Links() { + mode, ok := StringToMode(link.Name) + if !ok { + continue } - p.recursePin = cidSetWithValues(recurseKeys) - } - { // load direct set - directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal) - if err != nil { - return nil, fmt.Errorf("cannot load direct pins: %v", err) + switch { + case mode == Recursive || mode >= RecursiveN: + depthLimit := -1 // Recursive + if mode >= RecursiveN { + depthLimit = ModeToMaxDepth(mode) + } + recurseKeys, err := loadSet(ctx, internal, rootpb, link.Name, recordInternal) + if err != nil { + return nil, fmt.Errorf("cannot load recursive pins: %v", err) + } + for _, c := range recurseKeys { + p.recursePin.Add(c, depthLimit) + } + case mode == Direct: + directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal) + if err != nil { + return nil, fmt.Errorf("cannot load direct pins: %v", err) + } + p.directPin = cidSetWithValues(directKeys) } - p.directPin = cidSetWithValues(directKeys) } p.internalPin = internalset @@ -502,9 +608,15 @@ func (p *pinner) DirectKeys() []*cid.Cid { return p.directPin.Keys() } -// RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys() []*cid.Cid { - return p.recursePin.Keys() +// RecursivePins returns a slice containing the recursively pinned keys +func (p *pinner) RecursivePins() []*recpinset.RecPin { + return p.recursePin.RecPins() +} + +// RecursiveWithLimitKeys returns a slice containing the recursively +// pinned keys along with their depth limit +func (p *pinner) RecursiveWithLimitKeys() []*recpinset.RecPin { + return 
p.recursePin.RecPins() } // Update updates a recursive pin from one cid to another @@ -523,7 +635,7 @@ func (p *pinner) Update(ctx context.Context, from, to *cid.Cid, unpin bool) erro return err } - p.recursePin.Add(to) + p.recursePin.Add(to, -1) if unpin { p.recursePin.Remove(from) } @@ -552,12 +664,25 @@ func (p *pinner) Flush() error { } { - n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal) - if err != nil { - return err + depthLimits := make(map[int][]*cid.Cid) + for _, recPin := range p.recursePin.RecPins() { + depth := recPin.MaxDepth + set := depthLimits[depth] + depthLimits[depth] = append(set, recPin.Cid) } - if err := root.AddNodeLink(linkRecursive, n); err != nil { - return err + + for depth, set := range depthLimits { + n, err := storeSet(ctx, p.internal, set, recordInternal) + if err != nil { + return err + } + linkName := linkRecursive + if depth >= 0 { + linkName, _ = ModeToString(MaxDepthToMode(depth)) + } + if err := root.AddNodeLink(linkName, n); err != nil { + return err + } } } @@ -597,17 +722,27 @@ func (p *pinner) InternalPins() []*cid.Cid { func (p *pinner) PinWithMode(c *cid.Cid, mode Mode) { p.lock.Lock() defer p.lock.Unlock() - switch mode { - case Recursive: - p.recursePin.Add(c) - case Direct: + switch { + case mode == Recursive: + p.recursePin.Add(c, -1) + case mode >= RecursiveN: + p.recursePin.Add(c, ModeToMaxDepth(mode)) + case mode == Direct: p.directPin.Add(c) } } // hasChild recursively looks for a Cid among the children of a root Cid. // The visit function can be used to shortcut already-visited branches. 
-func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) { +func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, depthLimit int, visit func(*cid.Cid) bool) (bool, error) { + if depthLimit == 0 { + return false, nil + } + + if depthLimit > 0 { // ignore negative depthLimits + depthLimit-- + } + links, err := ipld.GetLinks(context.TODO(), ng, root) if err != nil { return false, err @@ -618,7 +753,7 @@ func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid return true, nil } if visit(c) { - has, err := hasChild(ng, c, child, visit) + has, err := hasChild(ng, c, child, depthLimit, visit) if err != nil { return false, err } diff --git a/thirdparty/recpinset/recpinset.go b/thirdparty/recpinset/recpinset.go new file mode 100644 index 000000000000..d6b5fcbc7dc0 --- /dev/null +++ b/thirdparty/recpinset/recpinset.go @@ -0,0 +1,130 @@ +package recpinset + +import ( + cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" +) + +// Set stores a set of recursive pins (Cid,MaxDepth tuples), indexed by Cid. +// A MaxDepth of -1 means fully no-limit recursive pin. +type Set struct { + set map[string]int +} + +// RecPin represents a recursive Pin, that is, a CID and a MaxDepth that +// limits the depth to which the CID is pinned. A negative MaxDepth represents +// a unlimited-depth pin. +type RecPin struct { + Cid *cid.Cid + MaxDepth int +} + +// New initializes and returns a new Set. +func New() *Set { + return &Set{set: make(map[string]int)} +} + +// Add puts a Cid and the maxDepth associated to it in the Set. +func (s *Set) Add(c *cid.Cid, maxDepth int) { + s.set[string(c.Bytes())] = maxDepth +} + +// Has returns if the Set contains a given Cid. +func (s *Set) Has(c *cid.Cid) bool { + _, ok := s.set[string(c.Bytes())] + return ok +} + +// Get returns the RecPin associated to the given Cid. 
+func (s *Set) Get(c *cid.Cid) (*RecPin, bool) { + md, ok := s.set[string(c.Bytes())] + if !ok { + return nil, false + } + + return &RecPin{c, md}, ok +} + +// MaxDepth returns the MaxDepth associated to the given Cid. +func (s *Set) MaxDepth(c *cid.Cid) (int, bool) { + md, ok := s.set[string(c.Bytes())] + return md, ok +} + +// Remove deletes a Cid from the Set. +func (s *Set) Remove(c *cid.Cid) { + delete(s.set, string(c.Bytes())) +} + +// Len returns how many elements the Set has. +func (s *Set) Len() int { + return len(s.set) +} + +// Keys returns the Cids in the set. +func (s *Set) Keys() []*cid.Cid { + out := make([]*cid.Cid, 0, len(s.set)) + for k := range s.set { + c, _ := cid.Cast([]byte(k)) + out = append(out, c) + } + return out +} + +// RecPins returns all the Cid+MaxDepths stored in the set, +// in a slice of RecPins. +func (s *Set) RecPins() []*RecPin { + out := make([]*RecPin, 0, len(s.set)) + for k, v := range s.set { + c, _ := cid.Cast([]byte(k)) + out = append(out, &RecPin{c, v}) + } + return out +} + +// Visit adds a cid and maxDepth to the set if: +// - it is not already in the set +// - if it's in the set but the new maxDepth is greater than +// the existing. +// It returns true if the RecPin has been added. +func (s *Set) Visit(c *cid.Cid, maxDepth int) bool { + curMaxDepth, ok := s.set[string(c.Bytes())] + + if !ok || IsDeeper(maxDepth, curMaxDepth) { + s.Add(c, maxDepth) + return true + } + + return false +} + +// ForEach allows to run a custom function on each +// Cid in the set. +func (s *Set) ForEach(f func(c *cid.Cid, maxDepth int) error) error { + for cs, md := range s.set { + c, _ := cid.Cast([]byte(cs)) + err := f(c, md) + if err != nil { + return err + } + } + return nil +} + +// Returns true if d1 is deeper than d2 +// Takes into account that -1 is deeper than anything. +func IsDeeper(d1, d2 int) bool { + // if d2 is negative, nothing is deeper: no + if d2 < 0 { + return false + } + + // d2 is >= 0 here. 
+ + // if d1 is negative, yes + if d1 < 0 { + return true + } + + // if d1 > d2, yes + return d1 > d2 +}