From 3899194cb0aec37617576c4c612f2301479aeb83 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Fri, 19 Aug 2016 15:52:27 -0400 Subject: [PATCH 1/3] Add DAGService.GetLinks() method and use it in the GC and elsewhere. This method will use the (also new) LinkService, if it is available, to retrieve just the links for a MerkleDAG node without necessarily having to retrieve the underlying block. For now, the main benefit is that the pinner will not break when a block becomes invalid due to a change in the backing file. This is possible because the metadata for a block (which includes the Links) is stored separately and is thus always available, even if the backing file changes. License: MIT Signed-off-by: Kevin Atkinson --- core/commands/pin.go | 5 ++--- core/core.go | 17 ++++++++-------- core/corerepo/gc.go | 4 ++-- core/coreunix/add_test.go | 4 ++-- merkledag/merkledag.go | 39 ++++++++++++++++++++++++++++++------- merkledag/merkledag_test.go | 4 ++-- pin/gc/gc.go | 7 ++++--- pin/pin.go | 16 +++++++-------- 8 files changed, 61 insertions(+), 35 deletions(-) diff --git a/core/commands/pin.go b/core/commands/pin.go index dba542abf1b..71f4a4706e0 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -328,12 +328,11 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string if typeStr == "indirect" || typeStr == "all" { set := cid.NewSet() for _, k := range n.Pinning.RecursiveKeys() { - nd, err := n.DAG.Get(ctx, k) + links, err := n.DAG.GetLinks(ctx, k) if err != nil { return nil, err } - - err = dag.EnumerateChildren(n.Context(), n.DAG, nd, set.Visit, false) + err = dag.EnumerateChildren(n.Context(), n.DAG, links, set.Visit, false) if err != nil { return nil, err } diff --git a/core/core.go b/core/core.go index b4ee113c93e..e99f582e577 100644 --- a/core/core.go +++ b/core/core.go @@ -94,14 +94,15 @@ type IpfsNode struct { PrivateKey ic.PrivKey // the local node's private Key // Services - Peerstore pstore.Peerstore // storage for other Peer 
instances - Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - Resolver *path.Resolver // the path resolution system - Reporter metrics.Reporter - Discovery discovery.Service - FilesRoot *mfs.Root + Peerstore pstore.Peerstore // storage for other Peer instances + Blockstore bstore.GCBlockstore // the block store (lower level) + Blocks *bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. + LinkService merkledag.LinkService + Resolver *path.Resolver // the path resolution system + Reporter metrics.Reporter + Discovery discovery.Service + FilesRoot *mfs.Root // Online PeerHost p2phost.Host // the network host (server+client) diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index 148515b754d..dd4927d58db 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { if err != nil { return err } - rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) if err != nil { return err } @@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo if err != nil { return nil, err } - rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) if err != nil { return nil, err } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index f2319bc2db2..135ee8dbf8e 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) { gcstarted := make(chan struct{}) go func() { defer close(gcstarted) - gcchan, err := gc.GC(context.Background(), node.Blockstore, node.Pinning, nil) + gcchan, err := gc.GC(context.Background(), 
node.Blockstore, node.LinkService, node.Pinning, nil) if err != nil { log.Error("GC ERROR:", err) errs <- err @@ -162,7 +162,7 @@ func TestAddGCLive(t *testing.T) { } set := cid.NewSet() - err = dag.EnumerateChildren(ctx, node.DAG, root, set.Visit, false) + err = dag.EnumerateChildren(ctx, node.DAG, root.Links, set.Visit, false) if err != nil { t.Fatal(err) } diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index bd415f49f58..f32104d8684 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -23,6 +23,10 @@ type DAGService interface { Get(context.Context, *cid.Cid) (*Node, error) Remove(*Node) error + // Return all links for a node, may be more effect than + // calling Get + GetLinks(context.Context, *cid.Cid) ([]*Link, error) + // GetDAG returns, in order, all the single leve child // nodes of the passed in node. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption @@ -30,8 +34,14 @@ type DAGService interface { Batch() *Batch } -func NewDAGService(bs *bserv.BlockService) DAGService { - return &dagService{bs} +// A LinkService returns the links for a node if they are available +// locally without having to retrieve the block from the datastore. +type LinkService interface { + Get(*cid.Cid) ([]*Link, error) +} + +func NewDAGService(bs *bserv.BlockService) *dagService { + return &dagService{Blocks: bs} } // dagService is an IPFS Merkle DAG service. 
@@ -40,7 +50,8 @@ func NewDAGService(bs *bserv.BlockService) DAGService { // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { - Blocks *bserv.BlockService + Blocks *bserv.BlockService + LinkService LinkService } // Add adds a node to the dagService, storing the block in the BlockService @@ -93,6 +104,20 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) { return res, nil } +func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) { + if n.LinkService != nil { + links, err := n.LinkService.Get(c) + if err == nil { + return links, nil + } + } + node, err := n.Get(ctx, c) + if err != nil { + return nil, err + } + return node.Links, nil +} + func (n *dagService) Remove(nd *Node) error { return n.Blocks.DeleteObject(nd) } @@ -366,11 +391,11 @@ func legacyCidFromLink(lnk *Link) *cid.Cid { // EnumerateChildren will walk the dag below the given root node and add all // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? 
-func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool, bestEffort bool) error { - for _, lnk := range root.Links { +func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { + for _, lnk := range links { c := legacyCidFromLink(lnk) if visit(c) { - child, err := ds.Get(ctx, c) + children, err := ds.GetLinks(ctx, c) if err != nil { if bestEffort && err == ErrNotFound { continue @@ -378,7 +403,7 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, visit fun return err } } - err = EnumerateChildren(ctx, ds, child, visit, bestEffort) + err = EnumerateChildren(ctx, ds, children, visit, bestEffort) if err != nil { return err } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index 339c056090a..f58bc56bd56 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -241,7 +241,7 @@ func TestFetchGraph(t *testing.T) { offline_ds := NewDAGService(bs) - err = EnumerateChildren(context.Background(), offline_ds, root, func(_ *cid.Cid) bool { return true }, false) + err = EnumerateChildren(context.Background(), offline_ds, root.Links, func(_ *cid.Cid) bool { return true }, false) if err != nil { t.Fatal(err) } @@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) { } set := cid.NewSet() - err = EnumerateChildren(context.Background(), ds, root, set.Visit, false) + err = EnumerateChildren(context.Background(), ds, root.Links, set.Visit, false) if err != nil { t.Fatal(err) } diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 7bfde538c10..ef57cf6ad53 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -24,11 +24,12 @@ var log = logging.Logger("gc") // // The routine then iterates over every block in the blockstore and // deletes any block that is not found in the marked set. 
-func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { +func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { unlocker := bs.GCLock() bsrv := bserv.New(bs, offline.Exchange(bs)) ds := dag.NewDAGService(bsrv) + ds.LinkService = ls gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots) if err != nil { @@ -74,13 +75,13 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner, bestEffortRo func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { for _, c := range roots { set.Add(key.Key(c.Hash())) - nd, err := ds.Get(ctx, c) + links, err := ds.GetLinks(ctx, c) if err != nil { return err } // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, nd, func(c *cid.Cid) bool { + err = dag.EnumerateChildren(ctx, ds, links, func(c *cid.Cid) bool { k := key.Key(c.Hash()) seen := set.Has(k) if seen { diff --git a/pin/pin.go b/pin/pin.go index 6edd66abc29..ab949ec40ed 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -279,12 +279,12 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode PinMode) (string, bool, error // Default is Indirect for _, rc := range p.recursePin.Keys() { - rnd, err := p.dserv.Get(context.Background(), rc) + links, err := p.dserv.GetLinks(context.Background(), rc) if err != nil { return "", false, err } - has, err := hasChild(p.dserv, rnd, k) + has, err := hasChild(p.dserv, links, k) if err != nil { return "", false, err } @@ -317,11 +317,11 @@ func (p *pinner) CheckIfPinned(cids ...*cid.Cid) ([]Pinned, error) { // Now walk all recursive pins to check for indirect pins var checkChildren func(*cid.Cid, *cid.Cid) error checkChildren = func(rk, parentKey *cid.Cid) error { - parent, err := p.dserv.Get(context.Background(), parentKey) + links, err := 
p.dserv.GetLinks(context.Background(), parentKey) if err != nil { return err } - for _, lnk := range parent.Links { + for _, lnk := range links { c := cid.NewCidV0(lnk.Hash) if toCheck.Has(c) { @@ -521,19 +521,19 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { } } -func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) { - for _, lnk := range root.Links { +func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) { + for _, lnk := range links { c := cid.NewCidV0(lnk.Hash) if key.Key(c.Hash()) == child { return true, nil } - nd, err := ds.Get(context.Background(), c) + children, err := ds.GetLinks(context.Background(), c) if err != nil { return false, err } - has, err := hasChild(ds, nd, child) + has, err := hasChild(ds, children, child) if err != nil { return false, err } From 721df367a27dfa416e1d93df23ca440fc2431f63 Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Fri, 30 Sep 2016 17:06:12 -0400 Subject: [PATCH 2/3] Don't use a separate LinkService for DAGService.GetLinks() Instead make LinkService a part of DAGService. The LinkService is now simply an interface that DAGService implements. Also provide a GetOfflineLinkService() method that the GC uses to get an offline instance. 
License: MIT Signed-off-by: Kevin Atkinson --- core/core.go | 17 ++++++++--------- core/corerepo/gc.go | 4 ++-- core/coreunix/add_test.go | 2 +- exchange/bitswap/bitswap.go | 4 ++++ exchange/interface.go | 2 ++ exchange/offline/offline.go | 4 ++++ merkledag/merkledag.go | 35 +++++++++++++++++++---------------- pin/gc/gc.go | 22 +++++++++------------- pin/pin.go | 2 +- 9 files changed, 50 insertions(+), 42 deletions(-) diff --git a/core/core.go b/core/core.go index e99f582e577..b4ee113c93e 100644 --- a/core/core.go +++ b/core/core.go @@ -94,15 +94,14 @@ type IpfsNode struct { PrivateKey ic.PrivKey // the local node's private Key // Services - Peerstore pstore.Peerstore // storage for other Peer instances - Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - LinkService merkledag.LinkService - Resolver *path.Resolver // the path resolution system - Reporter metrics.Reporter - Discovery discovery.Service - FilesRoot *mfs.Root + Peerstore pstore.Peerstore // storage for other Peer instances + Blockstore bstore.GCBlockstore // the block store (lower level) + Blocks *bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. 
+ Resolver *path.Resolver // the path resolution system + Reporter metrics.Reporter + Discovery discovery.Service + FilesRoot *mfs.Root // Online PeerHost p2phost.Host // the network host (server+client) diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index dd4927d58db..a48bdd07467 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { if err != nil { return err } - rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots) if err != nil { return err } @@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo if err != nil { return nil, err } - rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots) if err != nil { return nil, err } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index 135ee8dbf8e..e4b575de083 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) { gcstarted := make(chan struct{}) go func() { defer close(gcstarted) - gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil) + gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil) if err != nil { log.Error("GC ERROR:", err) errs <- err diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f832e078733..21e4e9bdfe4 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -422,3 +422,7 @@ func (bs *Bitswap) GetWantlist() []key.Key { } return out } + +func (bs *Bitswap) IsOnline() bool { + return true +} diff --git a/exchange/interface.go b/exchange/interface.go index f2edc569b0a..23d830466d1 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -22,5 +22,7 @@ type Interface interface { // type 
Exchanger interface // available on the network? HasBlock(blocks.Block) error + IsOnline() bool + io.Closer } diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index b483e1825ac..190d5bfa245 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -67,3 +67,7 @@ func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan b }() return out, nil } + +func (e *offlineExchange) IsOnline() bool { + return false +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index f32104d8684..5f7e55fc2b1 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -7,6 +7,7 @@ import ( "sync" bserv "github.com/ipfs/go-ipfs/blockservice" + offline "github.com/ipfs/go-ipfs/exchange/offline" key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" "context" @@ -23,21 +24,21 @@ type DAGService interface { Get(context.Context, *cid.Cid) (*Node, error) Remove(*Node) error - // Return all links for a node, may be more effect than - // calling Get - GetLinks(context.Context, *cid.Cid) ([]*Link, error) - // GetDAG returns, in order, all the single leve child // nodes of the passed in node. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption Batch() *Batch + + LinkService } -// A LinkService returns the links for a node if they are available -// locally without having to retrieve the block from the datastore. 
type LinkService interface { - Get(*cid.Cid) ([]*Link, error) + // Return all links for a node, may be more efficient than + // calling Get in DAGService + GetLinks(context.Context, *cid.Cid) ([]*Link, error) + + GetOfflineLinkService() LinkService } func NewDAGService(bs *bserv.BlockService) *dagService { @@ -50,8 +51,7 @@ func NewDAGService(bs *bserv.BlockService) *dagService { // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { - Blocks *bserv.BlockService - LinkService LinkService + Blocks *bserv.BlockService } // Add adds a node to the dagService, storing the block in the BlockService @@ -105,12 +105,6 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) { } func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) { - if n.LinkService != nil { - links, err := n.LinkService.Get(c) - if err == nil { - return links, nil - } - } node, err := n.Get(ctx, c) if err != nil { return nil, err @@ -118,6 +112,15 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) return node.Links, nil } +func (n *dagService) GetOfflineLinkService() LinkService { + if n.Blocks.Exchange.IsOnline() { + bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore)) + return NewDAGService(bsrv) + } else { + return n + } +} + func (n *dagService) Remove(nd *Node) error { return n.Blocks.DeleteObject(nd) } @@ -391,7 +394,7 @@ func legacyCidFromLink(lnk *Link) *cid.Cid { // EnumerateChildren will walk the dag below the given root node and add all // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? 
-func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { +func EnumerateChildren(ctx context.Context, ds LinkService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { for _, lnk := range links { c := legacyCidFromLink(lnk) if visit(c) { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index ef57cf6ad53..dac5e48bacb 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -2,8 +2,6 @@ package gc import ( bstore "github.com/ipfs/go-ipfs/blocks/blockstore" - bserv "github.com/ipfs/go-ipfs/blockservice" - offline "github.com/ipfs/go-ipfs/exchange/offline" dag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" @@ -27,11 +25,9 @@ var log = logging.Logger("gc") func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { unlocker := bs.GCLock() - bsrv := bserv.New(bs, offline.Exchange(bs)) - ds := dag.NewDAGService(bsrv) - ds.LinkService = ls + ls = ls.GetOfflineLinkService() - gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots) + gcs, err := ColoredSet(ctx, pn, ls, bestEffortRoots) if err != nil { return nil, err } @@ -72,16 +68,16 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. 
return output, nil } -func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { +func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { for _, c := range roots { set.Add(key.Key(c.Hash())) - links, err := ds.GetLinks(ctx, c) + links, err := ls.GetLinks(ctx, c) if err != nil { return err } // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, links, func(c *cid.Cid) bool { + err = dag.EnumerateChildren(ctx, ls, links, func(c *cid.Cid) bool { k := key.Key(c.Hash()) seen := set.Has(k) if seen { @@ -98,16 +94,16 @@ func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots [ return nil } -func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffortRoots []*cid.Cid) (key.KeySet, error) { +func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid) (key.KeySet, error) { // KeySet currently implemented in memory, in the future, may be bloom filter or // disk backed to conserve memory. 
gcs := key.NewKeySet() - err := Descendants(ctx, ds, gcs, pn.RecursiveKeys(), false) + err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false) if err != nil { return nil, err } - err = Descendants(ctx, ds, gcs, bestEffortRoots, true) + err = Descendants(ctx, ls, gcs, bestEffortRoots, true) if err != nil { return nil, err } @@ -116,7 +112,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffor gcs.Add(key.Key(k.Hash())) } - err = Descendants(ctx, ds, gcs, pn.InternalPins(), false) + err = Descendants(ctx, ls, gcs, pn.InternalPins(), false) if err != nil { return nil, err } diff --git a/pin/pin.go b/pin/pin.go index ab949ec40ed..cd55aaa9918 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -521,7 +521,7 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { } } -func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) { +func hasChild(ds mdag.LinkService, links []*mdag.Link, child key.Key) (bool, error) { for _, lnk := range links { c := cid.NewCidV0(lnk.Hash) if key.Key(c.Hash()) == child { From 772164cc7a792498ed8da1b31756420a9cbff95c Mon Sep 17 00:00:00 2001 From: Kevin Atkinson Date: Mon, 3 Oct 2016 21:38:47 -0400 Subject: [PATCH 3/3] Fix EnumerateChildren & hasChild to take a *cid.Cid instead of []*mdag.Link Author: Kevin Atkinson Fix EnumerateChildren & hasChild to take a *cid.Cid instead of []*mdag.Link Author: Jeromy Johnson make FetchGraph use a cid pin: fix TestPinRecursiveFail License: MIT Signed-off-by: Jeromy License: MIT Signed-off-by: Kevin Atkinson --- core/commands/dht.go | 6 +----- core/commands/pin.go | 6 +----- core/coreunix/add_test.go | 6 +----- merkledag/merkledag.go | 29 ++++++++++++++++------------- merkledag/merkledag_test.go | 8 ++++---- pin/gc/gc.go | 6 +----- pin/pin.go | 22 ++++++++-------------- pin/pin_test.go | 5 +++++ 8 files changed, 37 insertions(+), 51 deletions(-) diff --git a/core/commands/dht.go b/core/commands/dht.go index d9311faa559..f0613fccf16 100644 --- 
a/core/commands/dht.go +++ b/core/commands/dht.go @@ -370,12 +370,8 @@ func provideKeysRec(ctx context.Context, r routing.IpfsRouting, dserv dag.DAGSer provided := cid.NewSet() for _, c := range cids { kset := cid.NewSet() - node, err := dserv.Get(ctx, c) - if err != nil { - return err - } - err = dag.EnumerateChildrenAsync(ctx, dserv, node, kset.Visit) + err := dag.EnumerateChildrenAsync(ctx, dserv, c, kset.Visit) if err != nil { return err } diff --git a/core/commands/pin.go b/core/commands/pin.go index 71f4a4706e0..bf6b86210e7 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -328,11 +328,7 @@ func pinLsAll(typeStr string, ctx context.Context, n *core.IpfsNode) (map[string if typeStr == "indirect" || typeStr == "all" { set := cid.NewSet() for _, k := range n.Pinning.RecursiveKeys() { - links, err := n.DAG.GetLinks(ctx, k) - if err != nil { - return nil, err - } - err = dag.EnumerateChildren(n.Context(), n.DAG, links, set.Visit, false) + err := dag.EnumerateChildren(n.Context(), n.DAG, k, set.Visit, false) if err != nil { return nil, err } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index e4b575de083..c7b78439004 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -156,13 +156,9 @@ func TestAddGCLive(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) defer cancel() - root, err := node.DAG.Get(ctx, last) - if err != nil { - t.Fatal(err) - } set := cid.NewSet() - err = dag.EnumerateChildren(ctx, node.DAG, root.Links, set.Visit, false) + err = dag.EnumerateChildren(ctx, node.DAG, last, set.Visit, false) if err != nil { t.Fatal(err) } diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 5f7e55fc2b1..c6a7e2654f6 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -126,8 +126,8 @@ func (n *dagService) Remove(nd *Node) error { } // FetchGraph fetches all nodes that are children of the given node -func FetchGraph(ctx context.Context, root *Node, 
serv DAGService) error { - return EnumerateChildrenAsync(ctx, serv, root, cid.NewSet().Visit) +func FetchGraph(ctx context.Context, c *cid.Cid, serv DAGService) error { + return EnumerateChildrenAsync(ctx, serv, c, cid.NewSet().Visit) } // FindLinks searches this nodes links for the given key, @@ -394,19 +394,17 @@ func legacyCidFromLink(lnk *Link) *cid.Cid { // EnumerateChildren will walk the dag below the given root node and add all // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? -func EnumerateChildren(ctx context.Context, ds LinkService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { +func EnumerateChildren(ctx context.Context, ds LinkService, root *cid.Cid, visit func(*cid.Cid) bool, bestEffort bool) error { + links, err := ds.GetLinks(ctx, root) + if bestEffort && err == ErrNotFound { + return nil + } else if err != nil { + return err + } for _, lnk := range links { c := legacyCidFromLink(lnk) if visit(c) { - children, err := ds.GetLinks(ctx, c) - if err != nil { - if bestEffort && err == ErrNotFound { - continue - } else { - return err - } - } - err = EnumerateChildren(ctx, ds, children, visit, bestEffort) + err = EnumerateChildren(ctx, ds, c, visit, bestEffort) if err != nil { return err } @@ -415,7 +413,7 @@ func EnumerateChildren(ctx context.Context, ds LinkService, links []*Link, visit return nil } -func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visit func(*cid.Cid) bool) error { +func EnumerateChildrenAsync(ctx context.Context, ds DAGService, c *cid.Cid, visit func(*cid.Cid) bool) error { toprocess := make(chan []*cid.Cid, 8) nodes := make(chan *NodeOption, 8) @@ -425,6 +423,11 @@ func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, visi go fetchNodes(ctx, ds, toprocess, nodes) + root, err := ds.Get(ctx, c) + if err != nil { + return err + } + nodes <- &NodeOption{Node: root} live := 1 diff --git a/merkledag/merkledag_test.go 
b/merkledag/merkledag_test.go index f58bc56bd56..006c8b5ca07 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -231,7 +231,7 @@ func TestFetchGraph(t *testing.T) { t.Fatal(err) } - err = FetchGraph(context.TODO(), root, dservs[1]) + err = FetchGraph(context.TODO(), root.Cid(), dservs[1]) if err != nil { t.Fatal(err) } @@ -241,7 +241,7 @@ func TestFetchGraph(t *testing.T) { offline_ds := NewDAGService(bs) - err = EnumerateChildren(context.Background(), offline_ds, root.Links, func(_ *cid.Cid) bool { return true }, false) + err = EnumerateChildren(context.Background(), offline_ds, root.Cid(), func(_ *cid.Cid) bool { return true }, false) if err != nil { t.Fatal(err) } @@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) { } set := cid.NewSet() - err = EnumerateChildren(context.Background(), ds, root.Links, set.Visit, false) + err = EnumerateChildren(context.Background(), ds, root.Cid(), set.Visit, false) if err != nil { t.Fatal(err) } @@ -269,7 +269,7 @@ func TestEnumerateChildren(t *testing.T) { for _, lnk := range n.Links { c := cid.NewCidV0(lnk.Hash) if !set.Has(c) { - t.Fatal("missing key in set!") + t.Fatal("missing key in set! ", lnk.Hash.B58String()) } child, err := ds.Get(context.Background(), c) if err != nil { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index dac5e48bacb..32b611f6544 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -71,13 +71,9 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. 
func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { for _, c := range roots { set.Add(key.Key(c.Hash())) - links, err := ls.GetLinks(ctx, c) - if err != nil { - return err - } // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ls, links, func(c *cid.Cid) bool { + err := dag.EnumerateChildren(ctx, ls, c, func(c *cid.Cid) bool { k := key.Key(c.Hash()) seen := set.Has(k) if seen { diff --git a/pin/pin.go b/pin/pin.go index cd55aaa9918..6e73929a6c1 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -178,7 +178,7 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error { } // fetch entire graph - err := mdag.FetchGraph(ctx, node, p.dserv) + err := mdag.FetchGraph(ctx, c, p.dserv) if err != nil { return err } @@ -279,12 +279,7 @@ func (p *pinner) isPinnedWithType(c *cid.Cid, mode PinMode) (string, bool, error // Default is Indirect for _, rc := range p.recursePin.Keys() { - links, err := p.dserv.GetLinks(context.Background(), rc) - if err != nil { - return "", false, err - } - - has, err := hasChild(p.dserv, links, k) + has, err := hasChild(p.dserv, rc, k) if err != nil { return "", false, err } @@ -521,19 +516,18 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { } } -func hasChild(ds mdag.LinkService, links []*mdag.Link, child key.Key) (bool, error) { +func hasChild(ds mdag.LinkService, root *cid.Cid, child key.Key) (bool, error) { + links, err := ds.GetLinks(context.Background(), root) + if err != nil { + return false, err + } for _, lnk := range links { c := cid.NewCidV0(lnk.Hash) if key.Key(c.Hash()) == child { return true, nil } - children, err := ds.GetLinks(context.Background(), c) - if err != nil { - return false, err - } - - has, err := hasChild(ds, children, child) + has, err := hasChild(ds, c, child) if err != nil { return false, err } diff --git a/pin/pin_test.go b/pin/pin_test.go index 
185b27a4619..01cbeb79e93 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -225,6 +225,11 @@ func TestPinRecursiveFail(t *testing.T) { t.Fatal(err) } + _, err = dserv.Add(a) + if err != nil { + t.Fatal(err) + } + // this one is time based... but shouldnt cause any issues mctx, _ = context.WithTimeout(ctx, time.Second) err = p.Pin(mctx, a, true)