
Commit

try
renaynay committed Jan 14, 2025
1 parent 3d6ec59 commit 9be3949
Showing 4 changed files with 204 additions and 32 deletions.
20 changes: 16 additions & 4 deletions nodebuilder/pruner/module.go
@@ -63,7 +63,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
- checkPreviousRun(),
+ convertToPruned(),
)
case node.Bridge:
coreOpts := make([]core.Option, 0)
@@ -80,7 +80,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
- checkPreviousRun(),
+ convertToPruned(),
)
default:
panic("unknown node type")
@@ -97,13 +97,25 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
})
}

- func checkPreviousRun() fx.Option {
+ // convertToPruned checks if the node is being converted from an archival
+ // node to a pruned node.
+ func convertToPruned() fx.Option {
return fx.Invoke(func(
ctx context.Context,
fa *fullavail.ShareAvailability,
p *pruner.Service,
) error {
- convert, err := fa.ConvertToPruned(ctx)
+ lastPrunedHeight, err := p.LastPruned(ctx)
if err != nil {
return err
}

+ err = fullavail.DetectFirstRun(ctx, fa, lastPrunedHeight)
+ if err != nil {
+ return err
+ }
+
+ convert, err := fa.ConvertFromArchivalToPruned(ctx)
if err != nil {
return err
}
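The guard is wired in with fx.Invoke, so it runs eagerly during node construction, and a returned error aborts startup before any service comes up. A minimal standalone sketch of that fx behavior (assuming go.uber.org/fx; the error text is illustrative, not celestia-node code):

package main

import (
	"errors"
	"fmt"

	"go.uber.org/fx"
)

func main() {
	app := fx.New(
		// Invoked functions run during fx.New; a returned error
		// fails construction, so the node never starts.
		fx.Invoke(func() error {
			return errors.New("disallow revert to archival")
		}),
	)
	// app.Err() surfaces the error from the failed invocation.
	fmt.Println(app.Err())
}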
94 changes: 93 additions & 1 deletion nodebuilder/tests/prune_test.go
@@ -3,9 +3,12 @@ package tests
import (
"bytes"
"context"
"encoding/json"
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p/core/host"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -169,7 +172,7 @@ func TestArchivalBlobSync(t *testing.T) {
}
}

- func TestConvertFromPrunedToArchival(t *testing.T) {
+ func TestDisallowConvertFromPrunedToArchival(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)
@@ -193,3 +196,92 @@ func TestConvertFromPrunedToArchival(t *testing.T) {
assert.ErrorIs(t, full_avail.ErrDisallowRevertToArchival, err)
}
}

func TestDisallowConvertToArchivalViaLastPrunedCheck(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

var cp struct {
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

for _, nt := range []node.Type{node.Bridge, node.Full} {
archivalCfg := nodebuilder.DefaultConfig(nt)

store := nodebuilder.MockStore(t, archivalCfg)
ds, err := store.Datastore()
require.NoError(t, err)

cp.LastPrunedHeight = 500
cp.FailedHeaders = make(map[uint64]struct{})
bin, err := json.Marshal(cp)
require.NoError(t, err)

prunerStore := namespace.Wrap(ds, datastore.NewKey("pruner"))
err = prunerStore.Put(ctx, datastore.NewKey("checkpoint"), bin)
require.NoError(t, err)

_, err = sw.NewNodeWithStore(nt, store)
require.Error(t, err)
assert.ErrorIs(t, full_avail.ErrDisallowRevertToArchival, err)
}
}
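For reference, the checkpoint these tests seed is a plain JSON blob. A standalone sketch of what the Put above stores under the pruner namespace (mirroring the struct's json tags; the values are the test's own):

package main

import (
	"encoding/json"
	"fmt"
)

func main() {
	cp := struct {
		LastPrunedHeight uint64              `json:"last_pruned_height"`
		FailedHeaders    map[uint64]struct{} `json:"failed"`
	}{LastPrunedHeight: 500, FailedHeaders: map[uint64]struct{}{}}

	bin, err := json.Marshal(cp)
	if err != nil {
		panic(err)
	}
	// Prints: {"last_pruned_height":500,"failed":{}}
	fmt.Println(string(bin))
}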

func TestConvertFromArchivalToPruned(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

var cp struct {
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

for _, nt := range []node.Type{node.Bridge, node.Full} {
archivalCfg := nodebuilder.DefaultConfig(nt)

store := nodebuilder.MockStore(t, archivalCfg)
ds, err := store.Datastore()
require.NoError(t, err)

// simulate a node that previously ran in archival mode but left a pruner checkpoint at height 500
fullAvailStore := namespace.Wrap(ds, datastore.NewKey("full_avail"))
err = fullAvailStore.Put(ctx, datastore.NewKey("previous_run"), []byte("archival"))
require.NoError(t, err)

cp.LastPrunedHeight = 500
cp.FailedHeaders = make(map[uint64]struct{})
bin, err := json.Marshal(cp)
require.NoError(t, err)

prunerStore := namespace.Wrap(ds, datastore.NewKey("pruner"))
err = prunerStore.Put(ctx, datastore.NewKey("checkpoint"), bin)
require.NoError(t, err)

archivalNode := sw.MustNewNodeWithStore(nt, store)
err = archivalNode.Start(ctx)
require.NoError(t, err)
err = archivalNode.Stop(ctx)
require.NoError(t, err)

// convert to pruned node
pruningCfg := nodebuilder.DefaultConfig(nt)
pruningCfg.Pruner.EnableService = true
err = store.PutConfig(pruningCfg)
require.NoError(t, err)
pruningNode, err := sw.NewNodeWithStore(nt, store)
assert.NoError(t, err)
err = pruningNode.Start(ctx)
assert.NoError(t, err)
require.NoError(t, pruningNode.Stop(ctx))

// expect that the checkpoint has been overridden
bin, err = prunerStore.Get(ctx, datastore.NewKey("checkpoint"))
require.NoError(t, err)
err = json.Unmarshal(bin, &cp)
require.NoError(t, err)
assert.Equal(t, uint64(1), cp.LastPrunedHeight)
}
}
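Both tests lean on go-datastore namespacing: namespace.Wrap prefixes every key written through it, so the checkpoint seeded above lands at /pruner/checkpoint in the node's underlying datastore. A standalone sketch of that key layout (assuming the github.com/ipfs/go-datastore API used in the imports above):

package main

import (
	"context"
	"fmt"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
	ds_sync "github.com/ipfs/go-datastore/sync"
)

func main() {
	ctx := context.Background()
	parent := ds_sync.MutexWrap(datastore.NewMapDatastore())

	// Writes through the wrapped store are transparently prefixed.
	prunerStore := namespace.Wrap(parent, datastore.NewKey("pruner"))
	if err := prunerStore.Put(ctx, datastore.NewKey("checkpoint"), []byte(`{"last_pruned_height":500}`)); err != nil {
		panic(err)
	}

	// The same value is visible in the parent under the full key.
	val, err := parent.Get(ctx, datastore.NewKey("/pruner/checkpoint"))
	if err != nil {
		panic(err)
	}
	fmt.Println(string(val)) // {"last_pruned_height":500}
}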
53 changes: 40 additions & 13 deletions share/availability/full/availability.go
@@ -1,6 +1,7 @@
package full

import (
"bytes"
"context"
"errors"
"fmt"
@@ -127,28 +128,28 @@ func (fa *ShareAvailability) Prune(ctx context.Context, eh *header.ExtendedHeader
return fa.store.RemoveODSQ4(ctx, eh.Height(), eh.DAH.Hash())
}

- var previousPrunedRunKey = datastore.NewKey("previous_run")
+ var (
+ previousModeKey = datastore.NewKey("previous_run")
+ pruned = []byte("pruned")
+ archival = []byte("archival")
+ )

- // ConvertToPruned ensures that a node has not been run with pruning enabled before
+ // ConvertFromArchivalToPruned ensures that a node that has been run with pruning enabled before
// cannot revert to archival mode. It returns true only if the node is converting to
// pruned mode for the first time.
- func (fa *ShareAvailability) ConvertToPruned(ctx context.Context) (bool, error) {
- prevPruned, err := fa.ds.Has(ctx, previousPrunedRunKey)
+ func (fa *ShareAvailability) ConvertFromArchivalToPruned(ctx context.Context) (bool, error) {
+ prevMode, err := fa.ds.Get(ctx, previousModeKey)
if err != nil {
return false, fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
"datastore: %w", err)
return false, err
}

// node has been run with pruning enabled previously and
// is attempting to revert to archival, do not allow
- if prevPruned && fa.archival {
+ if bytes.Equal(prevMode, pruned) && fa.archival {
return false, ErrDisallowRevertToArchival
}

- // if no previous pruned run has been recorded, record
- // for the first time
- if !prevPruned && !fa.archival {
- err = fa.ds.Put(ctx, previousPrunedRunKey, []byte{})
+ if bytes.Equal(prevMode, archival) && !fa.archival {
+ // allow conversion from archival to pruned
+ err = fa.ds.Put(ctx, previousModeKey, pruned)
if err != nil {
return false, fmt.Errorf("share/availability/full: failed to updated pruning mode in "+
"datastore: %w", err)
@@ -159,3 +160,29 @@ func (fa *ShareAvailability) ConvertToPruned(ctx context.Context) (bool, error)
// no changes in pruning mode
return false, nil
}

// DetectFirstRun is a temporary function that assists migration to the refactored pruner
// implementation (v0.21.0). It checks whether the pruner service has run before and, if it
// has, disallows running as an archival node.
//
// TODO @renaynay: remove this function after a few releases.
func DetectFirstRun(ctx context.Context, fa *ShareAvailability, lastPrunedHeight uint64) error {
exists, err := fa.ds.Has(ctx, previousModeKey)
if err != nil {
return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
"datastore: %w", err)
}
if exists {
return nil
}

if fa.archival {
if lastPrunedHeight > 1 {
return ErrDisallowRevertToArchival
}

return fa.ds.Put(ctx, previousModeKey, archival)
}

return fa.ds.Put(ctx, previousModeKey, pruned)
}
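Read together, DetectFirstRun and ConvertFromArchivalToPruned enforce a one-way rule: a fresh node may start in either mode, an archival node may convert to pruned, but any node whose pruner has advanced past genesis can never become archival again. A condensed sketch of those rules (allowedTransition is an illustrative helper for this package's context, not part of its API):

// allowedTransition condenses the checks above. prev is the recorded
// previous_run value ("" if the key is absent), archivalRequested is the
// mode the node is starting in, and lastPruned is the pruner checkpoint.
func allowedTransition(prev string, archivalRequested bool, lastPruned uint64) error {
	// DetectFirstRun: no record yet, but the pruner checkpoint proves a
	// prior pruned run, so archival is no longer allowed.
	if prev == "" && archivalRequested && lastPruned > 1 {
		return ErrDisallowRevertToArchival
	}
	// ConvertFromArchivalToPruned: an explicit pruned record also blocks
	// reverting to archival.
	if prev == "pruned" && archivalRequested {
		return ErrDisallowRevertToArchival
	}
	// First runs, staying in the same mode, and archival -> pruned are fine.
	return nil
}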
69 changes: 55 additions & 14 deletions share/availability/full/availability_test.go
@@ -7,6 +7,7 @@ import (

"github.com/golang/mock/gomock"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -159,30 +160,33 @@ func TestDisallowRevertArchival(t *testing.T) {
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+ nsWrapped := namespace.Wrap(ds, storePrefix)
+ err := nsWrapped.Put(ctx, previousModeKey, pruned)
+ require.NoError(t, err)

// create a pruned node instance (non-archival) for the first time
fa := NewShareAvailability(nil, nil, ds)

- convert, err := fa.ConvertToPruned(ctx)
+ convert, err := fa.ConvertFromArchivalToPruned(ctx)
assert.NoError(t, err)
- assert.True(t, convert)
+ assert.False(t, convert)
// ensure availability impl recorded the pruned run
- has, err := fa.ds.Has(ctx, previousPrunedRunKey)
+ prevMode, err := fa.ds.Get(ctx, previousModeKey)
require.NoError(t, err)
- assert.True(t, has)
+ assert.Equal(t, pruned, prevMode)

// now change to archival mode
fa = NewShareAvailability(nil, nil, ds, WithArchivalMode())

// ensure failure
- convert, err = fa.ConvertToPruned(ctx)
+ convert, err = fa.ConvertFromArchivalToPruned(ctx)
assert.Error(t, err)
assert.ErrorIs(t, err, ErrDisallowRevertToArchival)
assert.False(t, convert)

// ensure the node can still run in pruned mode
fa = NewShareAvailability(nil, nil, ds)
- convert, err = fa.ConvertToPruned(ctx)
+ convert, err = fa.ConvertFromArchivalToPruned(ctx)
assert.NoError(t, err)
assert.False(t, convert)
}
@@ -194,24 +198,61 @@ func TestAllowConversionFromArchivalToPruned(t *testing.T) {
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+ nsWrapped := namespace.Wrap(ds, storePrefix)
+ err := nsWrapped.Put(ctx, previousModeKey, archival)
+ require.NoError(t, err)

fa := NewShareAvailability(nil, nil, ds, WithArchivalMode())

- convert, err := fa.ConvertToPruned(ctx)
+ convert, err := fa.ConvertFromArchivalToPruned(ctx)
assert.NoError(t, err)
assert.False(t, convert)

- has, err := fa.ds.Has(ctx, previousPrunedRunKey)
- require.NoError(t, err)
- assert.False(t, has)

fa = NewShareAvailability(nil, nil, ds)

- convert, err = fa.ConvertToPruned(ctx)
+ convert, err = fa.ConvertFromArchivalToPruned(ctx)
assert.NoError(t, err)
assert.True(t, convert)

- has, err = fa.ds.Has(ctx, previousPrunedRunKey)
+ prevMode, err := fa.ds.Get(ctx, previousModeKey)
require.NoError(t, err)
- assert.True(t, has)
+ assert.Equal(t, pruned, prevMode)
}

func TestDetectFirstRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

t.Run("FirstRunArchival", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

fa := NewShareAvailability(nil, nil, ds, WithArchivalMode())
err := DetectFirstRun(ctx, fa, 1)
assert.NoError(t, err)

prevMode, err := fa.ds.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, archival, prevMode)
})

t.Run("FirstRunPruned", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

fa := NewShareAvailability(nil, nil, ds)
err := DetectFirstRun(ctx, fa, 1)
assert.NoError(t, err)

prevMode, err := fa.ds.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, pruned, prevMode)
})

t.Run("RevertToArchivalNotAllowed", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

fa := NewShareAvailability(nil, nil, ds, WithArchivalMode())
err := DetectFirstRun(ctx, fa, 500)
assert.Error(t, err)
assert.ErrorIs(t, err, ErrDisallowRevertToArchival)
})
}
