Skip to content

Commit

Permalink
reintroduce indirect pinning, add enumerateChildren dag method
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Jul 7, 2015
1 parent 57bc208 commit 7580503
Show file tree
Hide file tree
Showing 6 changed files with 134 additions and 89 deletions.
47 changes: 20 additions & 27 deletions blocks/key/key_set.go
Original file line number Diff line number Diff line change
@@ -1,46 +1,39 @@
package key

import (
"sync"
)

// KeySet is a mutable set of Keys. Implementations are not required to be
// safe for concurrent use.
type KeySet interface {
// Add inserts the key into the set; adding an existing key is a no-op.
Add(Key)
// Has reports whether the key is a member of the set.
Has(Key) bool
// Remove deletes the key from the set if present.
Remove(Key)
// Keys returns the members of the set in unspecified order.
Keys() []Key
}

type ks struct {
lock sync.RWMutex
data map[Key]struct{}
type keySet struct {
keys map[Key]struct{}
}

func NewKeySet() KeySet {
return &ks{
data: make(map[Key]struct{}),
}
return &keySet{make(map[Key]struct{})}
}

func (wl *ks) Add(k Key) {
wl.lock.Lock()
defer wl.lock.Unlock()

wl.data[k] = struct{}{}
func (gcs *keySet) Add(k Key) {
gcs.keys[k] = struct{}{}
}

func (wl *ks) Remove(k Key) {
wl.lock.Lock()
defer wl.lock.Unlock()

delete(wl.data, k)
func (gcs *keySet) Has(k Key) bool {
_, has := gcs.keys[k]
return has
}

func (wl *ks) Keys() []Key {
wl.lock.RLock()
defer wl.lock.RUnlock()
keys := make([]Key, 0)
for k := range wl.data {
keys = append(keys, k)
func (ks *keySet) Keys() []Key {
var out []Key
for k, _ := range ks.keys {
out = append(out, k)
}
return keys
return out
}

func (ks *keySet) Remove(k Key) {
delete(ks.keys, k)
}

// TODO: implement disk-backed keyset for working with massive DAGs
48 changes: 34 additions & 14 deletions core/commands/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key"
cmds "github.com/ipfs/go-ipfs/commands"
corerepo "github.com/ipfs/go-ipfs/core/corerepo"
dag "github.com/ipfs/go-ipfs/merkledag"
u "github.com/ipfs/go-ipfs/util"
)

Expand Down Expand Up @@ -153,24 +154,25 @@ var listPinCmd = &cmds.Command{
Helptext: cmds.HelpText{
Tagline: "List objects pinned to local storage",
ShortDescription: `
Returns a list of hashes of objects being pinned. Objects that are
recursively pinned are not included in the list.
Returns a list of hashes of objects being pinned. Objects that are indirectly
or recursively pinned are not included in the list.
`,
LongDescription: `
Returns a list of hashes of objects being pinned. Objects that are
recursively pinned are not included in the list.
Returns a list of hashes of objects being pinned. Objects that are indirectly
or recursively pinned are not included in the list.
Use --type=<type> to specify the type of pinned keys to list. Valid values are:
* "direct": pin that specific object.
* "recursive": pin that specific object, and indirectly, all its descendants
* "recursive": pin that specific object, and indirectly pin all its descendants
* "indirect": pinned indirectly by an ancestor (like a refcount)
* "all"
Defaults to "direct".
`,
},

Options: []cmds.Option{
cmds.StringOption("type", "t", "The type of pinned keys to list. Can be \"direct\", \"recursive\", or \"all\". Defaults to \"direct\""),
cmds.StringOption("type", "t", "The type of pinned keys to list. Can be \"direct\", \"indirect\", \"recursive\", or \"all\". Defaults to \"direct\""),
cmds.BoolOption("quiet", "q", "Write just hashes of objects"),
},
Run: func(req cmds.Request, res cmds.Response) {
Expand All @@ -190,26 +192,45 @@ Defaults to "direct".
}

switch typeStr {
case "all", "direct", "recursive":
case "all", "direct", "indirect", "recursive":
default:
err = fmt.Errorf("Invalid type '%s', must be one of {direct, recursive, all}", typeStr)
err = fmt.Errorf("Invalid type '%s', must be one of {direct, indirect, recursive, all}", typeStr)
res.SetError(err, cmds.ErrClient)
}

keys := make(map[string]RefKeyObject)
if typeStr == "direct" || typeStr == "all" {
for _, k := range n.Pinning.DirectKeys() {
keys[k.B58String()] = RefKeyObject{
Type: "direct",
Count: 1,
Type: "direct",
}
}
}
if typeStr == "indirect" || typeStr == "all" {
ks := key.NewKeySet()
for _, k := range n.Pinning.RecursiveKeys() {
nd, err := n.DAG.Get(n.Context(), k)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
err = dag.EnumerateChildren(n.Context(), n.DAG, nd, ks)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}

}
for _, k := range ks.Keys() {
keys[k.B58String()] = RefKeyObject{
Type: "indirect",
}
}
}
if typeStr == "recursive" || typeStr == "all" {
for _, k := range n.Pinning.RecursiveKeys() {
keys[k.B58String()] = RefKeyObject{
Type: "recursive",
Count: 1,
Type: "recursive",
}
}
}
Expand Down Expand Up @@ -242,8 +263,7 @@ Defaults to "direct".
}

// RefKeyObject describes a single pinned key in the 'ipfs pin ls' output;
// Type is one of "direct", "indirect", or "recursive".
type RefKeyObject struct {
	Type string
}

type RefKeyList struct {
Expand Down
21 changes: 21 additions & 0 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,3 +313,24 @@ func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
}
return np.cache, nil
}

// EnumerateChildren walks the dag below the given root node and adds every
// key reachable beneath it (but not the root itself) to the passed-in set.
// Keys already present in the set are treated as visited and are not
// descended into, so shared subgraphs are only fetched once.
// TODO: parallelize to avoid disk latency perf hits?
func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
	for _, lnk := range root.Links {
		k := key.Key(lnk.Hash)
		if set.Has(k) {
			continue
		}
		// Mark before fetching so each node is visited at most once.
		set.Add(k)
		child, err := ds.Get(ctx, k)
		if err != nil {
			return err
		}
		if err := EnumerateChildren(ctx, ds, child, set); err != nil {
			return err
		}
	}
	return nil
}
38 changes: 38 additions & 0 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,41 @@ func TestFetchGraphOther(t *testing.T) {
t.Fatal(err)
}
}

