Skip to content

Commit

Permalink
Refactors Snapshotter to timer instead of checking frequency directly
Browse files Browse the repository at this point in the history
  • Loading branch information
Karsten Kraus authored and Argelbargel committed Sep 13, 2023
1 parent 88e0e90 commit ff31aa1
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 21 deletions.
8 changes: 3 additions & 5 deletions cmd/vault-raft-snapshot-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ import (
"os"
"os/signal"
"syscall"
"time"

"github.com/urfave/cli/v2"

Expand Down Expand Up @@ -110,6 +109,7 @@ func startSnapshotter(configFile cli.Path) {
log.Fatalf("Cannot instantiate snapshotter: %s\n", err)
}

internal.WatchConfigAndReconfigure(snapshotter)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -124,15 +124,13 @@ func startSnapshotter(configFile cli.Path) {
}

func runSnapshotter(ctx context.Context, snapshotter *internal.Snapshotter) {
internal.WatchConfigAndReconfigure(snapshotter)

for {
frequency, err := snapshotter.TakeSnapshot(ctx)
timeout, err := snapshotter.TakeSnapshot(ctx)
if err != nil {
log.Printf("Could not take snapshot or upload to all targets: %v\n", err)
}
select {
case <-time.After(frequency):
case <-timeout.C:
continue
case <-ctx.Done():
os.Exit(1)
Expand Down
46 changes: 37 additions & 9 deletions internal/app/vault_raft_snapshot_agent/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ type SnapshotConfig struct {
}

type Snapshotter struct {
lock sync.Mutex
client *vault.VaultClient
uploaders []upload.Uploader
config SnapshotConfig
lock sync.Mutex
client *vault.VaultClient
uploaders []upload.Uploader
config SnapshotConfig
lastSnapshot time.Time
snapshotTimer *time.Timer
}

func CreateSnapshotter(config SnapshotterConfig) (*Snapshotter, error) {
Expand Down Expand Up @@ -65,15 +67,18 @@ func (s *Snapshotter) Configure(config SnapshotConfig, client *vault.VaultClient
s.client = client
s.uploaders = uploaders
s.config = config
s.updateTimer(config.Frequency)
}

func (s *Snapshotter) TakeSnapshot(ctx context.Context) (time.Duration, error) {
func (s *Snapshotter) TakeSnapshot(ctx context.Context) (*time.Timer, error) {
s.lock.Lock()
defer s.lock.Unlock()

s.resetTimer()

snapshot, err := os.CreateTemp("", "snapshot")
if err != nil {
return s.config.Frequency, err
return s.snapshotTimer, err
}

defer os.Remove(snapshot.Name())
Expand All @@ -83,15 +88,15 @@ func (s *Snapshotter) TakeSnapshot(ctx context.Context) (time.Duration, error) {

err = s.client.TakeSnapshot(ctx, snapshot)
if err != nil {
return s.config.Frequency, err
return s.snapshotTimer, err
}

_, err = snapshot.Seek(0, io.SeekStart)
if err != nil {
return s.config.Frequency, err
return s.snapshotTimer, err
}

return s.config.Frequency, s.uploadSnapshot(ctx, snapshot, time.Now().Format(s.config.TimestampFormat))
return s.snapshotTimer, s.uploadSnapshot(ctx, snapshot, time.Now().Format(s.config.TimestampFormat))
}

func (s *Snapshotter) uploadSnapshot(ctx context.Context, snapshot io.Reader, timestamp string) error {
Expand All @@ -107,3 +112,26 @@ func (s *Snapshotter) uploadSnapshot(ctx context.Context, snapshot io.Reader, ti

return errs
}

func (s *Snapshotter) resetTimer() {
s.lastSnapshot = time.Now()
s.snapshotTimer = time.NewTimer(s.config.Frequency)
}

func (s *Snapshotter) updateTimer(frequency time.Duration) {
if s.snapshotTimer != nil {
if !s.snapshotTimer.Stop() {
<-s.snapshotTimer.C
}

now := time.Now()
timeout := time.Duration(0)

nextSnapshot := s.lastSnapshot.Add(frequency)
if nextSnapshot.After(now) {
timeout = nextSnapshot.Sub(now)
}

s.snapshotTimer.Reset(timeout)
}
}
143 changes: 136 additions & 7 deletions internal/app/vault_raft_snapshot_agent/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,11 @@ func TestSnapshotterLocksConfigure(t *testing.T) {

assert.GreaterOrEqual(t, time.Since(start), clientAPIStub.snapshotRuntime+250, "TakeSnapshot did not prevent re-configuration during snapshots")

frequency, err := snapshotter.TakeSnapshot(context.Background())
timer, err := snapshotter.TakeSnapshot(context.Background())

assert.NotNil(t, timer)
assert.NoError(t, err, "TakeSnapshot failed unexpectedly")
assert.Equal(t, newConfig.Frequency, frequency, "Snaphotter did not re-configure propertly")
assert.Equal(t, newConfig.Frequency, snapshotter.config.Frequency, "Snaphotter did not re-configure propertly")
}

func TestSnapshotterAbortsAfterTimeout(t *testing.T) {
Expand Down Expand Up @@ -191,20 +192,148 @@ func TestSnapshotterContinuesUploadingIfUploadFails(t *testing.T) {
assert.True(t, uploaderStub2.uploaded, "TakeSnapshot did not upload to second uploader")
}

func TestSnapshotterReturnsFrequency(t *testing.T) {
clientAPIStub := snapshotterVaultClientAPIStub{}
func TestSnapshotterResetsTimer(t *testing.T) {
clientAPIStub := snapshotterVaultClientAPIStub{leader: true}
uploaderStub := uploaderStub{}

config := SnapshotConfig{
Frequency: time.Minute,
Frequency: time.Second,
}

snapshotter := Snapshotter{}
snapshotter.Configure(config, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

frequency, _ := snapshotter.TakeSnapshot(context.Background())
start := time.Now()
timer, err := snapshotter.TakeSnapshot(context.Background())

assert.NotNil(t, timer)
assert.NoError(t, err)

for {
<-timer.C
break
}

assert.GreaterOrEqual(t, time.Since(start), time.Second)
assert.Less(t, time.Since(start), 2*time.Second)
assert.Equal(t, config.Frequency, snapshotter.config.Frequency)
}

func TestSnapshotterResetsTimerOnError(t *testing.T) {
clientAPIStub := snapshotterVaultClientAPIStub{leader: false}
uploaderStub := uploaderStub{}

config := SnapshotConfig{
Frequency: time.Second,
}

snapshotter := Snapshotter{}
snapshotter.Configure(config, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

start := time.Now()
timer, err := snapshotter.TakeSnapshot(context.Background())
assert.NotNil(t, timer)
assert.Error(t, err)

for {
<-timer.C
break
}

assert.GreaterOrEqual(t, time.Since(start), time.Second)
assert.Less(t, time.Since(start), 2*time.Second)
assert.Equal(t, config.Frequency, snapshotter.config.Frequency)
}

func TestSnapshotterUpdatesTimerOnConfigureForGreaterFrequency(t *testing.T) {
clientAPIStub := snapshotterVaultClientAPIStub{leader: false}
uploaderStub := uploaderStub{}

config := SnapshotConfig{
Frequency: time.Second,
}

snapshotter := Snapshotter{}
snapshotter.Configure(config, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

start := time.Now()
timer, _ := snapshotter.TakeSnapshot(context.Background())

newConfig := SnapshotConfig{
Frequency: time.Second * 2,
}

snapshotter.Configure(newConfig, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

for {
<-timer.C
break
}

assert.GreaterOrEqual(t, time.Since(start), 2*time.Second)
assert.Less(t, time.Since(start), 3*time.Second)
assert.Equal(t, newConfig.Frequency, snapshotter.config.Frequency)
}

func TestSnapshotterUpdatesTimerOnConfigureForLesserFrequency(t *testing.T) {
clientAPIStub := snapshotterVaultClientAPIStub{leader: false}
uploaderStub := uploaderStub{}

config := SnapshotConfig{
Frequency: time.Second,
}

snapshotter := Snapshotter{}
snapshotter.Configure(config, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

start := time.Now()
timer, _ := snapshotter.TakeSnapshot(context.Background())

newConfig := SnapshotConfig{
Frequency: time.Millisecond * 500,
}

snapshotter.Configure(newConfig, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

for {
<-timer.C
break
}

assert.GreaterOrEqual(t, time.Since(start), 500*time.Millisecond)
assert.Less(t, time.Since(start), 750*time.Millisecond)
assert.Equal(t, newConfig.Frequency, snapshotter.config.Frequency)
}

func TestSnapshotterTriggersTimerOnConfigureForLesserFrequency(t *testing.T) {
clientAPIStub := snapshotterVaultClientAPIStub{leader: false}
uploaderStub := uploaderStub{}

config := SnapshotConfig{
Frequency: time.Second,
}

snapshotter := Snapshotter{}
snapshotter.Configure(config, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

timer, _ := snapshotter.TakeSnapshot(context.Background())
time.Sleep(time.Millisecond * 500)

newConfig := SnapshotConfig{
Frequency: time.Millisecond * 300,
}

start := time.Now()
snapshotter.Configure(newConfig, vault.NewClient("http://127.0.0.1:8200", &clientAPIStub, nil), []upload.Uploader{&uploaderStub})

for {

<-timer.C
break
}

assert.Equal(t, config.Frequency, frequency)
assert.LessOrEqual(t, time.Since(start), 10*time.Millisecond)
assert.Equal(t, newConfig.Frequency, snapshotter.config.Frequency)
}

type snapshotterVaultClientAPIStub struct {
Expand Down

0 comments on commit ff31aa1

Please sign in to comment.