Skip to content

Commit

Permalink
wp
Browse files Browse the repository at this point in the history
  • Loading branch information
reinerRubin committed May 28, 2019
1 parent 5e8e186 commit 3787392
Show file tree
Hide file tree
Showing 3 changed files with 262 additions and 48 deletions.
57 changes: 13 additions & 44 deletions merkledag.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,6 @@ func init() {
ipld.Register(cid.DagCBOR, ipldcbor.DecodeBlock)
}

// contextKey is a type to use as value for the ProgressTracker contexts.
type contextKey string

const progressContextKey contextKey = "progress"

// NewDAGService constructs a new DAGService (using the default implementation).
// Note that the default implementation is also an ipld.LinkGetter.
func NewDAGService(bs bserv.BlockService) *dagService {
Expand Down Expand Up @@ -196,19 +191,7 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s
return false
}

v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
if v == nil {
return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visit)
}

visitProgress := func(c cid.Cid, depth int) bool {
if visit(c, depth) {
v.Increment()
return true
}
return false
}
return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visitProgress)
return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visit)
}

// GetMany gets many nodes from the DAG at once.
Expand Down Expand Up @@ -314,32 +297,6 @@ func EnumerateChildrenDepth(ctx context.Context, getLinks GetLinks, root cid.Cid
return nil
}

// ProgressTracker is used to show progress when fetching nodes.
type ProgressTracker struct {
Total int
lk sync.Mutex
}

// DeriveContext returns a new context with value "progress" derived from
// the given one.
func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context {
return context.WithValue(ctx, progressContextKey, p)
}

// Increment adds one to the total progress.
func (p *ProgressTracker) Increment() {
p.lk.Lock()
defer p.lk.Unlock()
p.Total++
}

// Value returns the current progress.
func (p *ProgressTracker) Value() int {
p.lk.Lock()
defer p.lk.Unlock()
return p.Total
}