func TestEnumerateChildren(t *testing.T) {
	bsi := bstest.Mocks(t, 1)
	ds := NewDAGService(bsi[0])

	// Build a random multi-block dag to enumerate.
	read := io.LimitReader(u.NewTimeSeededRand(), 1024*1024)
	root, err := imp.BuildDagFromReader(read, ds, &chunk.SizeSplitter{512})
	if err != nil {
		t.Fatal(err)
	}

	set := key.NewKeySet()
	if err := EnumerateChildren(context.Background(), ds, root, set); err != nil {
		t.Fatal(err)
	}

	// Walk the dag by hand and verify every reachable key was collected.
	var walk func(nd *Node)
	walk = func(nd *Node) {
		for _, lnk := range nd.Links {
			k := key.Key(lnk.Hash)
			if !set.Has(k) {
				t.Fatal("missing key in set!")
			}
			child, err := ds.Get(context.Background(), k)
			if err != nil {
				t.Fatal(err)
			}
			walk(child)
		}
	}

	walk(root)
}
59 changes: 19 additions & 40 deletions pin/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,42 +14,6 @@ import (

var log = eventlog.Logger("gc")

// GCSet is an in-memory set of keys used to mark live blocks during garbage
// collection. It is not synchronized for concurrent use.
type GCSet struct {
keys map[key.Key]struct{}
}

// NewGCSet creates an empty GCSet ready for use.
func NewGCSet() *GCSet {
	return &GCSet{keys: make(map[key.Key]struct{})}
}

// Add marks the given key as a member of the set; re-adding is a no-op.
func (gcs *GCSet) Add(k key.Key) {
gcs.keys[k] = struct{}{}
}

// Has reports whether the given key is a member of the set.
func (gcs *GCSet) Has(k key.Key) bool {
_, has := gcs.keys[k]
return has
}

// AddDag adds root and every key reachable below it to the set.
//
// Keys already present in the set are treated as visited and are not
// descended into, which keeps the traversal linear when the dag contains
// shared subgraphs (the original unconditionally recursed, re-walking a
// shared subtree once per parent).
func (gcs *GCSet) AddDag(ds dag.DAGService, root key.Key) error {
	if gcs.Has(root) {
		// Already walked this subtree; nothing to do.
		return nil
	}

	ctx := context.Background()
	nd, err := ds.Get(ctx, root)
	if err != nil {
		return err
	}

	gcs.Add(root)

	for _, lnk := range nd.Links {
		k := key.Key(lnk.Hash)
		if err := gcs.AddDag(ds, k); err != nil {
			return err
		}
	}
	return nil
}

// GC performs a mark and sweep garbage collection of the blocks in the blockstore
// first, it creates a 'marked' set and adds to it the following:
// - all recursively pinned blocks, plus all of their descendants (recursively)
Expand All @@ -68,11 +32,18 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.
}
ds := dag.NewDAGService(bsrv)

