implement mark and sweep garbage collection #1420

Merged
merged 5 commits into from Jul 11, 2015
Changes from 2 commits
8 changes: 1 addition & 7 deletions core/commands/add.go
@@ -137,9 +137,7 @@ remains to be implemented.
}

if !hash {
n.Pinning.RemovePinWithMode(rnk, pin.Indirect)
n.Pinning.PinWithMode(rnk, pin.Recursive)

err = n.Pinning.Flush()
if err != nil {
res.SetError(err, cmds.ErrNormal)
@@ -243,14 +241,12 @@ func add(n *core.IpfsNode, reader io.Reader, useTrickle bool) (*dag.Node, error)
reader,
n.DAG,
chunk.DefaultSplitter,
importer.PinIndirectCB(n.Pinning),
)
} else {
node, err = importer.BuildDagFromReader(
reader,
n.DAG,
chunk.DefaultSplitter,
importer.PinIndirectCB(n.Pinning),
)
}

@@ -329,13 +325,11 @@ func addDir(n *core.IpfsNode, dir files.File, out chan interface{}, progress boo
return nil, err
}

k, err := n.DAG.Add(tree)
_, err = n.DAG.Add(tree)
if err != nil {
return nil, err
}

n.Pinning.PinWithMode(k, pin.Indirect)

return tree, nil
}

61 changes: 25 additions & 36 deletions core/commands/pin.go
@@ -8,6 +8,7 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key"
cmds "github.com/ipfs/go-ipfs/commands"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
dag "github.com/ipfs/go-ipfs/merkledag"
u "github.com/ipfs/go-ipfs/util"
)

@@ -169,14 +170,12 @@ Use --type=<type> to specify the type of pinned keys to list. Valid values are:
* "indirect": pinned indirectly by an ancestor (like a refcount)
* "all"

To see the ref count on indirect pins, pass the -count option flag.
Defaults to "direct".
`,
},

Options: []cmds.Option{
cmds.StringOption("type", "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\". Defaults to \"direct\""),
cmds.BoolOption("count", "n", "Show refcount when listing indirect pins"),
cmds.BoolOption("quiet", "q", "Write just hashes of objects"),
},
Run: func(req cmds.Request, res cmds.Response) {
@@ -206,24 +205,35 @@
if typeStr == "direct" || typeStr == "all" {
for _, k := range n.Pinning.DirectKeys() {
keys[k.B58String()] = RefKeyObject{
Type: "direct",
Count: 1,
Type: "direct",
}
}
}
if typeStr == "indirect" || typeStr == "all" {
for k, v := range n.Pinning.IndirectKeys() {
ks := key.NewKeySet()
for _, k := range n.Pinning.RecursiveKeys() {
nd, err := n.DAG.Get(n.Context(), k)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
err = dag.EnumerateChildren(n.Context(), n.DAG, nd, ks)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

}
for _, k := range ks.Keys() {
keys[k.B58String()] = RefKeyObject{
Type: "indirect",
Count: v,
Type: "indirect",
}
}
}
if typeStr == "recursive" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
keys[k.B58String()] = RefKeyObject{
Type: "recursive",
Count: 1,
Type: "recursive",
}
}
}
@@ -233,16 +243,6 @@ Defaults to "direct".
Type: RefKeyList{},
Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) {
typeStr, _, err := res.Request().Option("type").String()
if err != nil {
return nil, err
}

count, _, err := res.Request().Option("count").Bool()
if err != nil {
return nil, err
}

quiet, _, err := res.Request().Option("quiet").Bool()
if err != nil {
return nil, err
@@ -253,21 +253,11 @@ Defaults to "direct".
return nil, u.ErrCast()
}
out := new(bytes.Buffer)
if typeStr == "indirect" && count {
for k, v := range keys.Keys {
if quiet {
fmt.Fprintf(out, "%s %d\n", k, v.Count)
} else {
fmt.Fprintf(out, "%s %s %d\n", k, v.Type, v.Count)
}
}
} else {
for k, v := range keys.Keys {
if quiet {
fmt.Fprintf(out, "%s\n", k)
} else {
fmt.Fprintf(out, "%s %s\n", k, v.Type)
}
for k, v := range keys.Keys {
Member: I believe this needs to be brought back (likely with a mark and sweep implementation).

Member Author: No, it's okay that this is gone. You can still specify that you want to see indirect keys, but we aren't going to bother with counting them anymore. That's too much hassle and gains us nothing.

if quiet {
fmt.Fprintf(out, "%s\n", k)
} else {
fmt.Fprintf(out, "%s %s\n", k, v.Type)
}
}
return out, nil
@@ -276,8 +266,7 @@ Defaults to "direct".
}

type RefKeyObject struct {
Type string
Count uint64
Type string
}

type RefKeyList struct {
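The dropped Count field and -count flag follow from the model above: indirect pins are no longer stored with refcounts, they are derived on demand by enumerating every descendant of the recursive pins, which is what the dag.EnumerateChildren call in this hunk does. A minimal, self-contained sketch of that idea; the Node type, key strings, and helper names below are illustrative stand-ins, not the real go-ipfs types:

```go
package main

import "fmt"

// Node is a simplified stand-in for a merkledag node: a key plus links to children.
type Node struct {
	Key   string
	Links []*Node
}

// enumerateChildren walks the DAG below root and records every descendant key,
// mirroring in spirit what dag.EnumerateChildren does for each recursive pin.
func enumerateChildren(root *Node, seen map[string]bool) {
	for _, child := range root.Links {
		if seen[child.Key] {
			continue // already reached via another path
		}
		seen[child.Key] = true
		enumerateChildren(child, seen)
	}
}

func main() {
	leaf := &Node{Key: "leaf"}
	mid := &Node{Key: "mid", Links: []*Node{leaf}}
	root := &Node{Key: "root", Links: []*Node{mid, leaf}}

	// The "indirect" pin set is just the union of descendants of recursive pins;
	// no per-key refcount needs to be maintained or persisted.
	indirect := make(map[string]bool)
	for _, recursivePin := range []*Node{root} {
		enumerateChildren(recursivePin, indirect)
	}
	for k := range indirect {
		fmt.Println(k, "indirect")
	}
}
```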
2 changes: 1 addition & 1 deletion core/corehttp/gateway_handler.go
@@ -72,7 +72,7 @@ func (i *gatewayHandler) newDagFromReader(r io.Reader) (*dag.Node, error) {
// TODO(cryptix): change and remove this helper once PR1136 is merged
// return ufs.AddFromReader(i.node, r.Body)
return importer.BuildDagFromReader(
r, i.node.DAG, chunk.DefaultSplitter, importer.BasicPinnerCB(i.node.Pinning))
r, i.node.DAG, chunk.DefaultSplitter)
}

// TODO(btc): break this apart into separate handlers using a more expressive muxer
47 changes: 17 additions & 30 deletions core/corerepo/gc.go
@@ -4,6 +4,7 @@ import (
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
"github.com/ipfs/go-ipfs/core"
gc "github.com/ipfs/go-ipfs/pin/gc"

eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
)
@@ -15,54 +16,40 @@ type KeyRemoved struct {
}

func GarbageCollect(n *core.IpfsNode, ctx context.Context) error {
Member: We should flip the params here: GarbageCollect(ctx, n).

Member Author: We've always had the core.* functions take the node as their first argument. Do you want to change this everywhere?

Member: Ah, even when there is a ctx involved? I guess that makes sense.

ctx, cancel := context.WithCancel(context.Background())
defer cancel() // in case error occurs during operation
keychan, err := n.Blockstore.AllKeysChan(ctx)
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning)
if err != nil {
return err
}
for k := range keychan { // rely on AllKeysChan to close chan
if !n.Pinning.IsPinned(k) {
err := n.Blockstore.DeleteBlock(k)
if err != nil {
return err

for {
select {
case _, ok := <-rmed:
if !ok {
return nil
}
case <-ctx.Done():
return ctx.Err()
}
}
return nil

}

func GarbageCollectAsync(n *core.IpfsNode, ctx context.Context) (<-chan *KeyRemoved, error) {

keychan, err := n.Blockstore.AllKeysChan(ctx)
rmed, err := gc.GC(ctx, n.Blockstore, n.Pinning)
if err != nil {
return nil, err
}

output := make(chan *KeyRemoved)
out := make(chan *KeyRemoved)
go func() {
defer close(output)
for {
defer close(out)
for k := range rmed {
select {
case k, ok := <-keychan:
if !ok {
return
}
if !n.Pinning.IsPinned(k) {
err := n.Blockstore.DeleteBlock(k)
if err != nil {
log.Debugf("Error removing key from blockstore: %s", err)
continue
}
select {
case output <- &KeyRemoved{k}:
case <-ctx.Done():
}
}
case out <- &KeyRemoved{k}:
case <-ctx.Done():
return
}
}
}()
return output, nil
return out, nil
}
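The pin/gc package itself is not shown in this diff, but the call shape used above, gc.GC(ctx, n.Blockstore, n.Pinning) returning a channel of removed keys, matches a mark-and-sweep collector: mark everything reachable from the pin sets, then sweep (delete and report) every block that was not marked. A rough, self-contained sketch of that shape; the Blockstore and Pinner interfaces here are simplified stand-ins for the real go-ipfs types, and MarkedSet is an assumed helper standing in for the mark phase:

```go
package gcsketch

import "context"

// Blockstore and Pinner are simplified stand-ins for the interfaces gc.GC
// receives; only the methods this sketch needs are included.
type Blockstore interface {
	AllKeys() []string
	DeleteBlock(k string) error
}

type Pinner interface {
	// MarkedSet is assumed to return every key reachable from the pin sets
	// (direct pins plus all descendants of recursive pins): the mark phase.
	MarkedSet() map[string]bool
}

// GC deletes every block that is not in the marked set and streams the removed
// keys on a channel, mirroring the gc.GC(ctx, bs, pn) call shape used above.
func GC(ctx context.Context, bs Blockstore, pn Pinner) (<-chan string, error) {
	marked := pn.MarkedSet() // mark phase
	removed := make(chan string)
	go func() {
		defer close(removed)
		for _, k := range bs.AllKeys() { // sweep phase
			if marked[k] {
				continue // still reachable from a pin, keep it
			}
			if err := bs.DeleteBlock(k); err != nil {
				continue // skip blocks that fail to delete
			}
			select {
			case removed <- k:
			case <-ctx.Done():
				return
			}
		}
	}()
	return removed, nil
}
```

Callers would then range over the returned channel much as GarbageCollectAsync does above, stopping when the sweep finishes or the context is cancelled.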
9 changes: 0 additions & 9 deletions core/coreunix/add.go
@@ -14,7 +14,6 @@ import (
importer "github.com/ipfs/go-ipfs/importer"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
merkledag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/thirdparty/eventlog"
unixfs "github.com/ipfs/go-ipfs/unixfs"
)
@@ -32,7 +31,6 @@ func Add(n *core.IpfsNode, r io.Reader) (string, error) {
r,
n.DAG,
chunk.DefaultSplitter,
importer.BasicPinnerCB(n.Pinning),
)
if err != nil {
return "", err
@@ -71,12 +69,6 @@ func AddR(n *core.IpfsNode, root string) (key string, err error) {
return "", err
}

n.Pinning.RemovePinWithMode(k, pin.Indirect)
err = n.Pinning.Flush()
if err != nil {
return "", err
}

return k.String(), nil
}

@@ -106,7 +98,6 @@ func add(n *core.IpfsNode, reader io.Reader) (*merkledag.Node, error) {
reader,
n.DAG,
chunk.DefaultSplitter,
importer.PinIndirectCB(n.Pinning),
)
if err != nil {
return nil, err
2 changes: 1 addition & 1 deletion core/coreunix/metadata_test.go
@@ -38,7 +38,7 @@ func TestMetadata(t *testing.T) {
data := make([]byte, 1000)
u.NewTimeSeededRand().Read(data)
r := bytes.NewReader(data)
nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter, nil)
nd, err := importer.BuildDagFromReader(r, ds, chunk.DefaultSplitter)
if err != nil {
t.Fatal(err)
}
27 changes: 0 additions & 27 deletions importer/helpers/dagbuilder.go
@@ -2,26 +2,15 @@ package helpers

import (
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
)

// NodeCB is callback function for dag generation
// the `last` flag signifies whether or not this is the last
// (top-most root) node being added. useful for things like
// only pinning the first node recursively.
type NodeCB func(node *dag.Node, last bool) error

var nilFunc NodeCB = func(_ *dag.Node, _ bool) error { return nil }

// DagBuilderHelper wraps together a bunch of objects needed to
// efficiently create unixfs dag trees
type DagBuilderHelper struct {
dserv dag.DAGService
mp pin.Pinner
in <-chan []byte
nextData []byte // the next item to return.
maxlinks int
ncb NodeCB
}

type DagBuilderParams struct {
@@ -30,24 +19,15 @@ type DagBuilderParams struct {

// DAGService to write blocks to (required)
Dagserv dag.DAGService

// Callback for each block added
NodeCB NodeCB
}

// Generate a new DagBuilderHelper from the given params, using 'in' as a
// data source
func (dbp *DagBuilderParams) New(in <-chan []byte) *DagBuilderHelper {
ncb := dbp.NodeCB
if ncb == nil {
ncb = nilFunc
}

return &DagBuilderHelper{
dserv: dbp.Dagserv,
in: in,
maxlinks: dbp.Maxlinks,
ncb: ncb,
}
}

@@ -100,7 +80,6 @@ func (db *DagBuilderHelper) GetDagServ() dag.DAGService {
// FillNodeLayer will add datanodes as children to the given node until
// at most db.indirSize nodes are added
//
// warning: **children** pinned indirectly, but input node IS NOT pinned.
func (db *DagBuilderHelper) FillNodeLayer(node *UnixfsNode) error {

// while we have room AND we're not done
@@ -144,12 +123,6 @@ func (db *DagBuilderHelper) Add(node *UnixfsNode) (*dag.Node, error) {
return nil, err
}

// node callback
err = db.ncb(dn, true)
if err != nil {
return nil, err
}

return dn, nil
}

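For context on what is being deleted here: NodeCB let the importer notify a callback for every node it wrote, with last set only for the top-most root, which is how indirect pins used to be recorded during import. With mark-and-sweep GC deriving reachability from the single recursive pin on the root, that bookkeeping is no longer needed. A simplified, self-contained sketch of the removed callback pattern; the builder type and method names are illustrative, not the real DagBuilderHelper API:

```go
package nodecbsketch

// Node stands in for a dag.Node produced during import.
type Node struct{ Key string }

// NodeCB mirrors the removed callback: it is invoked for every node written,
// and last is true only for the final (top-most root) node.
type NodeCB func(node *Node, last bool) error

// builder is an illustrative stand-in for DagBuilderHelper.
type builder struct {
	ncb NodeCB
}

// add would write the node to the DAG service and then notify the callback;
// before this change the callback was where child nodes were pinned
// indirectly (last == false), leaving only the root for a recursive pin.
func (b *builder) add(n *Node, last bool) error {
	// ... write n to the DAG service here ...
	if b.ncb == nil {
		return nil // with the callback gone, adding a node is just a write
	}
	return b.ncb(n, last)
}
```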
12 changes: 0 additions & 12 deletions importer/helpers/helpers.go
@@ -5,10 +5,8 @@ import (
"time"

"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
chunk "github.com/ipfs/go-ipfs/importer/chunk"
dag "github.com/ipfs/go-ipfs/merkledag"
"github.com/ipfs/go-ipfs/pin"
ft "github.com/ipfs/go-ipfs/unixfs"
)

@@ -112,21 +110,11 @@ func (n *UnixfsNode) AddChild(child *UnixfsNode, db *DagBuilderHelper) error {
return err
}

// Pin the child node indirectly
err = db.ncb(childnode, false)
if err != nil {
return err
}

return nil
}

// Removes the child node at the given index
func (n *UnixfsNode) RemoveChild(index int, dbh *DagBuilderHelper) {
k := key.Key(n.node.Links[index].Hash)
if dbh.mp != nil {
dbh.mp.RemovePinWithMode(k, pin.Indirect)
}
n.ufmt.RemoveBlockSize(index)
n.node.Links = append(n.node.Links[:index], n.node.Links[index+1:]...)
}