// FetchGraphConcurrency is total number of concurrent fetches that
// 'fetchNodes' will start at a time
var FetchGraphConcurrency = 32
Expand All @@ -361,6 +318,11 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c cid.Cid, v
//
// NOTE: It *does not* make multiple concurrent calls to the passed `visit` function.
func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startDepth int, visit func(cid.Cid, int) bool) error {
progressTracker := GetProgressTracker(ctx)
if progressTracker != nil {
defer progressTracker.allFinished()
}

type cidDepth struct {
cid cid.Cid
depth int
Expand Down Expand Up @@ -395,6 +357,10 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C
setlk.Unlock()

if shouldVisit {
if progressTracker != nil {
progressTracker.plannedToFetch(ci)
}

links, err := getLinks(ctx, ci)
if err != nil {
select {
Expand All @@ -403,6 +369,9 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C
}
return
}
if progressTracker != nil {
progressTracker.fetched(ci)
}

outLinks := &linksDepth{
links: links,
Expand Down
55 changes: 51 additions & 4 deletions merkledag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,17 +755,64 @@ func testProgressIndicator(t *testing.T, depth int) {

top, numChildren := mkDag(ds, depth)

v := new(ProgressTracker)
ctx := v.DeriveContext(context.Background())
tracker := NewProgressTracker(true)
ctx, cancel := WithProgressTracker(context.Background(), tracker)
defer cancel()

err := FetchGraph(ctx, top, ds)
if err != nil {
t.Fatal(err)
}

if v.Value() != numChildren+1 {
allEvents := make(ProgressTrackerEvents, 0)
for event := range tracker.Events {
allEvents = append(allEvents, event)
}

if trackerValue := tracker.Value(); trackerValue != numChildren+1 {
t.Errorf("wrong number of children reported in progress indicator, expected %d, got %d",
numChildren+1, v.Value())
numChildren+1, trackerValue)
}

// planned to fetch, fetched must be symmetrical for each cid
if expected, actual := (numChildren+1)*2, len(allEvents); expected != actual {
t.Errorf("wrong number of tracked events in progress indicator, expected %d, got %d",
expected, actual)
}

// group events by cid
eventsByCid := make(map[cid.Cid]ProgressTrackerEvents, (numChildren+1)*2)
for _, event := range allEvents {
events, found := eventsByCid[event.Cid]
if !found {
events = make(ProgressTrackerEvents, 0, 2)
}

events = append(events, event)
}

// check if all events present for each cid
for cid, cidEvents := range eventsByCid {
if len(cidEvents) != 2 {
t.Errorf("can not find matched events for cid: %v", cid)
}

for _, eventType := range []ProgressTrackerEventType{
ProgressTrackerEventTypePlannedToFetch,
ProgressTrackerEventTypeFetched,
} {
found := false
for _, cidEvent := range cidEvents {
if cidEvent.Type == eventType {
found = true
break
}
}

if !found {
t.Errorf("can not find an event %s for cid %s", eventType, cid)
}
}
}
}

Expand Down
198 changes: 198 additions & 0 deletions progresstracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,198 @@
package merkledag

import (
"context"
"sync"
"sync/atomic"

cid "github.com/ipfs/go-cid"
)

// contextKey is a type to use as value for the ProgressTracker contexts.
type progressTrackerContextKey string

const progressContextKey progressTrackerContextKey = "progress"

// ProgressTracker is used to show progress when fetching nodes.
type ProgressTracker struct {
Events chan *ProgressTrackerEvent

total int32

trackEvents bool
lk sync.Mutex

bufferedEvents ProgressTrackerEvents
inEvents chan *ProgressTrackerEvent
newEventsSignal chan struct{}

onceStop sync.Once
stop chan struct{}
stopped chan struct{}
}

// WithProgressTracker returns a new context with value "progress" derived from
// the given one.
func WithProgressTracker(ctx context.Context, p *ProgressTracker) (nCtx context.Context, cancel func()) {
return context.WithValue(ctx, progressContextKey, p), func() { p.Stop() }
}

// NewProgressTracker returns progress tracker instance. Verbose turns on events tracking
func NewProgressTracker(trackEvents bool) *ProgressTracker {
tracker := &ProgressTracker{
Events: make(chan *ProgressTrackerEvent),
trackEvents: trackEvents,
inEvents: make(chan *ProgressTrackerEvent),
newEventsSignal: make(chan struct{}, 1),

stop: make(chan struct{}),
stopped: make(chan struct{}, 2), // in, out pumps
}

if trackEvents {
go tracker.inPump()
go tracker.outPump()
}

return tracker
}

// GetProgressTracker returns a progress tracker instance if present
func GetProgressTracker(ctx context.Context) *ProgressTracker {
v, _ := ctx.Value(progressContextKey).(*ProgressTracker)
return v
}

// Value returns the current progress.
func (p *ProgressTracker) Value() int {
return int(atomic.LoadInt32(&p.total))
}

// Stop stops the "pump" goroutines and clear the resources
func (p *ProgressTracker) Stop() {
if p.trackEvents {
return
}

p.onceStop.Do(func() {
close(p.stop)
})

// in, out pumps
<-p.stopped
<-p.stopped

close(p.stopped)
}

func (p *ProgressTracker) addNewEventToBuffer(event *ProgressTrackerEvent) {
p.lk.Lock()
p.bufferedEvents = append(p.bufferedEvents, event)
p.lk.Unlock()

select {
case p.newEventsSignal <- struct{}{}:
default:
}
}

func (p *ProgressTracker) inPump() {
defer func() {
p.stopped <- struct{}{}
close(p.newEventsSignal)
}()

for {
select {
case event, more := <-p.inEvents:
if !more {
return
}

p.addNewEventToBuffer(event)
case <-p.stop:
return
}
}
}

func (p *ProgressTracker) outPump() {
defer func() {
p.stopped <- struct{}{}
close(p.Events)
}()

finished := false
for {
_, more := <-p.newEventsSignal
if !more {
finished = true
}

p.lk.Lock()
eventsToSend := p.bufferedEvents
p.bufferedEvents = p.bufferedEvents[:0]
p.lk.Unlock()

for _, event := range eventsToSend {
select {
case p.Events <- event:
case <-p.stop:
return
}
}

if finished {
return
}
}
}

func (p *ProgressTracker) plannedToFetch(c cid.Cid) {
if !p.trackEvents {
return
}

p.inEvents <- &ProgressTrackerEvent{
Type: ProgressTrackerEventTypePlannedToFetch,
Cid: c,
}
}

func (p *ProgressTracker) fetched(c cid.Cid) {
atomic.AddInt32(&p.total, 1)

if !p.trackEvents {
return
}

p.inEvents <- &ProgressTrackerEvent{
Type: ProgressTrackerEventTypeFetched,
Cid: c,
}
}

func (p *ProgressTracker) allFinished() {
close(p.inEvents)
}

const (
// ProgressTrackerEventTypePlannedToFetch signals that a cid is planned to fetch
ProgressTrackerEventTypePlannedToFetch ProgressTrackerEventType = "planned to fetch"
// ProgressTrackerEventTypeFetched signals that a cid was fetched
ProgressTrackerEventTypeFetched ProgressTrackerEventType = "was fetched"
)

type (
// ProgressTrackerEventType describes an event type
ProgressTrackerEventType string

// ProgressTrackerEvent is used to show what exactly has happened
ProgressTrackerEvent struct {
Type ProgressTrackerEventType
Cid cid.Cid
}

// ProgressTrackerEvents is a list of events in a chronological order
ProgressTrackerEvents []*ProgressTrackerEvent
)

0 comments on commit 3787392

Please sign in to comment.