Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Commit

Permalink
Merge pull request #25 from ipfs/fix/two-bugs
Browse files Browse the repository at this point in the history
fix several bugs
  • Loading branch information
Stebalien authored Apr 28, 2020
2 parents 0505db7 + 3c0bbfc commit 6d2a4b9
Show file tree
Hide file tree
Showing 2 changed files with 146 additions and 55 deletions.
118 changes: 64 additions & 54 deletions simple/reprovide.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package simple

import (
"context"
"errors"
"fmt"
"time"

Expand All @@ -18,14 +19,21 @@ import (

var logR = logging.Logger("reprovider.simple")

// ErrClosed is returned by Trigger when operating on a closed reprovider.
var ErrClosed = errors.New("reprovider service stopped")

// KeyChanFunc is function streaming CIDs to pass to content routing
type KeyChanFunc func(context.Context) (<-chan cid.Cid, error)
type doneFunc func(error)

// Reprovider reannounces blocks to the network
type Reprovider struct {
ctx context.Context
trigger chan doneFunc
// Reprovider context. Cancel to stop, then wait on closedCh.
ctx context.Context
cancel context.CancelFunc
closedCh chan struct{}

// Trigger triggers a reprovide.
trigger chan chan<- error

// The routing system to provide values through
rsys routing.ContentRouting
Expand All @@ -37,9 +45,12 @@ type Reprovider struct {

// NewReprovider creates new Reprovider instance.
func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys routing.ContentRouting, keyProvider KeyChanFunc) *Reprovider {
ctx, cancel := context.WithCancel(ctx)
return &Reprovider{
ctx: ctx,
trigger: make(chan doneFunc),
ctx: ctx,
cancel: cancel,
closedCh: make(chan struct{}),
trigger: make(chan chan<- error),

rsys: rsys,
keyProvider: keyProvider,
Expand All @@ -49,44 +60,60 @@ func NewReprovider(ctx context.Context, reprovideIniterval time.Duration, rsys r

// Close the reprovider
func (rp *Reprovider) Close() error {
rp.cancel()
<-rp.closedCh
return nil
}

// Run re-provides keys with 'tick' interval or when triggered
func (rp *Reprovider) Run() {
// dont reprovide immediately.
// may have just started the daemon and shutting it down immediately.
// probability( up another minute | uptime ) increases with uptime.
after := time.After(time.Minute)
var done doneFunc
for {
if rp.tick == 0 {
after = make(chan time.Time)
defer close(rp.closedCh)

var initialReprovideCh, reprovideCh <-chan time.Time

// If reproviding is enabled (non-zero)
if rp.tick > 0 {
reprovideTicker := time.NewTicker(rp.tick)
defer reprovideTicker.Stop()
reprovideCh = reprovideTicker.C

// If the reprovide ticker is larger than a minute (likely),
// provide once after we've been up a minute.
//
// Don't provide _immediately_ as we might be just about to stop.
if rp.tick > time.Minute {
initialReprovideTimer := time.NewTimer(time.Minute)
defer initialReprovideTimer.Stop()

initialReprovideCh = initialReprovideTimer.C
}
}

var done chan<- error
for rp.ctx.Err() == nil {
select {
case <-initialReprovideCh:
case <-reprovideCh:
case done = <-rp.trigger:
case <-rp.ctx.Done():
return
case done = <-rp.trigger:
case <-after:
}

//'mute' the trigger channel so when `ipfs bitswap reprovide` is called
//a 'reprovider is already running' error is returned
unmute := rp.muteTrigger()

err := rp.Reprovide()
if err != nil {

// only log if we've hit an actual error, otherwise just tell the client we're shutting down
if rp.ctx.Err() != nil {
err = ErrClosed
} else if err != nil {
logR.Errorf("failed to reprovide: %s", err)
}

if done != nil {
done(err)
if err != nil {
done <- err
}
close(done)
}

unmute()

after = time.After(rp.tick)
}
}

Expand Down Expand Up @@ -119,44 +146,27 @@ func (rp *Reprovider) Reprovide() error {
return nil
}

// Trigger starts reprovision process in rp.Run and waits for it
// Trigger starts the reprovision process in rp.Run and waits for it to finish.
//
// Returns an error if a reprovide is already in progress.
func (rp *Reprovider) Trigger(ctx context.Context) error {
progressCtx, done := context.WithCancel(ctx)

var err error
df := func(e error) {
err = e
done()
resultCh := make(chan error, 1)
select {
case rp.trigger <- resultCh:
default:
return fmt.Errorf("reprovider is already running")
}

select {
case err := <-resultCh:
return err
case <-rp.ctx.Done():
return context.Canceled
return ErrClosed
case <-ctx.Done():
return context.Canceled
case rp.trigger <- df:
<-progressCtx.Done()
return err
return ctx.Err()
}
}

func (rp *Reprovider) muteTrigger() context.CancelFunc {
ctx, cf := context.WithCancel(rp.ctx)
go func() {
defer cf()
for {
select {
case <-ctx.Done():
return
case done := <-rp.trigger:
done(fmt.Errorf("reprovider is already running"))
}
}
}()

return cf
}

// Strategies

// NewBlockstoreProvider returns key provider using bstore.AllKeysChan
Expand Down
83 changes: 82 additions & 1 deletion simple/reprovide_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,22 @@ func setupDag(t *testing.T) (nodes []cid.Cid, bstore blockstore.Blockstore) {
}

func TestReprovide(t *testing.T) {
testReprovide(t, func(r *Reprovider, ctx context.Context) error {
return r.Reprovide()
})
}

func TestTrigger(t *testing.T) {
testReprovide(t, func(r *Reprovider, ctx context.Context) error {
go r.Run()
time.Sleep(1 * time.Second)
defer r.Close()
err := r.Trigger(ctx)
return err
})
}

func testReprovide(t *testing.T, trigger func(r *Reprovider, ctx context.Context) error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -71,7 +87,7 @@ func TestReprovide(t *testing.T) {

keyProvider := NewBlockstoreProvider(bstore)
reprov := NewReprovider(ctx, time.Hour, clA, keyProvider)
err := reprov.Reprovide()
err := trigger(reprov, ctx)
if err != nil {
t.Fatal(err)
}
Expand All @@ -95,6 +111,71 @@ func TestReprovide(t *testing.T) {
}
}

func TestTriggerTwice(t *testing.T) {
// Ensure we can only trigger once at a time.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clA, _, _, _ := setupRouting(t)

keyCh := make(chan cid.Cid)
startCh := make(chan struct{})
keyFunc := func(ctx context.Context) (<-chan cid.Cid, error) {
<-startCh
return keyCh, nil
}

reprov := NewReprovider(ctx, time.Hour, clA, keyFunc)
go reprov.Run()
defer reprov.Close()

// Wait for the reprovider to start, otherwise, the reprovider will
// think a concurrent reprovide is running.
//
// We _could_ fix this race... but that would be complexity for nothing.
// 1. We start a reprovide 1 minute after startup anyways.
// 2. The window is really narrow.
time.Sleep(1 * time.Second)

errCh := make(chan error, 2)

// Trigger in the background
go func() {
errCh <- reprov.Trigger(ctx)
}()

// Wait for the trigger to really start.
startCh <- struct{}{}

start := time.Now()
// Try to trigger again, this should fail immediately.
if err := reprov.Trigger(ctx); err == nil {
t.Fatal("expected an error")
}
if time.Since(start) > 10*time.Millisecond {
t.Fatal("expected reprovide to fail instantly")
}

// Let the trigger progress.
close(keyCh)

// Check the result.
err := <-errCh
if err != nil {
t.Fatal(err)
}

// Try to trigger again, this should work.
go func() {
errCh <- reprov.Trigger(ctx)
}()
startCh <- struct{}{}
err = <-errCh
if err != nil {
t.Fatal(err)
}
}

type mockPinner struct {
recursive []cid.Cid
direct []cid.Cid
Expand Down

0 comments on commit 6d2a4b9

Please sign in to comment.