From 268239d6119ba6a5a9ae67654adfa430c1224cf2 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 23 Jun 2015 16:01:32 -0700 Subject: [PATCH 1/5] implement mark and sweep GC License: MIT Signed-off-by: Jeromy dont GC blocks used by pinner License: MIT Signed-off-by: Jeromy comment GC algo License: MIT Signed-off-by: Jeromy add lock to blockstore to prevent GC from eating wanted blocks License: MIT Signed-off-by: Jeromy improve FetchGraph License: MIT Signed-off-by: Jeromy separate interfaces for blockstore and GCBlockstore License: MIT Signed-off-by: Jeromy reintroduce indirect pinning, add enumerateChildren dag method License: MIT Signed-off-by: Jeromy --- core/commands/add.go | 8 +-- core/commands/pin.go | 61 ++++++++---------- core/corehttp/gateway_handler.go | 2 +- core/corerepo/gc.go | 47 +++++--------- core/coreunix/add.go | 9 --- core/coreunix/metadata_test.go | 2 +- importer/helpers/dagbuilder.go | 27 -------- importer/helpers/helpers.go | 12 ---- importer/importer.go | 40 ++---------- importer/importer_test.go | 6 +- merkledag/merkledag_test.go | 6 +- pin/gc/gc.go | 102 +++++++++++++++++++++++++++++ pin/pin.go | 107 ++++++------------------------- pin/pin_test.go | 65 +------------------ test/sharness/t0080-repo.sh | 10 +-- unixfs/mod/dagmodifier.go | 9 --- unixfs/mod/dagmodifier_test.go | 26 ++------ 17 files changed, 189 insertions(+), 350 deletions(-) create mode 100644 pin/gc/gc.go diff --git a/core/commands/add.go b/core/commands/add.go index 01ac60f4321..a7ba4d9052f 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -137,9 +137,7 @@ remains to be implemented. } if !hash { - n.Pinning.RemovePinWithMode(rnk, pin.Indirect) n.Pinning.PinWithMode(rnk, pin.Recursive) - err = n.Pinning.Flush() if err != nil { res.SetError(err, cmds.ErrNormal) @@ -243,14 +241,12 @@ func add(n *core.IpfsNode, reader io.Reader, useTrickle bool) (*dag.Node, error) reader, n.DAG, chunk.DefaultSplitter, - importer.PinIndirectCB(n.Pinning), ) } else { node, err = importer.BuildDagFromReader( reader, n.DAG, chunk.DefaultSplitter, - importer.PinIndirectCB(n.Pinning), ) } @@ -329,13 +325,11 @@ func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress boo return nil, err } - k, err := n.DAG.Add(tree) + _, err = n.DAG.Add(tree) if err != nil { return nil, err } - n.Pinning.PinWithMode(k, pin.Indirect) - return tree, nil } diff --git a/core/commands/pin.go b/core/commands/pin.go index 627260a07a0..0f29b9b2589 100644 --- a/core/commands/pin.go +++ b/core/commands/pin.go @@ -8,6 +8,7 @@ import ( key "github.com/ipfs/go-ipfs/blocks/key" cmds "github.com/ipfs/go-ipfs/commands" corerepo "github.com/ipfs/go-ipfs/core/corerepo" + dag "github.com/ipfs/go-ipfs/merkledag" u "github.com/ipfs/go-ipfs/util" ) @@ -169,14 +170,12 @@ Use --type= to specify the type of pinned keys to list. Valid values are: * "indirect": pinned indirectly by an ancestor (like a refcount) * "all" -To see the ref count on indirect pins, pass the -count option flag. Defaults to "direct". `, }, Options: []cmds.Option{ cmds.StringOption("type", "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\". Defaults to \"direct\""), - cmds.BoolOption("count", "n", "Show refcount when listing indirect pins"), cmds.BoolOption("quiet", "q", "Write just hashes of objects"), }, Run: func(req cmds.Request, res cmds.Response) { @@ -206,24 +205,35 @@ Defaults to "direct". 
if typeStr == "direct" || typeStr == "all" { for _, k := range n.Pinning.DirectKeys() { keys[k.B58String()] = RefKeyObject{ - Type: "direct", - Count: 1, + Type: "direct", } } } if typeStr == "indirect" || typeStr == "all" { - for k, v := range n.Pinning.IndirectKeys() { + ks := key.NewKeySet() + for _, k := range n.Pinning.RecursiveKeys() { + nd, err := n.DAG.Get(n.Context(), k) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + err = dag.EnumerateChildren(n.Context(), n.DAG, nd, ks) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } + + } + for _, k := range ks.Keys() { keys[k.B58String()] = RefKeyObject{ - Type: "indirect", - Count: v, + Type: "indirect", } } } if typeStr == "recursive" || typeStr == "all" { for _, k := range n.Pinning.RecursiveKeys() { keys[k.B58String()] = RefKeyObject{ - Type: "recursive", - Count: 1, + Type: "recursive", } } } @@ -233,16 +243,6 @@ Defaults to "direct". Type: RefKeyList{}, Marshalers: cmds.MarshalerMap{ cmds.Text: func(res cmds.Response) (io.Reader, error) { - typeStr, _, err := res.Request().Option("type").String() - if err != nil { - return nil, err - } - - count, _, err := res.Request().Option("count").Bool() - if err != nil { - return nil, err - } - quiet, _, err := res.Request().Option("quiet").Bool() if err != nil { return nil, err @@ -253,21 +253,11 @@ Defaults to "direct". return nil, u.ErrCast() } out := new(bytes.Buffer) - if typeStr == "indirect" && count { - for k, v := range keys.Keys { - if quiet { - fmt.Fprintf(out, "%s %d\n", k, v.Count) - } else { - fmt.Fprintf(out, "%s %s %d\n", k, v.Type, v.Count) - } - } - } else { - for k, v := range keys.Keys { - if quiet { - fmt.Fprintf(out, "%s\n", k) - } else { - fmt.Fprintf(out, "%s %s\n", k, v.Type) - } + for k, v := range keys.Keys { + if quiet { + fmt.Fprintf(out, "%s\n", k) + } else { + fmt.Fprintf(out, "%s %s\n", k, v.Type) } } return out, nil @@ -276,8 +266,7 @@ Defaults to "direct". 
} type RefKeyObject struct { - Type string - Count uint64 + Type string } type RefKeyList struct { diff --git a/core/corehttp/gateway_handler.go b/core/corehttp/gateway_handler.go index f95380c63b0..186e38d045c 100644 --- a/core/corehttp/gateway_handler.go +++ b/core/corehttp/gateway_handler.go @@ -72,7 +72,7 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) { // TODO(cryptix): change and remove this helper once PR1136 is merged // return ufs.AddFromReader(i.node, r.Body) return importer.BuildDagFromReader( - r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning)) + r, i.node.DAG, chunk.DefaultSplitter) } // TODO(btc): break this apart into separate handlers using a more expressive muxer diff --git a/core/corerepo/gc.go b/core/corerepo/gc.go index 6b284d03f3e..02230ef12cc 100644 --- a/core/corerepo/gc.go +++ b/core/corerepo/gc.go @@ -4,6 +4,7 @@ import ( context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" "github.com/ipfs/go-ipfs/core" + gc "github.com/ipfs/go-ipfs/pin/gc" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" ) @@ -15,54 +16,40 @@ type KeyRemoved struct { } func GarbageCollect(n *core.IpfsNode, ctx context.Context) error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() // in case error occurs during operation - keychan, err := n.Blockstore.AllKeysChan(ctx) + rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning) if err != nil { return err } - for k := range keychan { // rely on AllKeysChan to close chan - if !n.Pinning.IsPinned(k) { - err := n.Blockstore.DeleteBlock(k) - if err != nil { - return err + + for { + select { + case _, ok := <-rmed: + if !ok { + return nil } + case <-ctx.Done(): + return ctx.Err() } } - return nil + } func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemoved, error) { - - keychan, err := n.Blockstore.AllKeysChan(ctx) + rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning) if err != nil { return nil, err } - output := make(chan *KeyRemoved) + out := make(chan *KeyRemoved) go func() { - defer close(output) - for { + defer close(out) + for k := range rmed { select { - case k, ok := <-keychan: - if !ok { - return - } - if !n.Pinning.IsPinned(k) { - err := n.Blockstore.DeleteBlock(k) - if err != nil { - log.Debugf("Error removing key from blockstore: %s", err) - continue - } - select { - case output <- &KeyRemoved{k}: - case <-ctx.Done(): - } - } + case out <- &KeyRemoved{k}: case <-ctx.Done(): return } } }() - return output, nil + return out, nil } diff --git a/core/coreunix/add.go b/core/coreunix/add.go index ea95df86d2e..a3d4bb10711 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -14,7 +14,6 @@ import ( importer "github.com/ipfs/go-ipfs/importer" chunk "github.com/ipfs/go-ipfs/importer/chunk" merkledag "github.com/ipfs/go-ipfs/merkledag" - "github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/thirdparty/eventlog" unixfs "github.com/ipfs/go-ipfs/unixfs" ) @@ -32,7 +31,6 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) { r, n.DAG, chunk.DefaultSplitter, - importer.BasicPinnerCB(n.Pinning), ) if err != nil { return "", err @@ -71,12 +69,6 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) { return "", err } - n.Pinning.RemovePinWithMode(k, pin.Indirect) - err = n.Pinning.Flush() - if err != nil { - return "", err - } - return k.String(), nil } @@ -106,7 +98,6 @@ func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) { reader, n.DAG, 
chunk.DefaultSplitter, - importer.PinIndirectCB(n.Pinning), ) if err != nil { return nil, err diff --git a/core/coreunix/metadata_test.go b/core/coreunix/metadata_test.go index d2cb579154c..444fc8c73f2 100644 --- a/core/coreunix/metadata_test.go +++ b/core/coreunix/metadata_test.go @@ -38,7 +38,7 @@ func TestMetadata(t *testing.T) { data := make([]byte, 1000) u.NewTimeSeededRand().Read(data) r := bytes.NewReader(data) - nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) + nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter) if err != nil { t.Fatal(err) } diff --git a/importer/helpers/dagbuilder.go b/importer/helpers/dagbuilder.go index 32d30ea311d..a8b6e8880f9 100644 --- a/importer/helpers/dagbuilder.go +++ b/importer/helpers/dagbuilder.go @@ -2,26 +2,15 @@ package helpers import ( dag "github.com/ipfs/go-ipfs/merkledag" - "github.com/ipfs/go-ipfs/pin" ) -// NodeCB is callback function for dag generation -// the `last` flag signifies whether or not this is the last -// (top-most root) node being added. useful for things like -// only pinning the first node recursively. -type NodeCB func(node *dag.Node, last bool) error - -var nilFunc NodeCB = func(_ *dag.Node, _ bool) error { return nil } - // DagBuilderHelper wraps together a bunch of objects needed to // efficiently create unixfs dag trees type DagBuilderHelper struct { dserv dag.DAGService - mp pin.Pinner in <-chan []byte nextData []byte // the next item to return. maxlinks int - ncb NodeCB } type DagBuilderParams struct { @@ -30,24 +19,15 @@ type DagBuilderParams struct { // DAGService to write blocks to (required) Dagserv dag.DAGService - - // Callback for each block added - NodeCB NodeCB } // Generate a new DagBuilderHelper from the given params, using 'in' as a // data source func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper { - ncb := dbp.NodeCB - if ncb == nil { - ncb = nilFunc - } - return &DagBuilderHelper{ dserv: dbp.Dagserv, in: in, maxlinks: dbp.Maxlinks, - ncb: ncb, } } @@ -100,7 +80,6 @@ func (db *DagBuilderHelper) GetDagServ() dag.DAGService { // FillNodeLayer will add datanodes as children to the give node until // at most db.indirSize ndoes are added // -// warning: **children** pinned indirectly, but input node IS NOT pinned. 
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error { // while we have room AND we're not done @@ -144,12 +123,6 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) { return nil, err } - // node callback - err = db.ncb(dn, true) - if err != nil { - return nil, err - } - return dn, nil } diff --git a/importer/helpers/helpers.go b/importer/helpers/helpers.go index 412b4e39757..ac9bba158c2 100644 --- a/importer/helpers/helpers.go +++ b/importer/helpers/helpers.go @@ -5,10 +5,8 @@ import ( "time" "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" - key "github.com/ipfs/go-ipfs/blocks/key" chunk "github.com/ipfs/go-ipfs/importer/chunk" dag "github.com/ipfs/go-ipfs/merkledag" - "github.com/ipfs/go-ipfs/pin" ft "github.com/ipfs/go-ipfs/unixfs" ) @@ -112,21 +110,11 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error { return err } - // Pin the child node indirectly - err = db.ncb(childnode, false) - if err != nil { - return err - } - return nil } // Removes the child node at the given index func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) { - k := key.Key(n.node.Links[index].Hash) - if dbh.mp != nil { - dbh.mp.RemovePinWithMode(k, pin.Indirect) - } n.ufmt.RemoveBlockSize(index) n.node.Links = append(n.node.Links[:index], n.node.Links[index+1:]...) } diff --git a/importer/importer.go b/importer/importer.go index 30fd5f82f22..5fc72da1b28 100644 --- a/importer/importer.go +++ b/importer/importer.go @@ -12,7 +12,6 @@ import ( h "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" dag "github.com/ipfs/go-ipfs/merkledag" - "github.com/ipfs/go-ipfs/pin" u "github.com/ipfs/go-ipfs/util" ) @@ -20,7 +19,7 @@ var log = u.Logger("importer") // Builds a DAG from the given file, writing created blocks to disk as they are // created -func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.Pinner) (*dag.Node, error) { +func BuildDagFromFile(fpath string, ds dag.DAGService) (*dag.Node, error) { stat, err := os.Stat(fpath) if err != nil { return nil, err @@ -36,60 +35,29 @@ func BuildDagFromFile(fpath string, ds dag.DAGService, mp pin.Pinner) (*dag.Node } defer f.Close() - return BuildDagFromReader(f, ds, chunk.DefaultSplitter, BasicPinnerCB(mp)) + return BuildDagFromReader(f, ds, chunk.DefaultSplitter) } -func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) { +func BuildDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter) (*dag.Node, error) { // Start the splitter blkch := spl.Split(r) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, - NodeCB: ncb, } return bal.BalancedLayout(dbp.New(blkch)) } -func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter, ncb h.NodeCB) (*dag.Node, error) { +func BuildTrickleDagFromReader(r io.Reader, ds dag.DAGService, spl chunk.BlockSplitter) (*dag.Node, error) { // Start the splitter blkch := spl.Split(r) dbp := h.DagBuilderParams{ Dagserv: ds, Maxlinks: h.DefaultLinksPerBlock, - NodeCB: ncb, } return trickle.TrickleLayout(dbp.New(blkch)) } - -func BasicPinnerCB(p pin.Pinner) h.NodeCB { - return func(n *dag.Node, last bool) error { - k, err := n.Key() - if err != nil { - return err - } - - if last { - p.PinWithMode(k, pin.Recursive) - return p.Flush() - } else { - p.PinWithMode(k, pin.Indirect) - return nil - } - } -} - -func PinIndirectCB(p pin.Pinner) h.NodeCB { - return func(n *dag.Node, last bool) 
error { - k, err := n.Key() - if err != nil { - return err - } - - p.PinWithMode(k, pin.Indirect) - return nil - } -} diff --git a/importer/importer_test.go b/importer/importer_test.go index 3641fb1b09b..234379adc64 100644 --- a/importer/importer_test.go +++ b/importer/importer_test.go @@ -17,7 +17,7 @@ import ( func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) + nd, err := BuildDagFromReader(r, ds, &chunk.SizeSplitter{blksize}) if err != nil { t.Fatal(err) } @@ -27,7 +27,7 @@ func getBalancedDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGSe func getTrickleDag(t testing.TB, size int64, blksize int) (*dag.Node, dag.DAGService) { ds := mdtest.Mock(t) r := io.LimitReader(u.NewTimeSeededRand(), size) - nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}, nil) + nd, err := BuildTrickleDagFromReader(r, ds, &chunk.SizeSplitter{blksize}) if err != nil { t.Fatal(err) } @@ -40,7 +40,7 @@ func TestBalancedDag(t *testing.T) { u.NewTimeSeededRand().Read(buf) r := bytes.NewReader(buf) - nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil) + nd, err := BuildDagFromReader(r, ds, chunk.DefaultSplitter) if err != nil { t.Fatal(err) } diff --git a/merkledag/merkledag_test.go b/merkledag/merkledag_test.go index a7d7c658c51..09644410426 100644 --- a/merkledag/merkledag_test.go +++ b/merkledag/merkledag_test.go @@ -157,7 +157,7 @@ func runBatchFetchTest(t *testing.T, read io.Reader) { spl := &chunk.SizeSplitter{512} - root, err := imp.BuildDagFromReader(read, dagservs[0], spl, nil) + root, err := imp.BuildDagFromReader(read, dagservs[0], spl) if err != nil { t.Fatal(err) } @@ -225,7 +225,7 @@ func TestFetchGraph(t *testing.T) { read := io.LimitReader(u.NewTimeSeededRand(), 1024*32) spl := &chunk.SizeSplitter{512} - root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil) + root, err := imp.BuildDagFromReader(read, dservs[0], spl) if err != nil { t.Fatal(err) } @@ -258,7 +258,7 @@ func TestEnumerateChildren(t *testing.T) { read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024) - root, err := imp.BuildDagFromReader(read, ds, spl, nil) + root, err := imp.BuildDagFromReader(read, ds, spl) if err != nil { t.Fatal(err) } diff --git a/pin/gc/gc.go b/pin/gc/gc.go new file mode 100644 index 00000000000..a9cb326ec37 --- /dev/null +++ b/pin/gc/gc.go @@ -0,0 +1,102 @@ +package gc + +import ( + bstore "github.com/ipfs/go-ipfs/blocks/blockstore" + key "github.com/ipfs/go-ipfs/blocks/key" + bserv "github.com/ipfs/go-ipfs/blockservice" + offline "github.com/ipfs/go-ipfs/exchange/offline" + dag "github.com/ipfs/go-ipfs/merkledag" + pin "github.com/ipfs/go-ipfs/pin" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" +) + +var log = eventlog.Logger("gc") + +// GC performs a mark and sweep garbage collection of the blocks in the blockstore +// first, it creates a 'marked' set and adds to it the following: +// - all recursively pinned blocks, plus all of their descendants (recursively) +// - all directly pinned blocks +// - all blocks utilized internally by the pinner +// +// The routine then iterates over every block in the blockstore and +// deletes any block that is not found in the marked set. 
+func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.Key, error) { + unlock := bs.GCLock() + defer unlock() + + bsrv, err := bserv.New(bs, offline.Exchange(bs)) + if err != nil { + return nil, err + } + ds := dag.NewDAGService(bsrv) + + // KeySet currently implemented in memory, in the future, may be bloom filter or + // disk backed to conserve memory. + gcs := key.NewKeySet() + for _, k := range pn.RecursiveKeys() { + gcs.Add(k) + nd, err := ds.Get(ctx, k) + if err != nil { + return nil, err + } + + // EnumerateChildren recursively walks the dag and adds the keys to the given set + err = dag.EnumerateChildren(ctx, ds, nd, gcs) + if err != nil { + return nil, err + } + } + for _, k := range pn.DirectKeys() { + gcs.Add(k) + } + for _, k := range pn.InternalPins() { + gcs.Add(k) + + nd, err := ds.Get(ctx, k) + if err != nil { + return nil, err + } + + // EnumerateChildren recursively walks the dag and adds the keys to the given set + err = dag.EnumerateChildren(ctx, ds, nd, gcs) + if err != nil { + return nil, err + } + } + + keychan, err := bs.AllKeysChan(ctx) + if err != nil { + return nil, err + } + + output := make(chan key.Key) + go func() { + defer close(output) + for { + select { + case k, ok := <-keychan: + if !ok { + return + } + if !gcs.Has(k) { + err := bs.DeleteBlock(k) + if err != nil { + log.Debugf("Error removing key from blockstore: %s", err) + return + } + select { + case output <- k: + case <-ctx.Done(): + return + } + } + case <-ctx.Done(): + return + } + } + }() + + return output, nil +} diff --git a/pin/pin.go b/pin/pin.go index c0a14a59332..a5c9df37756 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -24,7 +24,6 @@ var emptyKey = key.B58KeyDecode("QmdfTbBqBPQ7VNxZEYEj14VmRuZBkqFbiwReogJgS1zR1n" const ( linkDirect = "direct" linkRecursive = "recursive" - linkIndirect = "indirect" ) type PinMode int @@ -32,7 +31,6 @@ type PinMode int const ( Recursive PinMode = iota Direct - Indirect NotPinned ) @@ -52,8 +50,8 @@ type Pinner interface { Flush() error DirectKeys() []key.Key - IndirectKeys() map[key.Key]uint64 RecursiveKeys() []key.Key + InternalPins() []key.Key } // pinner implements the Pinner interface @@ -61,7 +59,7 @@ type pinner struct { lock sync.RWMutex recursePin set.BlockSet directPin set.BlockSet - indirPin *indirectPin + // Track the keys used for storing the pinning state, so gc does // not delete them. internalPin map[key.Key]struct{} @@ -80,7 +78,6 @@ func NewPinner(dstore ds.ThreadSafeDatastore, serv mdag.DAGService) Pinner { return &pinner{ recursePin: rcset, directPin: dirset, - indirPin: newIndirectPin(), dserv: serv, dstore: dstore, } @@ -104,7 +101,8 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error { p.directPin.RemoveBlock(k) } - err := p.pinLinks(ctx, node) + // fetch entire graph + err := mdag.FetchGraph(ctx, node, p.dserv) if err != nil { return err } @@ -132,72 +130,18 @@ func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error { if p.recursePin.HasKey(k) { if recursive { p.recursePin.RemoveBlock(k) - node, err := p.dserv.Get(ctx, k) - if err != nil { - return err - } - - return p.unpinLinks(ctx, node) + return nil } else { return fmt.Errorf("%s is pinned recursively", k) } } else if p.directPin.HasKey(k) { p.directPin.RemoveBlock(k) return nil - } else if p.indirPin.HasKey(k) { - return fmt.Errorf("%s is pinned indirectly. 
indirect pins cannot be removed directly", k) } else { return fmt.Errorf("%s is not pinned", k) } } -func (p *pinner) unpinLinks(ctx context.Context, node *mdag.Node) error { - for _, l := range node.Links { - node, err := l.GetNode(ctx, p.dserv) - if err != nil { - return err - } - - k, err := node.Key() - if err != nil { - return err - } - - p.indirPin.Decrement(k) - - err = p.unpinLinks(ctx, node) - if err != nil { - return err - } - } - return nil -} - -func (p *pinner) pinIndirectRecurse(ctx context.Context, node *mdag.Node) error { - k, err := node.Key() - if err != nil { - return err - } - - p.indirPin.Increment(k) - return p.pinLinks(ctx, node) -} - -func (p *pinner) pinLinks(ctx context.Context, node *mdag.Node) error { - for _, ng := range p.dserv.GetDAG(ctx, node) { - subnode, err := ng.Get(ctx) - if err != nil { - // TODO: Maybe just log and continue? - return err - } - err = p.pinIndirectRecurse(ctx, subnode) - if err != nil { - return err - } - } - return nil -} - func (p *pinner) isInternalPin(key key.Key) bool { _, ok := p.internalPin[key] return ok @@ -209,7 +153,6 @@ func (p *pinner) IsPinned(key key.Key) bool { defer p.lock.RUnlock() return p.recursePin.HasKey(key) || p.directPin.HasKey(key) || - p.indirPin.HasKey(key) || p.isInternalPin(key) } @@ -219,8 +162,6 @@ func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) { switch mode { case Direct: p.directPin.RemoveBlock(key) - case Indirect: - p.indirPin.Decrement(key) case Recursive: p.recursePin.RemoveBlock(key) default: @@ -275,14 +216,6 @@ func LoadPinner(d ds.ThreadSafeDatastore, dserv mdag.DAGService) (Pinner, error) p.directPin = set.SimpleSetFromKeys(directKeys) } - { // load indirect set - refcnt, err := loadMultiset(ctx, dserv, root, linkIndirect, recordInternal) - if err != nil { - return nil, fmt.Errorf("cannot load indirect pins: %v", err) - } - p.indirPin = &indirectPin{refCounts: refcnt} - } - p.internalPin = internalPin // assign services @@ -297,11 +230,6 @@ func (p *pinner) DirectKeys() []key.Key { return p.directPin.GetKeys() } -// IndirectKeys returns a slice containing the indirectly pinned keys -func (p *pinner) IndirectKeys() map[key.Key]uint64 { - return p.indirPin.GetRefs() -} - // RecursiveKeys returns a slice containing the recursively pinned keys func (p *pinner) RecursiveKeys() []key.Key { return p.recursePin.GetKeys() @@ -340,20 +268,17 @@ func (p *pinner) Flush() error { } } - { - n, err := storeMultiset(ctx, p.dserv, p.indirPin.GetRefs(), recordInternal) - if err != nil { - return err - } - if err := root.AddNodeLink(linkIndirect, n); err != nil { - return err - } + // add the empty node, its referenced by the pin sets but never created + _, err := p.dserv.Add(new(mdag.Node)) + if err != nil { + return err } k, err := p.dserv.Add(root) if err != nil { return err } + internalPin[k] = struct{}{} if err := p.dstore.Put(pinDatastoreKey, []byte(k)); err != nil { return fmt.Errorf("cannot store pin state: %v", err) @@ -362,6 +287,16 @@ func (p *pinner) Flush() error { return nil } +func (p *pinner) InternalPins() []key.Key { + p.lock.Lock() + defer p.lock.Unlock() + var out []key.Key + for k, _ := range p.internalPin { + out = append(out, k) + } + return out +} + // PinWithMode allows the user to have fine grained control over pin // counts func (p *pinner) PinWithMode(k key.Key, mode PinMode) { @@ -372,7 +307,5 @@ func (p *pinner) PinWithMode(k key.Key, mode PinMode) { p.recursePin.AddBlock(k) case Direct: p.directPin.AddBlock(k) - case Indirect: - p.indirPin.Increment(k) } } diff --git 
a/pin/pin_test.go b/pin/pin_test.go index c2d5bdfd850..7b17290d6d2 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -2,7 +2,6 @@ package pin import ( "testing" - "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" @@ -56,7 +55,7 @@ func TestPinnerBasic(t *testing.T) { } // create new node c, to be indirectly pinned through b - c, ck := randNode() + c, _ := randNode() _, err = dserv.Add(c) if err != nil { t.Fatal(err) @@ -85,10 +84,6 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } - if !p.IsPinned(ck) { - t.Fatal("Child of recursively pinned node not found") - } - bk, _ := b.Key() if !p.IsPinned(bk) { t.Fatal("Recursively pinned node not found..") @@ -98,7 +93,7 @@ func TestPinnerBasic(t *testing.T) { d.AddNodeLink("a", a) d.AddNodeLink("c", c) - e, ek := randNode() + e, _ := randNode() d.AddNodeLink("e", e) // Must be in dagserv for unpin to work @@ -113,10 +108,6 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } - if !p.IsPinned(ek) { - t.Fatal(err) - } - dk, _ := d.Key() if !p.IsPinned(dk) { t.Fatal("pinned node not found.") @@ -128,11 +119,6 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } - // c should still be pinned under b - if !p.IsPinned(ck) { - t.Fatal("Recursive / indirect unpin fail.") - } - err = p.Flush() if err != nil { t.Fatal(err) @@ -148,11 +134,6 @@ func TestPinnerBasic(t *testing.T) { t.Fatal("Could not find pinned node!") } - // Test indirectly pinned - if !np.IsPinned(ck) { - t.Fatal("could not find indirectly pinned node") - } - // Test recursively pinned if !np.IsPinned(bk) { t.Fatal("could not find recursively pinned node") @@ -210,7 +191,7 @@ func TestFlush(t *testing.T) { p := NewPinner(dstore, dserv) _, k := randNode() - p.PinWithMode(k, Indirect) + p.PinWithMode(k, Recursive) if err := p.Flush(); err != nil { t.Fatal(err) } @@ -218,43 +199,3 @@ func TestFlush(t *testing.T) { t.Fatal("expected key to still be pinned") } } - -func TestPinRecursiveFail(t *testing.T) { - ctx := context.Background() - dstore := dssync.MutexWrap(ds.NewMapDatastore()) - bstore := blockstore.NewBlockstore(dstore) - bserv, err := bs.New(bstore, offline.Exchange(bstore)) - if err != nil { - t.Fatal(err) - } - - dserv := mdag.NewDAGService(bserv) - - p := NewPinner(dstore, dserv) - - a, _ := randNode() - b, _ := randNode() - err = a.AddNodeLinkClean("child", b) - if err != nil { - t.Fatal(err) - } - - // Note: this isnt a time based test, we expect the pin to fail - mctx, _ := context.WithTimeout(ctx, time.Millisecond) - err = p.Pin(mctx, a, true) - if err == nil { - t.Fatal("should have failed to pin here") - } - - _, err = dserv.Add(b) - if err != nil { - t.Fatal(err) - } - - // this one is time based... 
but shouldnt cause any issues - mctx, _ = context.WithTimeout(ctx, time.Second) - err = p.Pin(mctx, a, true) - if err != nil { - t.Fatal(err) - } -} diff --git a/test/sharness/t0080-repo.sh b/test/sharness/t0080-repo.sh index 1db6b55f54a..c4e8af274d6 100755 --- a/test/sharness/t0080-repo.sh +++ b/test/sharness/t0080-repo.sh @@ -40,12 +40,8 @@ test_expect_success "'ipfs pin rm' output looks good" ' ' test_expect_success "file no longer pinned" ' - # we expect the welcome files to show up here - echo "$HASH_WELCOME_DOCS" >expected2 && - ipfs refs -r "$HASH_WELCOME_DOCS" >>expected2 && - echo QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn >> expected2 && ipfs pin ls --type=recursive --quiet >actual2 && - test_sort_cmp expected2 actual2 + test_expect_code 1 grep $HASH actual2 ' test_expect_success "recursively pin afile" ' @@ -97,8 +93,7 @@ test_expect_success "adding multiblock random file succeeds" ' MBLOCKHASH=`ipfs add -q multiblock` ' -# TODO: this starts to fail with the pinning rewrite, for unclear reasons -test_expect_failure "'ipfs pin ls --type=indirect' is correct" ' +test_expect_success "'ipfs pin ls --type=indirect' is correct" ' ipfs refs "$MBLOCKHASH" >refsout && ipfs refs -r "$HASH_WELCOME_DOCS" >>refsout && sed -i"~" "s/\(.*\)/\1 indirect/g" refsout && @@ -128,7 +123,6 @@ test_expect_success "'ipfs pin ls --type=recursive' is correct" ' echo "$MBLOCKHASH" >rp_expected && echo "$HASH_WELCOME_DOCS" >>rp_expected && echo QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn >>rp_expected && - ipfs refs -r "$HASH_WELCOME_DOCS" >>rp_expected && sed -i"~" "s/\(.*\)/\1 recursive/g" rp_expected && ipfs pin ls --type=recursive >rp_actual && test_sort_cmp rp_expected rp_actual diff --git a/unixfs/mod/dagmodifier.go b/unixfs/mod/dagmodifier.go index e367a71c904..8ad7360dcb1 100644 --- a/unixfs/mod/dagmodifier.go +++ b/unixfs/mod/dagmodifier.go @@ -12,7 +12,6 @@ import ( context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" - imp "github.com/ipfs/go-ipfs/importer" chunk "github.com/ipfs/go-ipfs/importer/chunk" help "github.com/ipfs/go-ipfs/importer/helpers" trickle "github.com/ipfs/go-ipfs/importer/trickle" @@ -267,10 +266,6 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) for i, bs := range f.GetBlocksizes() { // We found the correct child to write into if cur+bs > offset { - // Unpin block - ckey := key.Key(node.Links[i].Hash) - dm.mp.RemovePinWithMode(ckey, pin.Indirect) - child, err := node.Links[i].GetNode(dm.ctx, dm.dagserv) if err != nil { return "", false, err @@ -280,9 +275,6 @@ func (dm *DagModifier) modifyDag(node *mdag.Node, offset uint64, data io.Reader) return "", false, err } - // pin the new node - dm.mp.PinWithMode(k, pin.Indirect) - offset += bs node.Links[i].Hash = mh.Multihash(k) @@ -311,7 +303,6 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No dbp := &help.DagBuilderParams{ Dagserv: dm.dagserv, Maxlinks: help.DefaultLinksPerBlock, - NodeCB: imp.BasicPinnerCB(dm.mp), } return trickle.TrickleAppend(node, dbp.New(blks)) diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index 0191fce8188..41ef3f2fb84 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -19,6 +19,7 @@ import ( trickle "github.com/ipfs/go-ipfs/importer/trickle" mdag "github.com/ipfs/go-ipfs/merkledag" pin "github.com/ipfs/go-ipfs/pin" + gc "github.com/ipfs/go-ipfs/pin/gc" ft "github.com/ipfs/go-ipfs/unixfs" uio 
"github.com/ipfs/go-ipfs/unixfs/io" u "github.com/ipfs/go-ipfs/util" @@ -39,7 +40,7 @@ func getMockDagServ(t testing.TB) (mdag.DAGService, pin.Pinner) { return dserv, pin.NewPinner(tsds, dserv) } -func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blockstore, pin.Pinner) { +func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.GCBlockstore, pin.Pinner) { dstore := ds.NewMapDatastore() tsds := sync.MutexWrap(dstore) bstore := blockstore.NewBlockstore(tsds) @@ -53,7 +54,7 @@ func getMockDagServAndBstore(t testing.TB) (mdag.DAGService, blockstore.Blocksto func getNode(t testing.TB, dserv mdag.DAGService, size int64, pinner pin.Pinner) ([]byte, *mdag.Node) { in := io.LimitReader(u.NewTimeSeededRand(), size) - node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}, imp.BasicPinnerCB(pinner)) + node, err := imp.BuildTrickleDagFromReader(in, dserv, &chunk.SizeSplitter{500}) if err != nil { t.Fatal(err) } @@ -469,22 +470,17 @@ func TestSparseWrite(t *testing.T) { } } -func basicGC(t *testing.T, bs blockstore.Blockstore, pins pin.Pinner) { +func basicGC(t *testing.T, bs blockstore.GCBlockstore, pins pin.Pinner) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // in case error occurs during operation - keychan, err := bs.AllKeysChan(ctx) + out, err := gc.GC(ctx, bs, pins) if err != nil { t.Fatal(err) } - for k := range keychan { // rely on AllKeysChan to close chan - if !pins.IsPinned(k) { - err := bs.DeleteBlock(k) - if err != nil { - t.Fatal(err) - } - } + for range out { } } + func TestCorrectPinning(t *testing.T) { dserv, bstore, pins := getMockDagServAndBstore(t) b, n := getNode(t, dserv, 50000, pins) @@ -566,14 +562,6 @@ func TestCorrectPinning(t *testing.T) { t.Fatal("Incorrect node recursively pinned") } - indirpins := pins.IndirectKeys() - children := enumerateChildren(t, nd, dserv) - // TODO this is not true if the contents happen to be identical - if len(indirpins) != len(children) { - t.Log(len(indirpins), len(children)) - t.Fatal("Incorrect number of indirectly pinned blocks") - } - } func enumerateChildren(t *testing.T, nd *mdag.Node, ds mdag.DAGService) []key.Key { From 2d82a203a1fcc5b7c77c4c3c8140fc32146cf2d9 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Thu, 9 Jul 2015 16:03:48 -0700 Subject: [PATCH 2/5] break up GC logic License: MIT Signed-off-by: Jeromy --- pin/gc/gc.go | 74 +++++++++++++++++++++++++++++----------------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/pin/gc/gc.go b/pin/gc/gc.go index a9cb326ec37..2407952b177 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -32,38 +32,9 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key. } ds := dag.NewDAGService(bsrv) - // KeySet currently implemented in memory, in the future, may be bloom filter or - // disk backed to conserve memory. 
- gcs := key.NewKeySet() - for _, k := range pn.RecursiveKeys() { - gcs.Add(k) - nd, err := ds.Get(ctx, k) - if err != nil { - return nil, err - } - - // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, nd, gcs) - if err != nil { - return nil, err - } - } - for _, k := range pn.DirectKeys() { - gcs.Add(k) - } - for _, k := range pn.InternalPins() { - gcs.Add(k) - - nd, err := ds.Get(ctx, k) - if err != nil { - return nil, err - } - - // EnumerateChildren recursively walks the dag and adds the keys to the given set - err = dag.EnumerateChildren(ctx, ds, nd, gcs) - if err != nil { - return nil, err - } + gcs, err := ColoredSet(pn, ds) + if err != nil { + return nil, err } keychan, err := bs.AllKeysChan(ctx) @@ -100,3 +71,42 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key. return output, nil } + +func Descendants(ds dag.DAGService, set key.KeySet, roots []key.Key) error { + for _, k := range roots { + set.Add(k) + nd, err := ds.Get(context.Background(), k) + if err != nil { + return err + } + + // EnumerateChildren recursively walks the dag and adds the keys to the given set + err = dag.EnumerateChildren(context.Background(), ds, nd, set) + if err != nil { + return err + } + } + + return nil +} + +func ColoredSet(pn pin.Pinner, ds dag.DAGService) (key.KeySet, error) { + // KeySet currently implemented in memory, in the future, may be bloom filter or + // disk backed to conserve memory. + gcs := key.NewKeySet() + err := Descendants(ds, gcs, pn.RecursiveKeys()) + if err != nil { + return nil, err + } + + for _, k := range pn.DirectKeys() { + gcs.Add(k) + } + + err = Color(ds, gcs, pn.InternalPins()) + if err != nil { + return nil, err + } + + return gcs, nil +} From aa1be6b0db595b2889278a619ca77fb29545cc2c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 10 Jul 2015 10:49:19 -0700 Subject: [PATCH 3/5] addressing comments from CR License: MIT Signed-off-by: Jeromy --- pin/gc/gc.go | 2 +- pin/pin.go | 60 +++++++++++++++++++++++--- pin/pin_test.go | 78 ++++++++++++++++++++++++++-------- unixfs/mod/dagmodifier_test.go | 15 ------- 4 files changed, 116 insertions(+), 39 deletions(-) diff --git a/pin/gc/gc.go b/pin/gc/gc.go index 2407952b177..f510228d2d1 100644 --- a/pin/gc/gc.go +++ b/pin/gc/gc.go @@ -103,7 +103,7 @@ func ColoredSet(pn pin.Pinner, ds dag.DAGService) (key.KeySet, error) { gcs.Add(k) } - err = Color(ds, gcs, pn.InternalPins()) + err = Descendants(ds, gcs, pn.InternalPins()) if err != nil { return nil, err } diff --git a/pin/pin.go b/pin/pin.go index a5c9df37756..6075d510ede 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -35,7 +35,7 @@ const ( ) type Pinner interface { - IsPinned(key.Key) bool + IsPinned(key.Key) (string, bool, error) Pin(context.Context, *mdag.Node, bool) error Unpin(context.Context, key.Key, bool) error @@ -148,12 +148,38 @@ func (p *pinner) isInternalPin(key key.Key) bool { } // IsPinned returns whether or not the given key is pinned -func (p *pinner) IsPinned(key key.Key) bool { +// and an explanation of why its pinned +func (p *pinner) IsPinned(k key.Key) (string, bool, error) { p.lock.RLock() defer p.lock.RUnlock() - return p.recursePin.HasKey(key) || - p.directPin.HasKey(key) || - p.isInternalPin(key) + if p.recursePin.HasKey(k) { + return "recursive", true, nil + } + if p.directPin.HasKey(k) { + return "direct", true, nil + } + if p.isInternalPin(k) { + return "internal", true, nil + } + + for _, rk := range p.recursePin.GetKeys() { + ss := &searchSet{target: k} 
+ + rnd, err := p.dserv.Get(context.Background(), rk) + if err != nil { + return "", false, err + } + + err = mdag.EnumerateChildren(context.Background(), p.dserv, rnd, ss) + if err != nil { + return "", false, err + } + + if ss.found { + return rk.B58String(), true, nil + } + } + return "", false, nil } func (p *pinner) RemovePinWithMode(key key.Key, mode PinMode) { @@ -309,3 +335,27 @@ func (p *pinner) PinWithMode(k key.Key, mode PinMode) { p.directPin.AddBlock(k) } } + +// searchSet implements key.KeySet in +type searchSet struct { + target key.Key + found bool +} + +func (ss *searchSet) Add(k key.Key) { + if ss.target == k { + ss.found = true + } +} + +func (ss *searchSet) Has(k key.Key) bool { + // returning true to all Has queries will cause EnumerateChildren to return + // almost immediately + return ss.found +} + +func (ss *searchSet) Keys() []key.Key { + return nil +} + +func (ss *searchSet) Remove(key.Key) {} diff --git a/pin/pin_test.go b/pin/pin_test.go index 7b17290d6d2..b137a535369 100644 --- a/pin/pin_test.go +++ b/pin/pin_test.go @@ -2,6 +2,7 @@ package pin import ( "testing" + "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" @@ -23,6 +24,17 @@ func randNode() (*mdag.Node, key.Key) { return nd, k } +func assertPinned(t *testing.T, p Pinner, k key.Key, failmsg string) { + _, pinned, err := p.IsPinned(k) + if err != nil { + t.Fatal(err) + } + + if !pinned { + t.Fatal(failmsg) + } +} + func TestPinnerBasic(t *testing.T) { ctx := context.Background() @@ -50,13 +62,11 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } - if !p.IsPinned(ak) { - t.Fatal("Failed to find key") - } + assertPinned(t, p, ak, "Failed to find key") // create new node c, to be indirectly pinned through b c, _ := randNode() - _, err = dserv.Add(c) + ck, err := dserv.Add(c) if err != nil { t.Fatal(err) } @@ -84,10 +94,10 @@ func TestPinnerBasic(t *testing.T) { t.Fatal(err) } + assertPinned(t, p, ck, "child of recursively pinned node not found") + bk, _ := b.Key() - if !p.IsPinned(bk) { - t.Fatal("Recursively pinned node not found..") - } + assertPinned(t, p, bk, "Recursively pinned node not found..") d, _ := randNode() d.AddNodeLink("a", a) @@ -109,9 +119,7 @@ func TestPinnerBasic(t *testing.T) { } dk, _ := d.Key() - if !p.IsPinned(dk) { - t.Fatal("pinned node not found.") - } + assertPinned(t, p, dk, "pinned node not found.") // Test recursive unpin err = p.Unpin(ctx, dk, true) @@ -130,14 +138,10 @@ func TestPinnerBasic(t *testing.T) { } // Test directly pinned - if !np.IsPinned(ak) { - t.Fatal("Could not find pinned node!") - } + assertPinned(t, np, ak, "Could not find pinned node!") // Test recursively pinned - if !np.IsPinned(bk) { - t.Fatal("could not find recursively pinned node") - } + assertPinned(t, np, bk, "could not find recursively pinned node") } func TestDuplicateSemantics(t *testing.T) { @@ -195,7 +199,45 @@ func TestFlush(t *testing.T) { if err := p.Flush(); err != nil { t.Fatal(err) } - if !p.IsPinned(k) { - t.Fatal("expected key to still be pinned") + assertPinned(t, p, k, "expected key to still be pinned") +} + +func TestPinRecursiveFail(t *testing.T) { + ctx := context.Background() + dstore := dssync.MutexWrap(ds.NewMapDatastore()) + bstore := blockstore.NewBlockstore(dstore) + bserv, err := bs.New(bstore, offline.Exchange(bstore)) + if err != nil { + t.Fatal(err) + } + + dserv := mdag.NewDAGService(bserv) + + p := NewPinner(dstore, dserv) + + a, _ := randNode() + b, _ := randNode() + err = a.AddNodeLinkClean("child", b) + if err != nil { + 
t.Fatal(err) + } + + // Note: this isnt a time based test, we expect the pin to fail + mctx, _ := context.WithTimeout(ctx, time.Millisecond) + err = p.Pin(mctx, a, true) + if err == nil { + t.Fatal("should have failed to pin here") + } + + _, err = dserv.Add(b) + if err != nil { + t.Fatal(err) + } + + // this one is time based... but shouldnt cause any issues + mctx, _ = context.WithTimeout(ctx, time.Second) + err = p.Pin(mctx, a, true) + if err != nil { + t.Fatal(err) } } diff --git a/unixfs/mod/dagmodifier_test.go b/unixfs/mod/dagmodifier_test.go index 41ef3f2fb84..0054a55a367 100644 --- a/unixfs/mod/dagmodifier_test.go +++ b/unixfs/mod/dagmodifier_test.go @@ -10,7 +10,6 @@ import ( "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync" "github.com/ipfs/go-ipfs/blocks/blockstore" - key "github.com/ipfs/go-ipfs/blocks/key" bs "github.com/ipfs/go-ipfs/blockservice" "github.com/ipfs/go-ipfs/exchange/offline" imp "github.com/ipfs/go-ipfs/importer" @@ -564,20 +563,6 @@ func TestCorrectPinning(t *testing.T) { } -func enumerateChildren(t *testing.T, nd *mdag.Node, ds mdag.DAGService) []key.Key { - var out []key.Key - for _, lnk := range nd.Links { - out = append(out, key.Key(lnk.Hash)) - child, err := lnk.GetNode(context.Background(), ds) - if err != nil { - t.Fatal(err) - } - children := enumerateChildren(t, child, ds) - out = append(out, children...) - } - return out -} - func BenchmarkDagmodWrite(b *testing.B) { b.StopTimer() dserv, pins := getMockDagServ(b) From c2fc8bc3cc08a9442f88221a1aa45005c93f1e82 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 10 Jul 2015 11:03:15 -0700 Subject: [PATCH 4/5] pin rm fails appropriately for indirect pins License: MIT Signed-off-by: Jeromy --- pin/pin.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/pin/pin.go b/pin/pin.go index 6075d510ede..ad69416bedc 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -127,18 +127,26 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error { func (p *pinner) Unpin(ctx context.Context, k key.Key, recursive bool) error { p.lock.Lock() defer p.lock.Unlock() - if p.recursePin.HasKey(k) { + reason, pinned, err := p.isPinned(k) + if err != nil { + return err + } + if !pinned { + return fmt.Errorf("%s is not pinned", k) + } + switch reason { + case "recursive": if recursive { p.recursePin.RemoveBlock(k) return nil } else { return fmt.Errorf("%s is pinned recursively", k) } - } else if p.directPin.HasKey(k) { + case "direct": p.directPin.RemoveBlock(k) return nil - } else { - return fmt.Errorf("%s is not pinned", k) + default: + return fmt.Errorf("%s is pinned indirectly under %s", k, reason) } } @@ -152,6 +160,12 @@ func (p *pinner) isInternalPin(key key.Key) bool { func (p *pinner) IsPinned(k key.Key) (string, bool, error) { p.lock.RLock() defer p.lock.RUnlock() + return p.isPinned(k) +} + +// isPinned is the implementation of IsPinned that does not lock. 
+// intended for use by other pinned methods that already take locks +func (p *pinner) isPinned(k key.Key) (string, bool, error) { if p.recursePin.HasKey(k) { return "recursive", true, nil } From b521f547f3db4039884d31f53776633c2b010ba6 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Fri, 10 Jul 2015 11:34:29 -0700 Subject: [PATCH 5/5] dont use searchset for indirect pin checking License: MIT Signed-off-by: Jeromy --- pin/pin.go | 45 +++++++++++++++++++++------------------------ 1 file changed, 21 insertions(+), 24 deletions(-) diff --git a/pin/pin.go b/pin/pin.go index ad69416bedc..d52f94f5b8c 100644 --- a/pin/pin.go +++ b/pin/pin.go @@ -177,19 +177,16 @@ func (p *pinner) isPinned(k key.Key) (string, bool, error) { } for _, rk := range p.recursePin.GetKeys() { - ss := &searchSet{target: k} - rnd, err := p.dserv.Get(context.Background(), rk) if err != nil { return "", false, err } - err = mdag.EnumerateChildren(context.Background(), p.dserv, rnd, ss) + has, err := hasChild(p.dserv, rnd, k) if err != nil { return "", false, err } - - if ss.found { + if has { return rk.B58String(), true, nil } } @@ -350,26 +347,26 @@ func (p *pinner) PinWithMode(k key.Key, mode PinMode) { } } -// searchSet implements key.KeySet in -type searchSet struct { - target key.Key - found bool -} +func hasChild(ds mdag.DAGService, root *mdag.Node, child key.Key) (bool, error) { + for _, lnk := range root.Links { + k := key.Key(lnk.Hash) + if k == child { + return true, nil + } -func (ss *searchSet) Add(k key.Key) { - if ss.target == k { - ss.found = true - } -} + nd, err := ds.Get(context.Background(), k) + if err != nil { + return false, err + } -func (ss *searchSet) Has(k key.Key) bool { - // returning true to all Has queries will cause EnumerateChildren to return - // almost immediately - return ss.found -} + has, err := hasChild(ds, nd, child) + if err != nil { + return false, err + } -func (ss *searchSet) Keys() []key.Key { - return nil + if has { + return has, nil + } + } + return false, nil } - -func (ss *searchSet) Remove(key.Key) {}
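
Usage sketch (not part of the patch series above; a minimal illustration under stated assumptions): the snippet below shows how a caller might consume the two APIs these patches introduce — gc.GC, which takes the GC lock, colors the pinned set, and streams every deleted key over a channel that must be drained, and the revised Pinner.IsPinned, which now returns a reason string alongside the pinned flag. Import paths are taken from the diffs; the surrounding setup (an already-constructed GCBlockstore and Pinner) is assumed.

// Sketch only: demonstrates the consumer side of the gc.GC and Pinner.IsPinned
// APIs introduced in this patch series. Assumes the import paths shown in the
// diffs and an already-constructed GCBlockstore and Pinner.
package main

import (
	"fmt"

	context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
	bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
	key "github.com/ipfs/go-ipfs/blocks/key"
	pin "github.com/ipfs/go-ipfs/pin"
	gc "github.com/ipfs/go-ipfs/pin/gc"
)

func runGC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) error {
	// gc.GC returns a channel of removed keys; the channel is closed when the
	// sweep finishes or the context is cancelled, so draining it is enough.
	rmed, err := gc.GC(ctx, bs, pn)
	if err != nil {
		return err
	}
	for k := range rmed {
		fmt.Printf("removed %s\n", k)
	}
	return nil
}

func explainPin(pn pin.Pinner, k key.Key) {
	// IsPinned now reports (reason, pinned, error). The reason is "recursive",
	// "direct", or "internal" for explicit pins; for indirect pins it is the
	// base58 key of the recursive pin whose graph covers k.
	reason, pinned, err := pn.IsPinned(k)
	if err != nil {
		fmt.Printf("pin check failed for %s: %s\n", k, err)
		return
	}
	if !pinned {
		fmt.Printf("%s is not pinned\n", k)
		return
	}
	fmt.Printf("%s is pinned: %s\n", k, reason)
}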