Skip to content

Commit

Permalink
improve FetchGraph
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Jul 2, 2015
1 parent b9d8841 commit 0149e9e
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 43 deletions.
101 changes: 69 additions & 32 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package merkledag

import (
"fmt"
"sync"

"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
blocks "github.com/ipfs/go-ipfs/blocks"
Expand Down Expand Up @@ -110,48 +109,86 @@ func (n *dagService) Remove(nd *Node) error {
return n.Blocks.DeleteBlock(k)
}

// FetchGraph asynchronously fetches all nodes that are children of the given
// node, and returns a channel that may be waited upon for the fetch to complete
func FetchGraph(ctx context.Context, root *Node, serv DAGService) <-chan error {
var keys []key.Key
for _, l := range root.Links {
keys = append(keys, key.Key(l.Hash))
}
// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
toprocess := make(chan []key.Key, 8)
nodes := make(chan *Node, 8)
errs := make(chan error, 1)

out := make(chan error)
nodes := serv.GetNodes(ctx, keys)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(toprocess)

var wg sync.WaitGroup
for _, n := range nodes {
wg.Add(1)
go func(n NodeGetter) {
defer wg.Done()
nd, err := n.Get(ctx)
if err != nil {
select {
case out <- err:
case <-ctx.Done():
}
return
go fetchNodes(ctx, serv, toprocess, nodes, errs)

nodes <- root
live := 1

for {
select {
case nd, ok := <-nodes:
if !ok {
return nil
}

err = <-FetchGraph(ctx, nd, serv)
if err != nil {
var keys []key.Key
for _, lnk := range nd.Links {
keys = append(keys, key.Key(lnk.Hash))
}
keys = dedupeKeys(keys)

// keep track of open request, when zero, we're done
live += len(keys) - 1

if live == 0 {
return nil
}

if len(keys) > 0 {
select {
case out <- err:
case toprocess <- keys:
case <-ctx.Done():
return ctx.Err()
}
return
}
}(n)
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}

go func() {
wg.Wait()
close(out)
}()
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
defer close(out)
for {
select {
case ks, ok := <-in:
if !ok {
return
}

return out
ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}

select {
case out <- nd:
case <-ctx.Done():
return
}
}(g)
}
}
}
}

// FindLinks searches this nodes links for the given key,
Expand Down
8 changes: 2 additions & 6 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,9 +227,7 @@ func TestFetchGraph(t *testing.T) {
t.Fatal(err)
}

done := FetchGraph(context.TODO(), root, ds)

err = <-done
err = FetchGraph(context.TODO(), root, ds)
if err != nil {
t.Fatal(err)
}
Expand All @@ -249,9 +247,7 @@ func TestFetchGraphOther(t *testing.T) {
t.Fatal(err)
}

done := FetchGraph(context.TODO(), root, dservs[1])

err = <-done
err = FetchGraph(context.TODO(), root, dservs[1])
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 3 additions & 5 deletions pin/pin.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,9 @@ func (p *pinner) Pin(ctx context.Context, node *mdag.Node, recurse bool) error {
}

// fetch entire graph
done := mdag.FetchGraph(ctx, node, p.dserv)
select {
case <-ctx.Done():
return ctx.Err()
case <-done:
err := mdag.FetchGraph(ctx, node, p.dserv)
if err != nil {
return err
}

p.recursePin.AddBlock(k)
Expand Down

0 comments on commit 0149e9e

Please sign in to comment.