diff --git a/nodebuilder/pruner/migration_utils.go b/nodebuilder/pruner/migration_utils.go
new file mode 100644
index 0000000000..efae8af760
--- /dev/null
+++ b/nodebuilder/pruner/migration_utils.go
@@ -0,0 +1,71 @@
+package pruner
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+
+	"github.com/ipfs/go-datastore"
+
+	fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
+)
+
+var (
+	storePrefix     = datastore.NewKey("full_avail")
+	previousModeKey = datastore.NewKey("previous_run")
+	pruned          = []byte("pruned")
+	archival        = []byte("archival")
+)
+
+// convertFromArchivalToPruned ensures that a node that has been run with pruning enabled
+// before cannot revert to archival mode. It returns true only if the node is converting to
+// pruned mode for the first time.
+func convertFromArchivalToPruned(ctx context.Context, cfg *Config, ds datastore.Datastore) (bool, error) {
+	prevMode, err := ds.Get(ctx, previousModeKey)
+	if err != nil {
+		return false, err
+	}
+
+	if bytes.Equal(prevMode, pruned) && !cfg.EnableService {
+		return false, fullavail.ErrDisallowRevertToArchival
+	}
+
+	if bytes.Equal(prevMode, archival) && cfg.EnableService {
+		// allow conversion from archival to pruned
+		err = ds.Put(ctx, previousModeKey, pruned)
+		if err != nil {
+			return false, fmt.Errorf("share/availability/full: failed to update pruning mode in "+
+				"datastore: %w", err)
+		}
+		return true, nil
+	}
+
+	// no changes in pruning mode
+	return false, nil
+}
+
+// detectFirstRun is a temporary function that assists migration to the refactored pruner
+// implementation (v0.21.0). It checks whether the node has been run with pruning enabled
+// before and, if it has, disallows running as an archival node.
+//
+// TODO @renaynay: remove this function after a few releases.
+func detectFirstRun(ctx context.Context, cfg *Config, ds datastore.Datastore, lastPrunedHeight uint64) error {
+	exists, err := ds.Has(ctx, previousModeKey)
+	if err != nil {
+		return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
+			"datastore: %w", err)
+	}
+	if exists {
+		return nil
+	}
+
+	if !cfg.EnableService {
+		if lastPrunedHeight > 1 {
+			return fullavail.ErrDisallowRevertToArchival
+		}
+
+		return ds.Put(ctx, previousModeKey, archival)
+	}
+
+	return ds.Put(ctx, previousModeKey, pruned)
+}
diff --git a/nodebuilder/pruner/migration_utils_test.go b/nodebuilder/pruner/migration_utils_test.go
new file mode 100644
index 0000000000..868d0a4e94
--- /dev/null
+++ b/nodebuilder/pruner/migration_utils_test.go
@@ -0,0 +1,123 @@
+package pruner
+
+import (
+	"context"
+	"testing"
+
+	"github.com/ipfs/go-datastore"
+	"github.com/ipfs/go-datastore/namespace"
+	ds_sync "github.com/ipfs/go-datastore/sync"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
+	fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
+)
+
+// TestDisallowRevertArchival tests that a node that has been previously run
+// with full pruning cannot convert back into an "archival" node
+func TestDisallowRevertArchival(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	// set up a pruned (non-archival) config for a node that has already recorded a pruned run
+	cfg := &Config{EnableService: true}
+	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+	nsWrapped := namespace.Wrap(ds, storePrefix)
+	err := nsWrapped.Put(ctx, previousModeKey, pruned)
+	require.NoError(t, err)
+
+	convert, err := convertFromArchivalToPruned(ctx, cfg, nsWrapped)
+	assert.NoError(t, err)
+	assert.False(t, convert)
+	// ensure the datastore still records the pruned run
+	prevMode, err := nsWrapped.Get(ctx, previousModeKey)
+	require.NoError(t, err)
+	assert.Equal(t, pruned, prevMode)
+
+	// now change to archival mode
+	cfg.EnableService = false
+
+	// ensure failure
+	convert, err = convertFromArchivalToPruned(ctx, cfg, nsWrapped)
+	assert.Error(t, err)
+	assert.ErrorIs(t, err, fullavail.ErrDisallowRevertToArchival)
+	assert.False(t, convert)
+
+	// ensure the node can still run in pruned mode
+	cfg.EnableService = true
+	convert, err = convertFromArchivalToPruned(ctx, cfg, nsWrapped)
+	assert.NoError(t, err)
+	assert.False(t, convert)
+}
+
+// TestAllowConversionFromArchivalToPruned tests that a node that has been previously run
+// in archival mode can convert to a pruned node
+func TestAllowConversionFromArchivalToPruned(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	t.Cleanup(cancel)
+
+	ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+	nsWrapped := namespace.Wrap(ds, storePrefix)
+	err := nsWrapped.Put(ctx, previousModeKey, archival)
+	require.NoError(t, err)
+
+	cfg := &Config{EnableService: false}
+
+	convert, err := convertFromArchivalToPruned(ctx, cfg, nsWrapped)
+	assert.NoError(t, err)
+	assert.False(t, convert)
+
+	cfg.EnableService = true
+
+	convert, err = convertFromArchivalToPruned(ctx, cfg, nsWrapped)
+	assert.NoError(t, err)
+	assert.True(t, convert)
+
+	prevMode, err := nsWrapped.Get(ctx, previousModeKey)
+	require.NoError(t, err)
+	assert.Equal(t, pruned, prevMode)
+}
+
+func TestDetectFirstRun(t *testing.T) {
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	t.Run("FirstRunArchival", func(t *testing.T) {
+		ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
+
nsWrapped := namespace.Wrap(ds, storePrefix) + + cfg := &Config{EnableService: false} + + err := detectFirstRun(ctx, cfg, nsWrapped, 1) + assert.NoError(t, err) + + prevMode, err := nsWrapped.Get(ctx, previousModeKey) + require.NoError(t, err) + assert.Equal(t, archival, prevMode) + }) + + t.Run("FirstRunPruned", func(t *testing.T) { + ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) + nsWrapped := namespace.Wrap(ds, storePrefix) + + cfg := &Config{EnableService: true} + + err := detectFirstRun(ctx, cfg, nsWrapped, 1) + assert.NoError(t, err) + + prevMode, err := nsWrapped.Get(ctx, previousModeKey) + require.NoError(t, err) + assert.Equal(t, pruned, prevMode) + }) + + t.Run("RevertToArchivalNotAllowed", func(t *testing.T) { + ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) + nsWrapped := namespace.Wrap(ds, storePrefix) + + cfg := &Config{EnableService: false} + + err := detectFirstRun(ctx, cfg, nsWrapped, 500) + assert.Error(t, err) + assert.ErrorIs(t, err, fullavail.ErrDisallowRevertToArchival) + }) +} diff --git a/nodebuilder/pruner/module.go b/nodebuilder/pruner/module.go index ca76c07711..65419b61af 100644 --- a/nodebuilder/pruner/module.go +++ b/nodebuilder/pruner/module.go @@ -3,6 +3,8 @@ package pruner import ( "context" + "github.com/ipfs/go-datastore" + "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" "go.uber.org/fx" @@ -102,20 +104,23 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option { func convertToPruned() fx.Option { return fx.Invoke(func( ctx context.Context, - fa *fullavail.ShareAvailability, + cfg *Config, + ds datastore.Batching, p *pruner.Service, ) error { + ds = namespace.Wrap(ds, storePrefix) + lastPrunedHeight, err := p.LastPruned(ctx) if err != nil { return err } - err = fullavail.DetectFirstRun(ctx, fa, lastPrunedHeight) + err = detectFirstRun(ctx, cfg, ds, lastPrunedHeight) if err != nil { return err } - convert, err := fa.ConvertFromArchivalToPruned(ctx) + convert, err := convertFromArchivalToPruned(ctx, cfg, ds) if err != nil { return err } diff --git a/nodebuilder/share/module.go b/nodebuilder/share/module.go index f6aac79521..3becb947c8 100644 --- a/nodebuilder/share/module.go +++ b/nodebuilder/share/module.go @@ -242,10 +242,9 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option { fx.Provide(func( s *store.Store, getter shwap.Getter, - ds datastore.Batching, opts []full.Option, ) *full.ShareAvailability { - return full.NewShareAvailability(s, getter, ds, opts...) + return full.NewShareAvailability(s, getter, opts...) 
}), fx.Provide(func(avail *full.ShareAvailability) share.Availability { return avail diff --git a/share/availability/full/availability.go b/share/availability/full/availability.go index 14c44a9a94..16d57ea4df 100644 --- a/share/availability/full/availability.go +++ b/share/availability/full/availability.go @@ -1,14 +1,11 @@ package full import ( - "bytes" "context" "errors" "fmt" "time" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" logging "github.com/ipfs/go-log/v2" "github.com/celestiaorg/celestia-node/header" @@ -25,10 +22,7 @@ var ErrDisallowRevertToArchival = errors.New( "node has been run with pruner enabled before, it is not safe to convert to an archival" + "Run with --experimental-pruning enabled or consider re-initializing the store") -var ( - log = logging.Logger("share/full") - storePrefix = datastore.NewKey("full_avail") -) +var log = logging.Logger("share/full") // ShareAvailability implements share.Availability using the full data square // recovery technique. It is considered "full" because it is required @@ -37,8 +31,6 @@ type ShareAvailability struct { store *store.Store getter shwap.Getter - ds datastore.Datastore - storageWindow time.Duration archival bool } @@ -47,7 +39,6 @@ type ShareAvailability struct { func NewShareAvailability( store *store.Store, getter shwap.Getter, - ds datastore.Datastore, opts ...Option, ) *ShareAvailability { p := defaultParams() @@ -58,7 +49,6 @@ func NewShareAvailability( return &ShareAvailability{ store: store, getter: getter, - ds: namespace.Wrap(ds, storePrefix), storageWindow: availability.StorageWindow, archival: p.archival, } @@ -127,62 +117,3 @@ func (fa *ShareAvailability) Prune(ctx context.Context, eh *header.ExtendedHeade log.Debugf("removing block %s at height %d", eh.DAH.String(), eh.Height()) return fa.store.RemoveODSQ4(ctx, eh.Height(), eh.DAH.Hash()) } - -var ( - previousModeKey = datastore.NewKey("previous_run") - pruned = []byte("pruned") - archival = []byte("archival") -) - -// ConvertFromArchivalToPruned ensures that a node has not been run with pruning enabled before -// cannot revert to archival mode. It returns true only if the node is converting to -// pruned mode for the first time. -func (fa *ShareAvailability) ConvertFromArchivalToPruned(ctx context.Context) (bool, error) { - prevMode, err := fa.ds.Get(ctx, previousModeKey) - if err != nil { - return false, err - } - - if bytes.Equal(prevMode, pruned) && fa.archival { - return false, ErrDisallowRevertToArchival - } - - if bytes.Equal(prevMode, archival) && !fa.archival { - // allow conversion from archival to pruned - err = fa.ds.Put(ctx, previousModeKey, pruned) - if err != nil { - return false, fmt.Errorf("share/availability/full: failed to updated pruning mode in "+ - "datastore: %w", err) - } - return true, nil - } - - // no changes in pruning mode - return false, nil -} - -// DetectFirstRun is a temporary function that serves to assist migration to the refactored pruner -// implementation (v0.21.0). It checks if the node has been run with pruning enabled before by checking -// if the pruner service ran before, and disallows running as an archival node in the case it has. -// -// TODO @renaynay: remove this function after a few releases. 
-func DetectFirstRun(ctx context.Context, fa *ShareAvailability, lastPrunedHeight uint64) error { - exists, err := fa.ds.Has(ctx, previousModeKey) - if err != nil { - return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+ - "datastore: %w", err) - } - if exists { - return nil - } - - if fa.archival { - if lastPrunedHeight > 1 { - return ErrDisallowRevertToArchival - } - - return fa.ds.Put(ctx, previousModeKey, archival) - } - - return fa.ds.Put(ctx, previousModeKey, pruned) -} diff --git a/share/availability/full/availability_test.go b/share/availability/full/availability_test.go index ccef73464f..24435ee0d8 100644 --- a/share/availability/full/availability_test.go +++ b/share/availability/full/availability_test.go @@ -6,9 +6,6 @@ import ( "time" "github.com/golang/mock/gomock" - "github.com/ipfs/go-datastore" - "github.com/ipfs/go-datastore/namespace" - ds_sync "github.com/ipfs/go-datastore/sync" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -36,7 +33,7 @@ func TestSharesAvailable(t *testing.T) { store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) require.NoError(t, err) - avail := NewShareAvailability(store, getter, datastore.NewMapDatastore()) + avail := NewShareAvailability(store, getter) err = avail.SharesAvailable(ctx, eh) require.NoError(t, err) @@ -63,7 +60,7 @@ func TestSharesAvailable_StoredEds(t *testing.T) { store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) require.NoError(t, err) - avail := NewShareAvailability(store, nil, datastore.NewMapDatastore()) + avail := NewShareAvailability(store, nil) err = store.PutODSQ4(ctx, roots, eh.Height(), eds) require.NoError(t, err) @@ -93,7 +90,7 @@ func TestSharesAvailable_ErrNotAvailable(t *testing.T) { store, err := store.NewStore(store.DefaultParameters(), t.TempDir()) require.NoError(t, err) - avail := NewShareAvailability(store, getter, datastore.NewMapDatastore()) + avail := NewShareAvailability(store, getter) errors := []error{shwap.ErrNotFound, context.DeadlineExceeded} for _, getterErr := range errors { @@ -117,7 +114,7 @@ func TestSharesAvailable_OutsideSamplingWindow_NonArchival(t *testing.T) { suite := headertest.NewTestSuite(t, 3, time.Nanosecond) headers := suite.GenExtendedHeaders(10) - avail := NewShareAvailability(store, getter, datastore.NewMapDatastore()) + avail := NewShareAvailability(store, getter) avail.storageWindow = time.Nanosecond // make all headers outside sampling window for _, h := range headers { @@ -143,7 +140,7 @@ func TestSharesAvailable_OutsideSamplingWindow_Archival(t *testing.T) { getter.EXPECT().GetEDS(gomock.Any(), gomock.Any()).Times(1).Return(eds, nil) - avail := NewShareAvailability(store, getter, datastore.NewMapDatastore(), WithArchivalMode()) + avail := NewShareAvailability(store, getter, WithArchivalMode()) avail.storageWindow = time.Nanosecond // make all headers outside sampling window err = avail.SharesAvailable(ctx, eh) @@ -152,107 +149,3 @@ func TestSharesAvailable_OutsideSamplingWindow_Archival(t *testing.T) { require.NoError(t, err) assert.True(t, has) } - -// TestDisallowRevertArchival tests that a node that has been previously run -// with full pruning cannot convert back into an "archival" node -func TestDisallowRevertArchival(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - nsWrapped := namespace.Wrap(ds, storePrefix) - err := nsWrapped.Put(ctx, previousModeKey, pruned) - 
require.NoError(t, err) - - // create a pruned node instance (non-archival) for the first time - fa := NewShareAvailability(nil, nil, ds) - - convert, err := fa.ConvertFromArchivalToPruned(ctx) - assert.NoError(t, err) - assert.False(t, convert) - // ensure availability impl recorded the pruned run - prevMode, err := fa.ds.Get(ctx, previousModeKey) - require.NoError(t, err) - assert.Equal(t, pruned, prevMode) - - // now change to archival mode - fa = NewShareAvailability(nil, nil, ds, WithArchivalMode()) - - // ensure failure - convert, err = fa.ConvertFromArchivalToPruned(ctx) - assert.Error(t, err) - assert.ErrorIs(t, err, ErrDisallowRevertToArchival) - assert.False(t, convert) - - // ensure the node can still run in pruned mode - fa = NewShareAvailability(nil, nil, ds) - convert, err = fa.ConvertFromArchivalToPruned(ctx) - assert.NoError(t, err) - assert.False(t, convert) -} - -// TestAllowConversionFromArchivalToPruned tests that a node that has been previously run -// in archival mode can convert to a pruned node -func TestAllowConversionFromArchivalToPruned(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - t.Cleanup(cancel) - - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - nsWrapped := namespace.Wrap(ds, storePrefix) - err := nsWrapped.Put(ctx, previousModeKey, archival) - require.NoError(t, err) - - fa := NewShareAvailability(nil, nil, ds, WithArchivalMode()) - - convert, err := fa.ConvertFromArchivalToPruned(ctx) - assert.NoError(t, err) - assert.False(t, convert) - - fa = NewShareAvailability(nil, nil, ds) - - convert, err = fa.ConvertFromArchivalToPruned(ctx) - assert.NoError(t, err) - assert.True(t, convert) - - prevMode, err := fa.ds.Get(ctx, previousModeKey) - require.NoError(t, err) - assert.Equal(t, pruned, prevMode) -} - -func TestDetectFirstRun(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - t.Run("FirstRunArchival", func(t *testing.T) { - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - - fa := NewShareAvailability(nil, nil, ds, WithArchivalMode()) - err := DetectFirstRun(ctx, fa, 1) - assert.NoError(t, err) - - prevMode, err := fa.ds.Get(ctx, previousModeKey) - require.NoError(t, err) - assert.Equal(t, archival, prevMode) - }) - - t.Run("FirstRunPruned", func(t *testing.T) { - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - - fa := NewShareAvailability(nil, nil, ds) - err := DetectFirstRun(ctx, fa, 1) - assert.NoError(t, err) - - prevMode, err := fa.ds.Get(ctx, previousModeKey) - require.NoError(t, err) - assert.Equal(t, pruned, prevMode) - }) - - t.Run("RevertToArchivalNotAllowed", func(t *testing.T) { - ds := ds_sync.MutexWrap(datastore.NewMapDatastore()) - - fa := NewShareAvailability(nil, nil, ds, WithArchivalMode()) - err := DetectFirstRun(ctx, fa, 500) - assert.Error(t, err) - assert.ErrorIs(t, err, ErrDisallowRevertToArchival) - }) -}
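Reviewer note (not part of the patch): below is a minimal sketch of how the two helpers introduced in nodebuilder/pruner/migration_utils.go are expected to be exercised together during startup, mirroring the wiring in convertToPruned() in nodebuilder/pruner/module.go. The function name migratePruningMode and the rawDS parameter are illustrative placeholders, not identifiers from this diff.

package pruner

import (
	"context"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
)

// migratePruningMode is a hypothetical consolidation of the startup flow: it
// records the node's pruning mode on first run and reports whether the node is
// flipping from archival to pruned for the first time.
func migratePruningMode(
	ctx context.Context,
	cfg *Config,
	rawDS datastore.Batching,
	lastPrunedHeight uint64,
) (bool, error) {
	// Scope the migration keys under the same "full_avail" prefix used in module.go.
	ds := namespace.Wrap(rawDS, storePrefix)

	// Record the current mode on first run; reject an archival restart if the
	// pruner service has already pruned past genesis.
	if err := detectFirstRun(ctx, cfg, ds, lastPrunedHeight); err != nil {
		return false, err
	}

	// Returns true only on the first archival-to-pruned conversion, and
	// fullavail.ErrDisallowRevertToArchival on the reverse attempt.
	return convertFromArchivalToPruned(ctx, cfg, ds)
}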