From 3787392a439e584020c8bc93e5c2f31bf7c20cdf Mon Sep 17 00:00:00 2001 From: tg Date: Tue, 28 May 2019 19:54:10 +0300 Subject: [PATCH] wp --- merkledag.go | 57 +++---------- merkledag_test.go | 55 ++++++++++++- progresstracker.go | 198 +++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 262 insertions(+), 48 deletions(-) create mode 100644 progresstracker.go diff --git a/merkledag.go b/merkledag.go index 3153cf4..a76853d 100644 --- a/merkledag.go +++ b/merkledag.go @@ -22,11 +22,6 @@ func init() { ipld.Register(cid.DagCBOR, ipldcbor.DecodeBlock) } -// contextKey is a type to use as value for the ProgressTracker contexts. -type contextKey string - -const progressContextKey contextKey = "progress" - // NewDAGService constructs a new DAGService (using the default implementation). // Note that the default implementation is also an ipld.LinkGetter. func NewDAGService(bs bserv.BlockService) *dagService { @@ -196,19 +191,7 @@ func FetchGraphWithDepthLimit(ctx context.Context, root cid.Cid, depthLim int, s return false } - v, _ := ctx.Value(progressContextKey).(*ProgressTracker) - if v == nil { - return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visit) - } - - visitProgress := func(c cid.Cid, depth int) bool { - if visit(c, depth) { - v.Increment() - return true - } - return false - } - return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visitProgress) + return EnumerateChildrenAsyncDepth(ctx, GetLinksDirect(ng), root, 0, visit) } // GetMany gets many nodes from the DAG at once. @@ -314,32 +297,6 @@ func EnumerateChildrenDepth(ctx context.Context, getLinks GetLinks, root cid.Cid return nil } -// ProgressTracker is used to show progress when fetching nodes. -type ProgressTracker struct { - Total int - lk sync.Mutex -} - -// DeriveContext returns a new context with value "progress" derived from -// the given one. -func (p *ProgressTracker) DeriveContext(ctx context.Context) context.Context { - return context.WithValue(ctx, progressContextKey, p) -} - -// Increment adds one to the total progress. -func (p *ProgressTracker) Increment() { - p.lk.Lock() - defer p.lk.Unlock() - p.Total++ -} - -// Value returns the current progress. -func (p *ProgressTracker) Value() int { - p.lk.Lock() - defer p.lk.Unlock() - return p.Total -} - // FetchGraphConcurrency is total number of concurrent fetches that // 'fetchNodes' will start at a time var FetchGraphConcurrency = 32 @@ -361,6 +318,11 @@ func EnumerateChildrenAsync(ctx context.Context, getLinks GetLinks, c cid.Cid, v // // NOTE: It *does not* make multiple concurrent calls to the passed `visit` function. func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.Cid, startDepth int, visit func(cid.Cid, int) bool) error { + progressTracker := GetProgressTracker(ctx) + if progressTracker != nil { + defer progressTracker.allFinished() + } + type cidDepth struct { cid cid.Cid depth int @@ -395,6 +357,10 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C setlk.Unlock() if shouldVisit { + if progressTracker != nil { + progressTracker.plannedToFetch(ci) + } + links, err := getLinks(ctx, ci) if err != nil { select { @@ -403,6 +369,9 @@ func EnumerateChildrenAsyncDepth(ctx context.Context, getLinks GetLinks, c cid.C } return } + if progressTracker != nil { + progressTracker.fetched(ci) + } outLinks := &linksDepth{ links: links, diff --git a/merkledag_test.go b/merkledag_test.go index bc87f3b..25c00b7 100644 --- a/merkledag_test.go +++ b/merkledag_test.go @@ -755,17 +755,64 @@ func testProgressIndicator(t *testing.T, depth int) { top, numChildren := mkDag(ds, depth) - v := new(ProgressTracker) - ctx := v.DeriveContext(context.Background()) + tracker := NewProgressTracker(true) + ctx, cancel := WithProgressTracker(context.Background(), tracker) + defer cancel() err := FetchGraph(ctx, top, ds) if err != nil { t.Fatal(err) } - if v.Value() != numChildren+1 { + allEvents := make(ProgressTrackerEvents, 0) + for event := range tracker.Events { + allEvents = append(allEvents, event) + } + + if trackerValue := tracker.Value(); trackerValue != numChildren+1 { t.Errorf("wrong number of children reported in progress indicator, expected %d, got %d", - numChildren+1, v.Value()) + numChildren+1, trackerValue) + } + + // planned to fetch, fetched must be symmetrical for each cid + if expected, actual := (numChildren+1)*2, len(allEvents); expected != actual { + t.Errorf("wrong number of tracked events in progress indicator, expected %d, got %d", + expected, actual) + } + + // group events by cid + eventsByCid := make(map[cid.Cid]ProgressTrackerEvents, (numChildren+1)*2) + for _, event := range allEvents { + events, found := eventsByCid[event.Cid] + if !found { + events = make(ProgressTrackerEvents, 0, 2) + } + + events = append(events, event) + } + + // check if all events present for each cid + for cid, cidEvents := range eventsByCid { + if len(cidEvents) != 2 { + t.Errorf("can not find matched events for cid: %v", cid) + } + + for _, eventType := range []ProgressTrackerEventType{ + ProgressTrackerEventTypePlannedToFetch, + ProgressTrackerEventTypeFetched, + } { + found := false + for _, cidEvent := range cidEvents { + if cidEvent.Type == eventType { + found = true + break + } + } + + if !found { + t.Errorf("can not find an event %s for cid %s", eventType, cid) + } + } } } diff --git a/progresstracker.go b/progresstracker.go new file mode 100644 index 0000000..c5861f2 --- /dev/null +++ b/progresstracker.go @@ -0,0 +1,198 @@ +package merkledag + +import ( + "context" + "sync" + "sync/atomic" + + cid "github.com/ipfs/go-cid" +) + +// contextKey is a type to use as value for the ProgressTracker contexts. +type progressTrackerContextKey string + +const progressContextKey progressTrackerContextKey = "progress" + +// ProgressTracker is used to show progress when fetching nodes. +type ProgressTracker struct { + Events chan *ProgressTrackerEvent + + total int32 + + trackEvents bool + lk sync.Mutex + + bufferedEvents ProgressTrackerEvents + inEvents chan *ProgressTrackerEvent + newEventsSignal chan struct{} + + onceStop sync.Once + stop chan struct{} + stopped chan struct{} +} + +// WithProgressTracker returns a new context with value "progress" derived from +// the given one. +func WithProgressTracker(ctx context.Context, p *ProgressTracker) (nCtx context.Context, cancel func()) { + return context.WithValue(ctx, progressContextKey, p), func() { p.Stop() } +} + +// NewProgressTracker returns progress tracker instance. Verbose turns on events tracking +func NewProgressTracker(trackEvents bool) *ProgressTracker { + tracker := &ProgressTracker{ + Events: make(chan *ProgressTrackerEvent), + trackEvents: trackEvents, + inEvents: make(chan *ProgressTrackerEvent), + newEventsSignal: make(chan struct{}, 1), + + stop: make(chan struct{}), + stopped: make(chan struct{}, 2), // in, out pumps + } + + if trackEvents { + go tracker.inPump() + go tracker.outPump() + } + + return tracker +} + +// GetProgressTracker returns a progress tracker instance if present +func GetProgressTracker(ctx context.Context) *ProgressTracker { + v, _ := ctx.Value(progressContextKey).(*ProgressTracker) + return v +} + +// Value returns the current progress. +func (p *ProgressTracker) Value() int { + return int(atomic.LoadInt32(&p.total)) +} + +// Stop stops the "pump" goroutines and clear the resources +func (p *ProgressTracker) Stop() { + if p.trackEvents { + return + } + + p.onceStop.Do(func() { + close(p.stop) + }) + + // in, out pumps + <-p.stopped + <-p.stopped + + close(p.stopped) +} + +func (p *ProgressTracker) addNewEventToBuffer(event *ProgressTrackerEvent) { + p.lk.Lock() + p.bufferedEvents = append(p.bufferedEvents, event) + p.lk.Unlock() + + select { + case p.newEventsSignal <- struct{}{}: + default: + } +} + +func (p *ProgressTracker) inPump() { + defer func() { + p.stopped <- struct{}{} + close(p.newEventsSignal) + }() + + for { + select { + case event, more := <-p.inEvents: + if !more { + return + } + + p.addNewEventToBuffer(event) + case <-p.stop: + return + } + } +} + +func (p *ProgressTracker) outPump() { + defer func() { + p.stopped <- struct{}{} + close(p.Events) + }() + + finished := false + for { + _, more := <-p.newEventsSignal + if !more { + finished = true + } + + p.lk.Lock() + eventsToSend := p.bufferedEvents + p.bufferedEvents = p.bufferedEvents[:0] + p.lk.Unlock() + + for _, event := range eventsToSend { + select { + case p.Events <- event: + case <-p.stop: + return + } + } + + if finished { + return + } + } +} + +func (p *ProgressTracker) plannedToFetch(c cid.Cid) { + if !p.trackEvents { + return + } + + p.inEvents <- &ProgressTrackerEvent{ + Type: ProgressTrackerEventTypePlannedToFetch, + Cid: c, + } +} + +func (p *ProgressTracker) fetched(c cid.Cid) { + atomic.AddInt32(&p.total, 1) + + if !p.trackEvents { + return + } + + p.inEvents <- &ProgressTrackerEvent{ + Type: ProgressTrackerEventTypeFetched, + Cid: c, + } +} + +func (p *ProgressTracker) allFinished() { + close(p.inEvents) +} + +const ( + // ProgressTrackerEventTypePlannedToFetch signals that a cid is planned to fetch + ProgressTrackerEventTypePlannedToFetch ProgressTrackerEventType = "planned to fetch" + // ProgressTrackerEventTypeFetched signals that a cid was fetched + ProgressTrackerEventTypeFetched ProgressTrackerEventType = "was fetched" +) + +type ( + // ProgressTrackerEventType describes an event type + ProgressTrackerEventType string + + // ProgressTrackerEvent is used to show what exactly has happened + ProgressTrackerEvent struct { + Type ProgressTrackerEventType + Cid cid.Cid + } + + // ProgressTrackerEvents is a list of events in a chronological order + ProgressTrackerEvents []*ProgressTrackerEvent +)