// GCSet currently implemented in memory, in the future, may be bloom filter or
// KeySet currently implemented in memory, in the future, may be bloom filter or
// disk backed to conserve memory.
gcs := NewGCSet()
gcs := key.NewKeySet()
for _, k := range pn.RecursiveKeys() {
err := gcs.AddDag(ds, k)
gcs.Add(k)
nd, err := ds.Get(ctx, k)
if err != nil {
return nil, err
}

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, gcs)
if err != nil {
return nil, err
}
Expand All @@ -81,7 +52,15 @@ func GC(ctx context.Context, bs bstore.GCBlockstore, pn pin.Pinner) (<-chan key.
gcs.Add(k)
}
for _, k := range pn.InternalPins() {
err := gcs.AddDag(ds, k)
gcs.Add(k)

nd, err := ds.Get(ctx, k)
if err != nil {
return nil, err
}

// EnumerateChildren recursively walks the dag and adds the keys to the given set
err = dag.EnumerateChildren(ctx, ds, nd, gcs)
if err != nil {
return nil, err
}
Expand Down
10 changes: 2 additions & 8 deletions test/sharness/t0080-repo.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,8 @@ test_expect_success "'ipfs pin rm' output looks good" '
'

# After 'ipfs pin rm', $HASH must not appear among the recursive pins.
test_expect_success "file no longer pinned" '
	ipfs pin ls --type=recursive --quiet >actual2 &&
	test_expect_code 1 grep $HASH actual2
'

test_expect_success "recursively pin afile" '
Expand Down Expand Up @@ -97,8 +93,7 @@ test_expect_success "adding multiblock random file succeeds" '
MBLOCKHASH=`ipfs add -q multiblock`
'

# TODO: this starts to fail with the pinning rewrite, for unclear reasons
test_expect_failure "'ipfs pin ls --type=indirect' is correct" '
test_expect_success "'ipfs pin ls --type=indirect' is correct" '
ipfs refs "$MBLOCKHASH" >refsout &&
ipfs refs -r "$HASH_WELCOME_DOCS" >>refsout &&
sed -i"~" "s/\(.*\)/\1 indirect/g" refsout &&
Expand Down Expand Up @@ -128,7 +123,6 @@ test_expect_success "'ipfs pin ls --type=recursive' is correct" '
echo "$MBLOCKHASH" >rp_expected &&
echo "$HASH_WELCOME_DOCS" >>rp_expected &&
echo QmUNLLsPACCz1vLxQVkXqqLX5R1X345qqfHbsf67hvA3Nn >>rp_expected &&
ipfs refs -r "$HASH_WELCOME_DOCS" >>rp_expected &&
sed -i"~" "s/\(.*\)/\1 recursive/g" rp_expected &&
ipfs pin ls --type=recursive >rp_actual &&
test_sort_cmp rp_expected rp_actual
Expand Down

0 comments on commit 7580503

Please sign in to comment.