From 55d980ac4de9938f57846e1d29ae93dd78de53c2 Mon Sep 17 00:00:00 2001 From: Hector Sanjuan Date: Mon, 18 Jun 2018 15:56:54 +0200 Subject: [PATCH] Feat: Arbitrary-depth recursive pin levels. This implements #5133 introducing an option to limit how deep we fetch and store the DAG associated to a recursive pin ("--max-depth"). This feature comes motivated by the need to fetch and pin partial DAGs in order to do DAG sharding with IPFS Cluster. This means that, when pinning something to --max-depth, the DAG will be fetched only to that depth and not more. In order to get this, the PR introduces new recursive pin types: "recursive1" means: the given CID is pinned along with its direct children (maxDepth=1) "recursive2" means: the given CID is pinned along with its direct children and its grandchildren. And so on... This required introducing "maxDepth" limits to all the functions walking down DAGs (in merkledag, pin, core/commands, core/coreapi, exchange/reprovide modules). maxDepth == -1 effectively acts as no-limit, and all these functions behave like they did before. In order to facilitate the task, a new CID Set type has been added: thirdparty/recpinset. This set carries the MaxDepth associated to every Cid. This allows to shortcut exploring already explored branches just like the original cid.Set does. It also allows to store the Recursive pinset (and replaces cid.Set). recpinset should be moved outside to a different repo eventually. TODO: tests TODO: refs -r with --max-depth License: MIT Signed-off-by: Hector Sanjuan --- core/commands/pin.go | 106 ++++++++---- core/coreapi/interface/options/pin.go | 6 +- core/coreapi/pin.go | 62 +++++-- core/corerepo/pinning.go | 4 +- exchange/reprovide/providers.go | 15 +- merkledag/merkledag.go | 114 +++++++++--- merkledag/merkledag_test.go | 152 ++++++++++++++-- pin/gc/gc.go | 51 +++++- pin/pin.go | 239 ++++++++++++++++++++------ thirdparty/recpinset/recpinset.go | 130 ++++++++++++++ 10 files changed, 740 insertions(+), 139 deletions(-) create mode 100644 thirdparty/recpinset/recpinset.go diff --git a/core/commands/pin.go b/core/commands/pin.go index 1829ac042725..327db5c8fca5 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -3,6 +3,7 @@ package commands import ( "bytes" "context" + "errors" "fmt" "io" "time" @@ -16,6 +17,7 @@ import ( path "github.com/ipfs/go-ipfs/path" resolver "github.com/ipfs/go-ipfs/path/resolver" pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" "github.com/ipfs/go-ipfs/thirdparty/verifcid" uio "github.com/ipfs/go-ipfs/unixfs/io" @@ -60,6 +62,7 @@ var addPinCmd = &cmds.Command{ Options: []cmdkit.Option{ cmdkit.BoolOption("recursive", "r", "Recursively pin the object linked to by the specified object(s).").WithDefault(true), cmdkit.BoolOption("progress", "Show progress"), + cmdkit.IntOption("depth-limit", "For recursive pins, only pin until the given DAG depth").WithDefault(0), }, Type: AddPinOutput{}, Run: func(req cmds.Request, res cmds.Response) { @@ -77,10 +80,28 @@ var addPinCmd = &cmds.Command{ res.SetError(err, cmdkit.ErrNormal) return } + depthLimit, _, err := req.Option("depth-limit").Int() + if err != nil { + res.SetError(err, cmdkit.ErrNormal) + return + } + + if !recursive && depthLimit != 0 { + res.SetError( + errors.New("wrong depth-limit option. Non recursive pin"), + cmdkit.ErrNormal, + ) + return + } + + if recursive && depthLimit <= 0 { + depthLimit = -1 + } + showProgress, _, _ := req.Option("progress").Bool() if !showProgress { - added, err := corerepo.Pin(n, req.Context(), req.Arguments(), recursive) + added, err := corerepo.Pin(n, req.Context(), req.Arguments(), depthLimit) if err != nil { res.SetError(err, cmdkit.ErrNormal) return @@ -100,7 +121,7 @@ var addPinCmd = &cmds.Command{ } ch := make(chan pinResult, 1) go func() { - added, err := corerepo.Pin(n, ctx, req.Arguments(), recursive) + added, err := corerepo.Pin(n, ctx, req.Arguments(), depthLimit) ch <- pinResult{pins: added, err: err} }() @@ -526,10 +547,13 @@ func pinLsKeys(ctx context.Context, args []string, typeStr string, n *core.IpfsN return nil, fmt.Errorf("path '%s' is not pinned", p) } - switch pinType { - case "direct", "indirect", "recursive", "internal": - default: - pinType = "indirect through " + pinType + mode, _ = pin.StringToMode(pinType) + if mode < pin.RecursiveN { + switch pinType { + case "direct", "indirect", "recursive", "internal": + default: + pinType = "indirect through " + pinType + } } keys[c.String()] = RefKeyObject{ Type: pinType, @@ -555,9 +579,15 @@ func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string AddToResultKeys(n.Pinning.DirectKeys(), "direct") } if typeStr == "indirect" || typeStr == "all" { - set := cid.NewSet() - for _, k := range n.Pinning.RecursiveKeys() { - err := dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(n.DAG), k, set.Visit) + set := recpinset.New() + for _, recPin := range n.Pinning.RecursivePins() { + err := dag.EnumerateChildrenMaxDepth( + ctx, + dag.GetLinksWithDAG(n.DAG), + recPin.Cid, + recPin.MaxDepth, + set.Visit, + ) if err != nil { return nil, err } @@ -565,7 +595,12 @@ func pinLsAll(ctx context.Context, typeStr string, n *core.IpfsNode) (map[string AddToResultKeys(set.Keys(), "indirect") } if typeStr == "recursive" || typeStr == "all" { - AddToResultKeys(n.Pinning.RecursiveKeys(), "recursive") + for _, recPin := range n.Pinning.RecursivePins() { + mode, _ := pin.ModeToString(pin.MaxDepthToMode(recPin.MaxDepth)) + keys[recPin.Cid.String()] = RefKeyObject{ + Type: mode, + } + } } return keys, nil @@ -595,28 +630,42 @@ type pinVerifyOpts struct { } func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan interface{} { - visited := make(map[string]PinStatus) + statuses := make(map[string]PinStatus) + visited := recpinset.New() bs := n.Blocks.Blockstore() DAG := dag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := dag.GetLinksWithDAG(DAG) - recPins := n.Pinning.RecursiveKeys() + recPins := n.Pinning.RecursivePins() - var checkPin func(root *cid.Cid) PinStatus - checkPin = func(root *cid.Cid) PinStatus { - key := root.String() - if status, ok := visited[key]; ok { - return status - } - - if err := verifcid.ValidateCid(root); err != nil { + validateCid := func(c *cid.Cid) PinStatus { + if err := verifcid.ValidateCid(c); err != nil { status := PinStatus{Ok: false} if opts.explain { - status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}} + status.BadNodes = []BadNode{BadNode{Cid: c.String(), Err: err.Error()}} } - visited[key] = status return status } + return PinStatus{Ok: true} + } + + var checkPinMaxDepth func(root *cid.Cid, maxDepth int) PinStatus + checkPinMaxDepth = func(root *cid.Cid, maxDepth int) PinStatus { + key := root.String() + // it was visited already, return last status + if !visited.Visit(root, maxDepth) { + return statuses[key] + } + + status := validateCid(root) + if maxDepth == 0 || !status.Ok { + statuses[key] = status + return status + } + + if maxDepth > 0 { + maxDepth-- + } links, err := getLinks(ctx, root) if err != nil { @@ -624,31 +673,30 @@ func pinVerify(ctx context.Context, n *core.IpfsNode, opts pinVerifyOpts) <-chan if opts.explain { status.BadNodes = []BadNode{BadNode{Cid: key, Err: err.Error()}} } - visited[key] = status + statuses[key] = status return status } - status := PinStatus{Ok: true} for _, lnk := range links { - res := checkPin(lnk.Cid) + res := checkPinMaxDepth(lnk.Cid, maxDepth) if !res.Ok { status.Ok = false status.BadNodes = append(status.BadNodes, res.BadNodes...) } } - visited[key] = status + statuses[key] = status return status } out := make(chan interface{}) go func() { defer close(out) - for _, cid := range recPins { - pinStatus := checkPin(cid) + for _, recPin := range recPins { + pinStatus := checkPinMaxDepth(recPin.Cid, recPin.MaxDepth) if !pinStatus.Ok || opts.includeOk { select { - case out <- &PinVerifyRes{cid.String(), pinStatus}: + case out <- &PinVerifyRes{recPin.Cid.String(), pinStatus}: case <-ctx.Done(): return } diff --git a/core/coreapi/interface/options/pin.go b/core/coreapi/interface/options/pin.go index 9d1107f927db..29cc374ed9b8 100644 --- a/core/coreapi/interface/options/pin.go +++ b/core/coreapi/interface/options/pin.go @@ -1,7 +1,8 @@ package options type PinAddSettings struct { - Recursive bool + Recursive bool + DepthLimit int } type PinLsSettings struct { @@ -18,7 +19,8 @@ type PinUpdateOption func(*PinUpdateSettings) error func PinAddOptions(opts ...PinAddOption) (*PinAddSettings, error) { options := &PinAddSettings{ - Recursive: true, + Recursive: true, + DepthLimit: -1, } for _, opt := range opts { diff --git a/core/coreapi/pin.go b/core/coreapi/pin.go index 5f6119108347..89aef9255d5d 100644 --- a/core/coreapi/pin.go +++ b/core/coreapi/pin.go @@ -2,6 +2,7 @@ package coreapi import ( "context" + "errors" "fmt" bserv "github.com/ipfs/go-ipfs/blockservice" @@ -10,6 +11,7 @@ import ( corerepo "github.com/ipfs/go-ipfs/core/corerepo" merkledag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline" ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" @@ -26,7 +28,15 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin defer api.node.Blockstore.PinLock().Unlock() - _, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.Recursive) + if !settings.Recursive && settings.DepthLimit != 0 { + return errors.New("Bad DepthLimit. Pin is not Recursive") + } + + if settings.Recursive && settings.DepthLimit <= 0 { + settings.DepthLimit = -1 + } + + _, err = corerepo.Pin(api.node, ctx, []string{p.String()}, settings.DepthLimit) if err != nil { return err } @@ -96,37 +106,47 @@ func (n *badNode) Err() error { } func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) { - visited := make(map[string]*pinStatus) + statuses := make(map[string]*pinStatus) + visited := recpinset.New() bs := api.node.Blocks.Blockstore() DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) getLinks := merkledag.GetLinksWithDAG(DAG) - recPins := api.node.Pinning.RecursiveKeys() + recPins := api.node.Pinning.RecursivePins() - var checkPin func(root *cid.Cid) *pinStatus - checkPin = func(root *cid.Cid) *pinStatus { + var checkPinMaxDepth func(root *cid.Cid, maxDepth int) *pinStatus + checkPinMaxDepth = func(root *cid.Cid, maxDepth int) *pinStatus { key := root.String() - if status, ok := visited[key]; ok { - return status + // it was visited already, return last status + if !visited.Visit(root, maxDepth) { + return statuses[key] + } + + if maxDepth == 0 { + return &pinStatus{ok: true, cid: root} + } + + if maxDepth > 0 { + maxDepth-- } links, err := getLinks(ctx, root) if err != nil { status := &pinStatus{ok: false, cid: root} status.badNodes = []coreiface.BadPinNode{&badNode{cid: root, err: err}} - visited[key] = status + statuses[key] = status return status } status := &pinStatus{ok: true, cid: root} for _, lnk := range links { - res := checkPin(lnk.Cid) + res := checkPinMaxDepth(lnk.Cid, maxDepth) if !res.ok { status.ok = false status.badNodes = append(status.badNodes, res.badNodes...) } } - visited[key] = status + statuses[key] = status return status } @@ -134,7 +154,7 @@ func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, erro go func() { defer close(out) for _, c := range recPins { - out <- checkPin(c) + out <- checkPinMaxDepth(c.Cid, c.MaxDepth) } }() @@ -171,9 +191,15 @@ func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag ipld. AddToResultKeys(pinning.DirectKeys(), "direct") } if typeStr == "indirect" || typeStr == "all" { - set := cid.NewSet() - for _, k := range pinning.RecursiveKeys() { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), k, set.Visit) + set := recpinset.New() + for _, recPin := range pinning.RecursivePins() { + err := merkledag.EnumerateChildrenMaxDepth( + ctx, + merkledag.GetLinksWithDAG(dag), + recPin.Cid, + recPin.MaxDepth, + set.Visit, + ) if err != nil { return nil, err } @@ -181,7 +207,13 @@ func pinLsAll(typeStr string, ctx context.Context, pinning pin.Pinner, dag ipld. AddToResultKeys(set.Keys(), "indirect") } if typeStr == "recursive" || typeStr == "all" { - AddToResultKeys(pinning.RecursiveKeys(), "recursive") + for _, recPin := range pinning.RecursivePins() { + mode, _ := pin.ModeToString(pin.MaxDepthToMode(recPin.MaxDepth)) + keys[recPin.Cid.String()] = &pinInfo{ + pinType: mode, + object: recPin.Cid, + } + } } out := make([]coreiface.Pin, 0, len(keys)) diff --git a/core/corerepo/pinning.go b/core/corerepo/pinning.go index 9c0dccda8f36..c0f64602ec29 100644 --- a/core/corerepo/pinning.go +++ b/core/corerepo/pinning.go @@ -25,7 +25,7 @@ import ( cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" ) -func Pin(n *core.IpfsNode, ctx context.Context, paths []string, recursive bool) ([]*cid.Cid, error) { +func Pin(n *core.IpfsNode, ctx context.Context, paths []string, depthLimit int) ([]*cid.Cid, error) { out := make([]*cid.Cid, len(paths)) r := &resolver.Resolver{ @@ -43,7 +43,7 @@ func Pin(n *core.IpfsNode, ctx context.Context, paths []string, recursive bool) if err != nil { return nil, fmt.Errorf("pin: %s", err) } - err = n.Pinning.Pin(ctx, dagnode, recursive) + err = n.Pinning.PinToDepth(ctx, dagnode, depthLimit) if err != nil { return nil, fmt.Errorf("pin: %s", err) } diff --git a/exchange/reprovide/providers.go b/exchange/reprovide/providers.go index 51178620c308..b414ba811425 100644 --- a/exchange/reprovide/providers.go +++ b/exchange/reprovide/providers.go @@ -53,11 +53,20 @@ func pinSet(ctx context.Context, pinning pin.Pinner, dag ipld.DAGService, onlyRo set.add(key) } - for _, key := range pinning.RecursiveKeys() { - set.add(key) + for _, recPin := range pinning.RecursivePins() { + set.add(recPin.Cid) if !onlyRoots { - err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(dag), key, set.add) + addF := func(c *cid.Cid, d int) bool { + return set.add(c) + } + err := merkledag.EnumerateChildrenMaxDepth( + ctx, + merkledag.GetLinksWithDAG(dag), + recPin.Cid, + recPin.MaxDepth, + addF, + ) if err != nil { log.Errorf("reprovide indirect pins: %s", err) return diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index a59e3469e4bf..e4febf2fd48c 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -7,6 +7,7 @@ import ( "sync" bserv "github.com/ipfs/go-ipfs/blockservice" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" ipldcbor "gx/ipfs/QmSF1Ksgn5d7JCTBt4e1yp4wzs6tpYyweCZ4PcDYp3tNeK/go-ipld-cbor" blocks "gx/ipfs/QmTRCUvZLiir12Qr6MV3HKfKMHX8Nf1Vddn6t2g5nsQSb9/go-block-format" @@ -159,25 +160,32 @@ func (n *dagService) Session(ctx context.Context) ipld.NodeGetter { // FetchGraph fetches all nodes that are children of the given node func FetchGraph(ctx context.Context, root *cid.Cid, serv ipld.DAGService) error { + return FetchGraphMaxDepth(ctx, root, -1, serv) +} + +// FetchGraphMaxDepth fetches all nodes that are children to the given node +// down to the given depth. maxDetph=0 means "only fetch root", maxDepth=1 means +// fetch root and its direct children and so on... maxDepth=-1 means unlimited. +func FetchGraphMaxDepth(ctx context.Context, root *cid.Cid, maxDepth int, serv ipld.DAGService) error { var ng ipld.NodeGetter = serv ds, ok := serv.(*dagService) if ok { ng = &sesGetter{bserv.NewSession(ctx, ds.Blocks)} } + set := recpinset.New() v, _ := ctx.Value(progressContextKey).(*ProgressTracker) if v == nil { - return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, cid.NewSet().Visit) + return EnumerateChildrenAsyncMaxDepth(ctx, GetLinksDirect(ng), root, maxDepth, set.Visit) } - set := cid.NewSet() - visit := func(c *cid.Cid) bool { - if set.Visit(c) { + visit := func(c *cid.Cid, maxDepth int) bool { + if set.Visit(c, maxDepth) { v.Increment() return true } return false } - return EnumerateChildrenAsync(ctx, GetLinksDirect(ng), root, visit) + return EnumerateChildrenAsyncMaxDepth(ctx, GetLinksDirect(ng), root, maxDepth, visit) } // GetMany gets many nodes from the DAG at once. @@ -255,14 +263,39 @@ func GetLinksWithDAG(ng ipld.NodeGetter) GetLinks { // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? func EnumerateChildren(ctx context.Context, getLinks GetLinks, root *cid.Cid, visit func(*cid.Cid) bool) error { + visitDepth := func(c *cid.Cid, maxDepth int) bool { + return visit(c) + } + return EnumerateChildrenMaxDepth(ctx, getLinks, root, -1, visitDepth) +} + +// EnumerateChildrenMaxDepth walks the dag below the given root to the given +// depth. The root is at level 0, the children are at level 1 and so on. +// Thus, setting depth to two, will walk the root, the children, and the +// children of the children. +// Setting depth to a negative number will walk the full tree. +func EnumerateChildrenMaxDepth(ctx context.Context, getLinks GetLinks, root *cid.Cid, maxDepth int, visit func(*cid.Cid, int) bool) error { + if maxDepth == 0 { + // Root nodes are not marked as visited in this implementation + // (they are in the async version) + return nil + } + + if maxDepth > 0 { + maxDepth-- + } + links, err := getLinks(ctx, root) if err != nil { return err } for _, lnk := range links { c := lnk.Cid - if visit(c) { - err = EnumerateChildren(ctx, getLinks, c, visit) + if visit(c, maxDepth) { + // Note, visit() returns true when maxDepth + // is > than existing value (meaning we have to + // go deeper than before. + err = EnumerateChildrenMaxDepth(ctx, getLinks, c, maxDepth, visit) if err != nil { return err } @@ -306,8 +339,24 @@ var FetchGraphConcurrency = 8 // // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, visit func(*cid.Cid) bool) error { - feed := make(chan *cid.Cid) - out := make(chan []*ipld.Link) + visitDepth := func(c *cid.Cid, maxDepth int) bool { + return visit(c) + } + return EnumerateChildrenAsyncMaxDepth(ctx, getLinks, c, -1, visitDepth) +} + +// EnumerateChildrenAsyncMaxDepth is equivalent to EnumerateChildrenMaxDepth *except* that +// it fetches children in parallel (down to a maximum depth in the graph). +// +// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. +func EnumerateChildrenAsyncMaxDepth(ctx context.Context, getLinks GetLinks, c *cid.Cid, maxDepth int, visit func(*cid.Cid, int) bool) error { + type linksDepth struct { + links []*ipld.Link + maxDepth int + } + + feed := make(chan *recpinset.RecPin) + out := make(chan *linksDepth) done := make(chan struct{}) var setlk sync.Mutex @@ -319,20 +368,37 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, for i := 0; i < FetchGraphConcurrency; i++ { go func() { - for ic := range feed { + for recPin := range feed { + maxDepth := recPin.MaxDepth + setlk.Lock() - shouldVisit := visit(ic) + // Note, visit returns true when depth + // is > than existing value (meaning we have to + // go deeper than before. + shouldVisit := visit(recPin.Cid, maxDepth) setlk.Unlock() - if shouldVisit { - links, err := getLinks(ctx, ic) + switch { + case maxDepth == 0: + // done + case shouldVisit: + if maxDepth > 0 { + maxDepth-- + } + + links, err := getLinks(ctx, recPin.Cid) if err != nil { errChan <- err return } + outLinks := &linksDepth{ + links: links, + maxDepth: maxDepth, + } + select { - case out <- links: + case out <- outLinks: case <-fetchersCtx.Done(): return } @@ -347,10 +413,13 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, defer close(feed) send := feed - var todobuffer []*cid.Cid + var todobuffer []*recpinset.RecPin var inProgress int - next := c + next := &recpinset.RecPin{ + Cid: c, + MaxDepth: maxDepth, + } for { select { case send <- next: @@ -367,13 +436,17 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, if inProgress == 0 && next == nil { return nil } - case links := <-out: - for _, lnk := range links { + case outLinks := <-out: + for _, lnk := range outLinks.links { + cd := &recpinset.RecPin{ + Cid: lnk.Cid, + MaxDepth: outLinks.maxDepth, + } if next == nil { - next = lnk.Cid + next = cd send = feed } else { - todobuffer = append(todobuffer, lnk.Cid) + todobuffer = append(todobuffer, cd) } } case err := <-errChan: @@ -383,7 +456,6 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c *cid.Cid, return ctx.Err() } } - } var _ ipld.LinkGetter = &dagService{} diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 460df81be76e..7f414079ca1e 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -18,6 +18,7 @@ import ( . "github.com/ipfs/go-ipfs/merkledag" mdpb "github.com/ipfs/go-ipfs/merkledag/pb" dstest "github.com/ipfs/go-ipfs/merkledag/test" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" u "gx/ipfs/QmPdKqUcHGFdeSpvjVoaTRPPstGif9GBZb5Q56RVw9o69A/go-ipfs-util" offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline" @@ -126,6 +127,36 @@ func TestBatchFetchDupBlock(t *testing.T) { runBatchFetchTest(t, read) } +// makeDepthTestingGraph makes a small DAG with two levels. The level-two +// nodes are both children of the root and of one of the level 1 nodes. +// This is meant to test the EnumerateChildren*MaxDepth functions. +func makeDepthTestingGraph(t *testing.T, ds ipld.DAGService) ipld.Node { + root := NodeWithData(nil) + l11 := NodeWithData([]byte("leve1_node1")) + l12 := NodeWithData([]byte("leve1_node1")) + l21 := NodeWithData([]byte("leve2_node1")) + l22 := NodeWithData([]byte("leve2_node2")) + l23 := NodeWithData([]byte("leve2_node3")) + + l11.AddNodeLink(l21.Cid().String(), l21) + l11.AddNodeLink(l23.Cid().String(), l22) + l11.AddNodeLink(l23.Cid().String(), l23) + + root.AddNodeLink(l11.Cid().String(), l11) + root.AddNodeLink(l12.Cid().String(), l12) + root.AddNodeLink(l23.Cid().String(), l23) + + ctx := context.Background() + for _, n := range []ipld.Node{l23, l22, l21, l12, l11, root} { + err := ds.Add(ctx, n) + if err != nil { + t.Fatal(err) + } + } + + return root +} + // makeTestDAG creates a simple DAG from the data in a reader. // First, a node is created from each 512 bytes of data from the reader // (like a the Size chunker would do). Then all nodes are added as children @@ -293,6 +324,22 @@ func TestFetchGraph(t *testing.T) { } } +// Check that all children of root are in the given set and in the datastore +func traverseAndCheck(t *testing.T, root ipld.Node, ds ipld.DAGService, set *cid.Set) { + // traverse dag and check + for _, lnk := range root.Links() { + c := lnk.Cid + if !set.Has(c) { + t.Fatal("missing key in set! ", lnk.Cid.String()) + } + child, err := ds.Get(context.Background(), c) + if err != nil { + t.Fatal(err) + } + traverseAndCheck(t, child, ds, set) + } +} + func TestEnumerateChildren(t *testing.T) { bsi := bstest.Mocks(1) ds := NewDAGService(bsi[0]) @@ -307,23 +354,100 @@ func TestEnumerateChildren(t *testing.T) { t.Fatal(err) } - var traverse func(n ipld.Node) - traverse = func(n ipld.Node) { - // traverse dag and check - for _, lnk := range n.Links() { - c := lnk.Cid - if !set.Has(c) { - t.Fatal("missing key in set! ", lnk.Cid.String()) - } - child, err := ds.Get(context.Background(), c) - if err != nil { - t.Fatal(err) - } - traverse(child) + traverseAndCheck(t, root, ds, set) +} + +func TestEnumerateChildrenMaxDepth(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + root := makeDepthTestingGraph(t, ds) + + type testcase struct { + depth int + expectedLen int + } + + tests := []testcase{ + testcase{1, 3}, + testcase{0, 0}, + testcase{-1, 5}, + } + + testF := func(t *testing.T, tc testcase) { + set := recpinset.New() + err := EnumerateChildrenMaxDepth( + context.Background(), + ds.GetLinks, + root.Cid(), + tc.depth, + set.Visit, + ) + if err != nil { + t.Fatal(err) + } + if l := len(set.Keys()); l != tc.expectedLen { + t.Errorf("expected %d keys and got %d", tc.expectedLen, l) } } - traverse(root) + for _, tc := range tests { + testF(t, tc) + } +} + +func TestEnumerateChildrenAsync(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + + read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024) + root := makeTestDAG(t, read, ds) + + set := cid.NewSet() + + err := EnumerateChildrenAsync(context.Background(), ds.GetLinks, root.Cid(), set.Visit) + if err != nil { + t.Fatal(err) + } + + traverseAndCheck(t, root, ds, set) +} + +func TestEnumerateChildrenAsyncMaxDepth(t *testing.T) { + bsi := bstest.Mocks(1) + ds := NewDAGService(bsi[0]) + root := makeDepthTestingGraph(t, ds) + + type testcase struct { + depth int + expectedLen int + } + + tests := []testcase{ + testcase{1, 4}, + testcase{0, 1}, + testcase{-1, 6}, + } + + testF := func(t *testing.T, tc testcase) { + set := recpinset.New() + err := EnumerateChildrenAsyncMaxDepth( + context.Background(), + ds.GetLinks, + root.Cid(), + tc.depth, + set.Visit, + ) + if err != nil { + t.Fatal(err) + } + if l := len(set.Keys()); l != tc.expectedLen { + t.Errorf("expected %d keys and got %d", tc.expectedLen, l) + } + } + + for _, tc := range tests { + testF(t, tc) + } } func TestFetchFailure(t *testing.T) { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 69ca731d17ac..8ecddbbaef05 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -10,6 +10,7 @@ import ( bserv "github.com/ipfs/go-ipfs/blockservice" dag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" "github.com/ipfs/go-ipfs/thirdparty/verifcid" offline "gx/ipfs/QmPf114DXfa6TqGKYhBGR7EtXRho4rCJgwyA1xkuMY5vwF/go-ipfs-exchange-offline" @@ -166,6 +167,54 @@ func Descendants(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots return nil } +func DescendantsToDepth(ctx context.Context, getLinks dag.GetLinks, set *cid.Set, roots []*recpinset.RecPin) error { + verifyGetLinks := func(ctx context.Context, c *cid.Cid) ([]*ipld.Link, error) { + err := verifcid.ValidateCid(c) + if err != nil { + return nil, err + } + + return getLinks(ctx, c) + } + + verboseCidError := func(err error) error { + if strings.Contains(err.Error(), verifcid.ErrBelowMinimumHashLength.Error()) || + strings.Contains(err.Error(), verifcid.ErrPossiblyInsecureHashFunction.Error()) { + err = fmt.Errorf("\"%s\"\nPlease run 'ipfs pin verify'"+ + " to list insecure hashes. If you want to read them,"+ + " please downgrade your go-ipfs to 0.4.13\n", err) + log.Error(err) + } + return err + } + + recSet := recpinset.New() + + for _, recPin := range roots { + set.Add(recPin.Cid) + + // EnumerateChildren recursively walks the dag and adds the keys to the given set + err := dag.EnumerateChildrenMaxDepth( + ctx, + verifyGetLinks, + recPin.Cid, + recPin.MaxDepth, + recSet.Visit, + ) + + if err != nil { + err = verboseCidError(err) + return err + } + } + + for _, k := range recSet.Keys() { + set.Add(k) + } + + return nil +} + // ColoredSet computes the set of nodes in the graph that are pinned by the // pins in the given pinner. func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffortRoots []*cid.Cid, output chan<- Result) (*cid.Set, error) { @@ -181,7 +230,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ng ipld.NodeGetter, bestEffo } return links, nil } - err := Descendants(ctx, getLinks, gcs, pn.RecursiveKeys()) + err := DescendantsToDepth(ctx, getLinks, gcs, pn.RecursivePins()) if err != nil { errors = true output <- Result{Error: err} diff --git a/pin/pin.go b/pin/pin.go index 348ead7bf15a..e4cf18e79273 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -6,11 +6,15 @@ import ( "context" "fmt" "os" + "regexp" + "strconv" + "strings" "sync" "time" mdag "github.com/ipfs/go-ipfs/merkledag" dutils "github.com/ipfs/go-ipfs/merkledag/utils" + "github.com/ipfs/go-ipfs/thirdparty/recpinset" ipld "gx/ipfs/QmWi2BYBL5gJ3CiAiQchg6rn1A8iBsrWy51EYxvHVjFvLb/go-ipld-format" cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" @@ -47,6 +51,8 @@ const ( // See the Pin Modes constants for a full list. type Mode int +var recursiveNRegexp *regexp.Regexp = regexp.MustCompile(fmt.Sprintf("%s([0-9]+)", linkRecursive)) + // Pin Modes const ( // Recursive pins pin the target cids along with any reachable children. @@ -66,6 +72,10 @@ const ( // Any refers to any pinned cid Any + + // RecursiveN are pins pinned up to the Nth level down the tree. + // RecursiveN == Recursive0. Recursive1 == RecursiveN+1 etc. + RecursiveN Mode = iota + 100 ) // ModeToString returns a human-readable name for the Mode. @@ -79,12 +89,49 @@ func ModeToString(mode Mode) (string, bool) { Any: linkAny, } s, ok := m[mode] + if !ok && mode >= RecursiveN { + s = fmt.Sprintf("%s%d", linkRecursive, ModeToMaxDepth(mode)) + ok = true + } + return s, ok } +// MaxDepthToMode converts a depth limit to the RecursiveN+depth mode. +func MaxDepthToMode(d int) Mode { + if d < 0 { + return Recursive + } + return RecursiveN + Mode(d) +} + +// ModeToMaxDepth converts a mode to the depth limit. +// It is either -1 for recursive or mode - RecursiveN for +// modes >= RecursiveN. For the rest, it's 0. +func ModeToMaxDepth(mode Mode) int { + switch { + case mode == Recursive: + return -1 + case mode >= RecursiveN: + return int(mode - RecursiveN) + default: + return 0 + } +} + // StringToMode parses the result of ModeToString() back to a Mode. // It returns a boolean which is set to false if the mode is unknown. func StringToMode(s string) (Mode, bool) { + // if s is like "recursive33", return RecursiveN+33 + recN := recursiveNRegexp.FindStringSubmatch(s) + if len(recN) == 2 { + m, err := strconv.Atoi(recN[1]) + if err != nil { + return 0, false + } + return MaxDepthToMode(m), true + } + m := map[string]Mode{ linkRecursive: Recursive, linkDirect: Direct, @@ -114,6 +161,9 @@ type Pinner interface { // Pin the given node, optionally recursively. Pin(ctx context.Context, node ipld.Node, recursive bool) error + // PinToDepth pins the given node recursively to the given depth + PinToDepth(ctx context.Context, node ipld.Node, depth int) error + // Unpin the given cid. If recursive is true, removes either a recursive or // a direct pin. If recursive is false, only removes a direct pin. Unpin(ctx context.Context, cid *cid.Cid, recursive bool) error @@ -143,8 +193,8 @@ type Pinner interface { // DirectKeys returns all directly pinned cids DirectKeys() []*cid.Cid - // DirectKeys returns all recursively pinned cids - RecursiveKeys() []*cid.Cid + // DirectKeys returns all recursively pinned cids and their MaxDepths + RecursivePins() []*recpinset.RecPin // InternalPins returns all cids kept pinned for the internal state of the // pinner @@ -182,7 +232,7 @@ func (p Pinned) String() string { // pinner implements the Pinner interface type pinner struct { lock sync.RWMutex - recursePin *cid.Set + recursePin *recpinset.Set directPin *cid.Set // Track the keys used for storing the pinning state, so gc does @@ -196,7 +246,7 @@ type pinner struct { // NewPinner creates a new pinner using the given datastore as a backend func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner { - rcset := cid.NewSet() + rcset := recpinset.New() dirset := cid.NewSet() return &pinner{ @@ -211,6 +261,14 @@ func NewPinner(dstore ds.Datastore, serv, internal ipld.DAGService) Pinner { // Pin the given node, optionally recursive func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { + depth := 0 + if recurse { + depth = -1 + } + return p.PinToDepth(ctx, node, depth) +} + +func (p *pinner) PinToDepth(ctx context.Context, node ipld.Node, depth int) error { p.lock.Lock() defer p.lock.Unlock() err := p.dserv.Add(ctx, node) @@ -220,8 +278,12 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { c := node.Cid() - if recurse { - if p.recursePin.Has(c) { + // Pins with depth == 0 are "direct" + if depth < 0 || depth > 0 { + curDepth, ok := p.recursePin.MaxDepth(c) + + // only pin is something deeper isn't pinned already + if ok && !recpinset.IsDeeper(depth, curDepth) { return nil } @@ -229,13 +291,13 @@ func (p *pinner) Pin(ctx context.Context, node ipld.Node, recurse bool) error { p.directPin.Remove(c) } - // fetch entire graph - err := mdag.FetchGraph(ctx, c, p.dserv) + // fetch graph to needed depth + err := mdag.FetchGraphMaxDepth(ctx, c, depth, p.dserv) if err != nil { return err } - p.recursePin.Add(c) + p.recursePin.Add(c, depth) } else { if _, err := p.dserv.Get(ctx, c); err != nil { return err @@ -264,6 +326,11 @@ func (p *pinner) Unpin(ctx context.Context, c *cid.Cid, recursive bool) error { if !pinned { return ErrNotPinned } + + if strings.HasPrefix(reason, "recursive") { + reason = "recursive" + } + switch reason { case "recursive": if recursive { @@ -302,17 +369,22 @@ func (p *pinner) IsPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { // isPinnedWithType is the implementation of IsPinnedWithType that does not lock. // intended for use by other pinned methods that already take locks func (p *pinner) isPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { - switch mode { - case Any, Direct, Indirect, Recursive, Internal: - default: - err := fmt.Errorf("invalid Pin Mode '%d', must be one of {%d, %d, %d, %d, %d}", + modeStr, ok := ModeToString(mode) + if !ok { + err := fmt.Errorf( + "invalid Pin Mode '%d', must be one of {%d, %d, %d, RecursiveN, %d, %d}", mode, Direct, Indirect, Recursive, Internal, Any) return "", false, err } - if (mode == Recursive || mode == Any) && p.recursePin.Has(c) { - return linkRecursive, true, nil + + maxDepth, ok := p.recursePin.MaxDepth(c) + isRecursive := mode == Recursive || mode > RecursiveN + if (mode == Any || isRecursive) && ok { // some sort of recursive pin + modeStr, _ = ModeToString(MaxDepthToMode(maxDepth)) + return modeStr, true, nil } - if mode == Recursive { + + if isRecursive { return "", false, nil } @@ -332,13 +404,20 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode Mode) (string, bool, error) { // Default is Indirect visitedSet := cid.NewSet() - for _, rc := range p.recursePin.Keys() { - has, err := hasChild(p.dserv, rc, c, visitedSet.Visit) + + for _, recPin := range p.recursePin.RecPins() { + has, err := hasChild( + p.dserv, + recPin.Cid, // root + c, // child + recPin.MaxDepth, + visitedSet.Visit, + ) if err != nil { return "", false, err } if has { - return rc.String(), true, nil + return recPin.Cid.String(), true, nil } } return "", false, nil @@ -354,8 +433,13 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { // First check for non-Indirect pins directly for _, c := range cids { - if p.recursePin.Has(c) { - pinned = append(pinned, Pinned{Key: c, Mode: Recursive}) + maxDepth, ok := p.recursePin.MaxDepth(c) + if ok { + if maxDepth < 0 { + pinned = append(pinned, Pinned{Key: c, Mode: Recursive}) + } else { + pinned = append(pinned, Pinned{Key: c, Mode: MaxDepthToMode(maxDepth)}) + } } else if p.directPin.Has(c) { pinned = append(pinned, Pinned{Key: c, Mode: Direct}) } else if p.isInternalPin(c) { @@ -366,8 +450,16 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { } // Now walk all recursive pins to check for indirect pins - var checkChildren func(*cid.Cid, *cid.Cid) error - checkChildren = func(rk, parentKey *cid.Cid) error { + var checkChildren func(*cid.Cid, *cid.Cid, int) error + checkChildren = func(rk, parentKey *cid.Cid, maxDepth int) error { + if maxDepth == 0 { + return nil + } + + if maxDepth > 0 { // ignore depth limit -1 + maxDepth-- + } + links, err := ipld.GetLinks(context.TODO(), p.dserv, parentKey) if err != nil { return err @@ -381,7 +473,7 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { toCheck.Remove(c) } - err := checkChildren(rk, c) + err := checkChildren(rk, c, maxDepth) if err != nil { return err } @@ -393,8 +485,8 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { return nil } - for _, rk := range p.recursePin.Keys() { - err := checkChildren(rk, rk) + for _, recPin := range p.recursePin.RecPins() { + err := checkChildren(recPin.Cid, recPin.Cid, recPin.MaxDepth) if err != nil { return nil, err } @@ -471,20 +563,34 @@ func LoadPinner(d ds.Datastore, dserv, internal ipld.DAGService) (Pinner, error) internalset.Add(rootCid) recordInternal := internalset.Add - { // load recursive set - recurseKeys, err := loadSet(ctx, internal, rootpb, linkRecursive, recordInternal) - if err != nil { - return nil, fmt.Errorf("cannot load recursive pins: %v", err) + p.recursePin = recpinset.New() + + for _, link := range rootpb.Links() { + mode, ok := StringToMode(link.Name) + if !ok { + continue } - p.recursePin = cidSetWithValues(recurseKeys) - } - { // load direct set - directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal) - if err != nil { - return nil, fmt.Errorf("cannot load direct pins: %v", err) + switch { + case mode == Recursive || mode >= RecursiveN: + depthLimit := -1 // Recursive + if mode >= RecursiveN { + depthLimit = ModeToMaxDepth(mode) + } + recurseKeys, err := loadSet(ctx, internal, rootpb, link.Name, recordInternal) + if err != nil { + return nil, fmt.Errorf("cannot load recursive pins: %v", err) + } + for _, c := range recurseKeys { + p.recursePin.Add(c, depthLimit) + } + case mode == Direct: + directKeys, err := loadSet(ctx, internal, rootpb, linkDirect, recordInternal) + if err != nil { + return nil, fmt.Errorf("cannot load direct pins: %v", err) + } + p.directPin = cidSetWithValues(directKeys) } - p.directPin = cidSetWithValues(directKeys) } p.internalPin = internalset @@ -502,9 +608,15 @@ func (p *pinner) DirectKeys() []*cid.Cid { return p.directPin.Keys() } -// RecursiveKeys returns a slice containing the recursively pinned keys -func (p *pinner) RecursiveKeys() []*cid.Cid { - return p.recursePin.Keys() +// RecursivePins returns a slice containing the recursively pinned keys +func (p *pinner) RecursivePins() []*recpinset.RecPin { + return p.recursePin.RecPins() +} + +// RecursiveWithLimitKeys returns a slice containing the recursively +// pinned keys along with their depth limit +func (p *pinner) RecursiveWithLimitKeys() []*recpinset.RecPin { + return p.recursePin.RecPins() } // Update updates a recursive pin from one cid to another @@ -523,7 +635,7 @@ func (p *pinner) Update(ctx context.Context, from, to *cid.Cid, unpin bool) erro return err } - p.recursePin.Add(to) + p.recursePin.Add(to, -1) if unpin { p.recursePin.Remove(from) } @@ -552,12 +664,25 @@ func (p *pinner) Flush() error { } { - n, err := storeSet(ctx, p.internal, p.recursePin.Keys(), recordInternal) - if err != nil { - return err + depthLimits := make(map[int][]*cid.Cid) + for _, recPin := range p.recursePin.RecPins() { + depth := recPin.MaxDepth + set := depthLimits[depth] + depthLimits[depth] = append(set, recPin.Cid) } - if err := root.AddNodeLink(linkRecursive, n); err != nil { - return err + + for depth, set := range depthLimits { + n, err := storeSet(ctx, p.internal, set, recordInternal) + if err != nil { + return err + } + linkName := linkRecursive + if depth >= 0 { + linkName, _ = ModeToString(MaxDepthToMode(depth)) + } + if err := root.AddNodeLink(linkName, n); err != nil { + return err + } } } @@ -597,17 +722,27 @@ func (p *pinner) InternalPins() []*cid.Cid { func (p *pinner) PinWithMode(c *cid.Cid, mode Mode) { p.lock.Lock() defer p.lock.Unlock() - switch mode { - case Recursive: - p.recursePin.Add(c) - case Direct: + switch { + case mode == Recursive: + p.recursePin.Add(c, -1) + case mode >= RecursiveN: + p.recursePin.Add(c, ModeToMaxDepth(mode)) + case mode == Direct: p.directPin.Add(c) } } // hasChild recursively looks for a Cid among the children of a root Cid. // The visit function can be used to shortcut already-visited branches. -func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid.Cid) bool) (bool, error) { +func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, depthLimit int, visit func(*cid.Cid) bool) (bool, error) { + if depthLimit == 0 { + return false, nil + } + + if depthLimit > 0 { // ignore negative depthLimits + depthLimit-- + } + links, err := ipld.GetLinks(context.TODO(), ng, root) if err != nil { return false, err @@ -618,7 +753,7 @@ func hasChild(ng ipld.NodeGetter, root *cid.Cid, child *cid.Cid, visit func(*cid return true, nil } if visit(c) { - has, err := hasChild(ng, c, child, visit) + has, err := hasChild(ng, c, child, depthLimit, visit) if err != nil { return false, err } diff --git a/thirdparty/recpinset/recpinset.go b/thirdparty/recpinset/recpinset.go new file mode 100644 index 000000000000..d6b5fcbc7dc0 --- /dev/null +++ b/thirdparty/recpinset/recpinset.go @@ -0,0 +1,130 @@ +package recpinset + +import ( + cid "gx/ipfs/QmapdYm1b22Frv3k17fqrBYTFRxwiaVJkB299Mfn33edeB/go-cid" +) + +// Set stores a set of recursive pins (Cid,MaxDepth tuples), indexed by Cid. +// A MaxDepth of -1 means fully no-limit recursive pin. +type Set struct { + set map[string]int +} + +// RecPin represents a recursive Pin, that is, a CID and a MaxDepth that +// limits the depth to which the CID is pinned. A negative MaxDepth represents +// a unlimited-depth pin. +type RecPin struct { + Cid *cid.Cid + MaxDepth int +} + +// New initializes and returns a new Set. +func New() *Set { + return &Set{set: make(map[string]int)} +} + +// Add puts a Cid and the maxDepth associated to it in the Set. +func (s *Set) Add(c *cid.Cid, maxDepth int) { + s.set[string(c.Bytes())] = maxDepth +} + +// Has returns if the Set contains a given Cid. +func (s *Set) Has(c *cid.Cid) bool { + _, ok := s.set[string(c.Bytes())] + return ok +} + +// Get returns the RecPin associated to the given Cid. +func (s *Set) Get(c *cid.Cid) (*RecPin, bool) { + md, ok := s.set[string(c.Bytes())] + if !ok { + return nil, false + } + + return &RecPin{c, md}, ok +} + +// MaxDepth returns the MaxDepth associated to the given Cid. +func (s *Set) MaxDepth(c *cid.Cid) (int, bool) { + md, ok := s.set[string(c.Bytes())] + return md, ok +} + +// Remove deletes a Cid from the Set. +func (s *Set) Remove(c *cid.Cid) { + delete(s.set, string(c.Bytes())) +} + +// Len returns how many elements the Set has. +func (s *Set) Len() int { + return len(s.set) +} + +// Keys returns the Cids in the set. +func (s *Set) Keys() []*cid.Cid { + out := make([]*cid.Cid, 0, len(s.set)) + for k := range s.set { + c, _ := cid.Cast([]byte(k)) + out = append(out, c) + } + return out +} + +// RecPins returns all the Cid+MaxDepths stored in the set, +// in a slice of RecPins. +func (s *Set) RecPins() []*RecPin { + out := make([]*RecPin, 0, len(s.set)) + for k, v := range s.set { + c, _ := cid.Cast([]byte(k)) + out = append(out, &RecPin{c, v}) + } + return out +} + +// Visit adds a cid and maxDepth to the set if: +// - it is not already in the set +// - if it's in the set but the new maxDepth is greater than +// the existing. +// It returns true if the RecPin has been added. +func (s *Set) Visit(c *cid.Cid, maxDepth int) bool { + curMaxDepth, ok := s.set[string(c.Bytes())] + + if !ok || IsDeeper(maxDepth, curMaxDepth) { + s.Add(c, maxDepth) + return true + } + + return false +} + +// ForEach allows to run a custom function on each +// Cid in the set. +func (s *Set) ForEach(f func(c *cid.Cid, maxDepth int) error) error { + for cs, md := range s.set { + c, _ := cid.Cast([]byte(cs)) + err := f(c, md) + if err != nil { + return err + } + } + return nil +} + +// Returns true if d1 is deeper than d2 +// Takes into account that -1 is deeper than anything. +func IsDeeper(d1, d2 int) bool { + // if d2 is negative, nothing is deeper: no + if d2 < 0 { + return false + } + + // d2 is >= 0 here. + + // if d1 is negative, yes + if d1 < 0 { + return true + } + + // if d1 > d2, yes + return d1 > d2 +}