Skip to content

Commit

Permalink
address concerns from PR
Browse files Browse the repository at this point in the history
License: MIT
Signed-off-by: Jeromy <[email protected]>
  • Loading branch information
whyrusleeping committed Jul 8, 2015
1 parent 18688a2 commit 4f019d2
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 74 deletions.
126 changes: 68 additions & 58 deletions merkledag/merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,56 +111,29 @@ func (n *dagService) Remove(nd *Node) error {

// FetchGraph fetches all nodes that are children of the given node
func FetchGraph(ctx context.Context, root *Node, serv DAGService) error {
toprocess := make(chan []key.Key, 8)
nodes := make(chan *Node, 8)
errs := make(chan error, 1)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(toprocess)

go fetchNodes(ctx, serv, toprocess, nodes, errs)

nodes <- root
live := 1

for {
select {
case nd, ok := <-nodes:
if !ok {
return nil
}

var keys []key.Key
for _, lnk := range nd.Links {
keys = append(keys, key.Key(lnk.Hash))
}
keys = dedupeKeys(keys)
return EnumerateChildrenAsync(ctx, serv, root, key.NewKeySet())
}

// keep track of open request, when zero, we're done
live += len(keys) - 1
func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
defer close(out)

if live == 0 {
return nil
get := func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}

if len(keys) > 0 {
select {
case toprocess <- keys:
case <-ctx.Done():
return ctx.Err()
}
}
case err := <-errs:
return err
select {
case out <- nd:
case <-ctx.Done():
return ctx.Err()
return
}
}
}

func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out chan<- *Node, errs chan<- error) {
defer close(out)
for {
select {
case ks, ok := <-in:
Expand All @@ -170,22 +143,7 @@ func fetchNodes(ctx context.Context, ds DAGService, in <-chan []key.Key, out cha

ng := ds.GetNodes(ctx, ks)
for _, g := range ng {
go func(g NodeGetter) {
nd, err := g.Get(ctx)
if err != nil {
select {
case errs <- err:
case <-ctx.Done():
}
return
}

select {
case out <- nd:
case <-ctx.Done():
return
}
}(g)
go get(g)
}
}
}
Expand Down Expand Up @@ -334,3 +292,55 @@ func EnumerateChildren(ctx context.Context, ds DAGService, root *Node, set key.K
}
return nil
}

func EnumerateChildrenAsync(ctx context.Context, ds DAGService, root *Node, set key.KeySet) error {
toprocess := make(chan []key.Key, 8)
nodes := make(chan *Node, 8)
errs := make(chan error, 1)

ctx, cancel := context.WithCancel(ctx)
defer cancel()
defer close(toprocess)

go fetchNodes(ctx, ds, toprocess, nodes, errs)

nodes <- root
live := 1

for {
select {
case nd, ok := <-nodes:
if !ok {
return nil
}
// a node has been fetched
live--

var keys []key.Key
for _, lnk := range nd.Links {
k := key.Key(lnk.Hash)
if !set.Has(k) {
set.Add(k)
live++
keys = append(keys, k)
}
}

if live == 0 {
return nil
}

if len(keys) > 0 {
select {
case toprocess <- keys:
case <-ctx.Done():
return ctx.Err()
}
}
case err := <-errs:
return err
case <-ctx.Done():
return ctx.Err()
}
}
}
29 changes: 13 additions & 16 deletions merkledag/merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -216,38 +216,35 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
}

func TestFetchGraph(t *testing.T) {
bsi := bstest.Mocks(t, 1)[0]
ds := NewDAGService(bsi)
var dservs []DAGService
bsis := bstest.Mocks(t, 2)
for _, bsi := range bsis {
dservs = append(dservs, NewDAGService(bsi))
}

read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512}

root, err := imp.BuildDagFromReader(read, ds, spl, nil)
root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil)
if err != nil {
t.Fatal(err)
}

err = FetchGraph(context.TODO(), root, ds)
err = FetchGraph(context.TODO(), root, dservs[1])
if err != nil {
t.Fatal(err)
}
}

func TestFetchGraphOther(t *testing.T) {
var dservs []DAGService
for _, bsi := range bstest.Mocks(t, 2) {
dservs = append(dservs, NewDAGService(bsi))
}

read := io.LimitReader(u.NewTimeSeededRand(), 1024*32)
spl := &chunk.SizeSplitter{512}

root, err := imp.BuildDagFromReader(read, dservs[0], spl, nil)
// create an offline dagstore and ensure all blocks were fetched
bs, err := bserv.New(bsis[1].Blockstore, offline.Exchange(bsis[1].Blockstore))
if err != nil {
t.Fatal(err)
}

err = FetchGraph(context.TODO(), root, dservs[1])
offline_ds := NewDAGService(bs)
ks := key.NewKeySet()

err = EnumerateChildren(context.Background(), offline_ds, root, ks)
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit 4f019d2

Please sign in to comment.