diff --git a/core/core.go b/core/core.go index e99f582e577..b4ee113c93e 100644 --- a/core/core.go +++ b/core/core.go @@ -94,15 +94,14 @@ type IpfsNode struct { PrivateKey ic.PrivKey // the local node's private Key // Services - Peerstore pstore.Peerstore // storage for other Peer instances - Blockstore bstore.GCBlockstore // the block store (lower level) - Blocks *bserv.BlockService // the block service, get/add blocks. - DAG merkledag.DAGService // the merkle dag service, get/add objects. - LinkService merkledag.LinkService - Resolver *path.Resolver // the path resolution system - Reporter metrics.Reporter - Discovery discovery.Service - FilesRoot *mfs.Root + Peerstore pstore.Peerstore // storage for other Peer instances + Blockstore bstore.GCBlockstore // the block store (lower level) + Blocks *bserv.BlockService // the block service, get/add blocks. + DAG merkledag.DAGService // the merkle dag service, get/add objects. + Resolver *path.Resolver // the path resolution system + Reporter metrics.Reporter + Discovery discovery.Service + FilesRoot *mfs.Root // Online PeerHost p2phost.Host // the network host (server+client) diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index dd4927d58db..a48bdd07467 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -90,7 +90,7 @@ func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { if err != nil { return err } - rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots) if err != nil { return err } @@ -113,7 +113,7 @@ func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemo if err != nil { return nil, err } - rmed, err := gc.GC(ctx, n.Blockstore, n.LinkService, n.Pinning, roots) + rmed, err := gc.GC(ctx, n.Blockstore, n.DAG, n.Pinning, roots) if err != nil { return nil, err } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index 135ee8dbf8e..e4b575de083 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -98,7 +98,7 @@ func TestAddGCLive(t *testing.T) { gcstarted := make(chan struct{}) go func() { defer close(gcstarted) - gcchan, err := gc.GC(context.Background(), node.Blockstore, node.LinkService, node.Pinning, nil) + gcchan, err := gc.GC(context.Background(), node.Blockstore, node.DAG, node.Pinning, nil) if err != nil { log.Error("GC ERROR:", err) errs <- err diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index f832e078733..21e4e9bdfe4 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -422,3 +422,7 @@ func (bs *Bitswap) GetWantlist() []key.Key { } return out } + +func (bs *Bitswap) IsOnline() bool { + return true +} diff --git a/exchange/interface.go b/exchange/interface.go index f2edc569b0a..23d830466d1 100644 --- a/exchange/interface.go +++ b/exchange/interface.go @@ -22,5 +22,7 @@ type Interface interface { // type Exchanger interface // available on the network? HasBlock(blocks.Block) error + IsOnline() bool + io.Closer } diff --git a/exchange/offline/offline.go b/exchange/offline/offline.go index b483e1825ac..190d5bfa245 100644 --- a/exchange/offline/offline.go +++ b/exchange/offline/offline.go @@ -67,3 +67,7 @@ func (e *offlineExchange) GetBlocks(ctx context.Context, ks []key.Key) (<-chan b }() return out, nil } + +func (e *offlineExchange) IsOnline() bool { + return false +} diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index f32104d8684..5f7e55fc2b1 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -7,6 +7,7 @@ import ( "sync" bserv "github.com/ipfs/go-ipfs/blockservice" + offline "github.com/ipfs/go-ipfs/exchange/offline" key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" "context" @@ -23,21 +24,21 @@ type DAGService interface { Get(context.Context, *cid.Cid) (*Node, error) Remove(*Node) error - // Return all links for a node, may be more effect than - // calling Get - GetLinks(context.Context, *cid.Cid) ([]*Link, error) - // GetDAG returns, in order, all the single leve child // nodes of the passed in node. GetMany(context.Context, []*cid.Cid) <-chan *NodeOption Batch() *Batch + + LinkService } -// A LinkService returns the links for a node if they are available -// locally without having to retrieve the block from the datastore. type LinkService interface { - Get(*cid.Cid) ([]*Link, error) + // Return all links for a node, may be more effect than + // calling Get in DAGService + GetLinks(context.Context, *cid.Cid) ([]*Link, error) + + GetOfflineLinkService() LinkService } func NewDAGService(bs *bserv.BlockService) *dagService { @@ -50,8 +51,7 @@ func NewDAGService(bs *bserv.BlockService) *dagService { // TODO: should cache Nodes that are in memory, and be // able to free some of them when vm pressure is high type dagService struct { - Blocks *bserv.BlockService - LinkService LinkService + Blocks *bserv.BlockService } // Add adds a node to the dagService, storing the block in the BlockService @@ -105,12 +105,6 @@ func (n *dagService) Get(ctx context.Context, c *cid.Cid) (*Node, error) { } func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) { - if n.LinkService != nil { - links, err := n.LinkService.Get(c) - if err == nil { - return links, nil - } - } node, err := n.Get(ctx, c) if err != nil { return nil, err @@ -118,6 +112,15 @@ func (n *dagService) GetLinks(ctx context.Context, c *cid.Cid) ([]*Link, error) return node.Links, nil } +func (n *dagService) GetOfflineLinkService() LinkService { + if n.Blocks.Exchange.IsOnline() { + bsrv := bserv.New(n.Blocks.Blockstore, offline.Exchange(n.Blocks.Blockstore)) + return NewDAGService(bsrv) + } else { + return n + } +} + func (n *dagService) Remove(nd *Node) error { return n.Blocks.DeleteObject(nd) } @@ -391,7 +394,7 @@ func legacyCidFromLink(lnk *Link) *cid.Cid { // EnumerateChildren will walk the dag below the given root node and add all // unseen children to the passed in set. // TODO: parallelize to avoid disk latency perf hits? -func EnumerateChildren(ctx context.Context, ds DAGService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { +func EnumerateChildren(ctx context.Context, ds LinkService, links []*Link, visit func(*cid.Cid) bool, bestEffort bool) error { for _, lnk := range links { c := legacyCidFromLink(lnk) if visit(c) { diff --git a/pin/gc/gc.go b/pin/gc/gc.go index ef57cf6ad53..dac5e48bacb 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -2,8 +2,6 @@ package gc import ( bstore "github.com/ipfs/go-ipfs/blocks/blockstore" - bserv "github.com/ipfs/go-ipfs/blockservice" - offline "github.com/ipfs/go-ipfs/exchange/offline" dag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" key "gx/ipfs/QmYEoKZXHoAToWfhGF3vryhMn3WWhE1o2MasQ8uzY5iDi9/go-key" @@ -27,11 +25,9 @@ var log = logging.Logger("gc") func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin.Pinner, bestEffortRoots []*cid.Cid) (<-chan key.Key, error) { unlocker := bs.GCLock() - bsrv := bserv.New(bs, offline.Exchange(bs)) - ds := dag.NewDAGService(bsrv) - ds.LinkService = ls + ls = ls.GetOfflineLinkService() - gcs, err := ColoredSet(ctx, pn, ds, bestEffortRoots) + gcs, err := ColoredSet(ctx, pn, ls, bestEffortRoots) if err != nil { return nil, err } @@ -72,16 +68,16 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, ls dag.LinkService, pn pin. return output, nil } -func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { +func Descendants(ctx context.Context, ls dag.LinkService, set key.KeySet, roots []*cid.Cid, bestEffort bool) error { for _, c := range roots { set.Add(key.Key(c.Hash())) - links, err := ds.GetLinks(ctx, c) + links, err := ls.GetLinks(ctx, c) if err != nil { return err } // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, links, func(c *cid.Cid) bool { + err = dag.EnumerateChildren(ctx, ls, links, func(c *cid.Cid) bool { k := key.Key(c.Hash()) seen := set.Has(k) if seen { @@ -98,16 +94,16 @@ func Descendants(ctx context.Context, ds dag.DAGService, set key.KeySet, roots [ return nil } -func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffortRoots []*cid.Cid) (key.KeySet, error) { +func ColoredSet(ctx context.Context, pn pin.Pinner, ls dag.LinkService, bestEffortRoots []*cid.Cid) (key.KeySet, error) { // KeySet currently implemented in memory, in the future, may be bloom filter or // disk backed to conserve memory. gcs := key.NewKeySet() - err := Descendants(ctx, ds, gcs, pn.RecursiveKeys(), false) + err := Descendants(ctx, ls, gcs, pn.RecursiveKeys(), false) if err != nil { return nil, err } - err = Descendants(ctx, ds, gcs, bestEffortRoots, true) + err = Descendants(ctx, ls, gcs, bestEffortRoots, true) if err != nil { return nil, err } @@ -116,7 +112,7 @@ func ColoredSet(ctx context.Context, pn pin.Pinner, ds dag.DAGService, bestEffor gcs.Add(key.Key(k.Hash())) } - err = Descendants(ctx, ds, gcs, pn.InternalPins(), false) + err = Descendants(ctx, ls, gcs, pn.InternalPins(), false) if err != nil { return nil, err } diff --git a/pin/pin.go b/pin/pin.go index ab949ec40ed..cd55aaa9918 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -521,7 +521,7 @@ func (p *pinner) PinWithMode(c *cid.Cid, mode PinMode) { } } -func hasChild(ds mdag.DAGService, links []*mdag.Link, child key.Key) (bool, error) { +func hasChild(ds mdag.LinkService, links []*mdag.Link, child key.Key) (bool, error) { for _, lnk := range links { c := cid.NewCidV0(lnk.Hash) if key.Key(c.Hash()) == child {