Skip to content

Commit

Permalink
refactor(nodebuilder/pruner): Move all migration logic to nodebuilder…
Browse files Browse the repository at this point in the history
… pruner module
  • Loading branch information
renaynay committed Jan 16, 2025
1 parent 0b7ee0e commit 5c2b9a9
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 187 deletions.
71 changes: 71 additions & 0 deletions nodebuilder/pruner/migration_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package pruner

import (
"bytes"
"context"
"fmt"

"github.com/ipfs/go-datastore"

fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
)

var (
storePrefix = datastore.NewKey("full_avail")
previousModeKey = datastore.NewKey("previous_run")
pruned = []byte("pruned")
archival = []byte("archival")
)

// convertFromArchivalToPruned ensures that a node has not been run with pruning enabled before
// cannot revert to archival mode. It returns true only if the node is converting to
// pruned mode for the first time.
func convertFromArchivalToPruned(ctx context.Context, cfg *Config, ds datastore.Datastore) (bool, error) {
prevMode, err := ds.Get(ctx, previousModeKey)
if err != nil {
return false, err
}

if bytes.Equal(prevMode, pruned) && !cfg.EnableService {
return false, fullavail.ErrDisallowRevertToArchival
}

if bytes.Equal(prevMode, archival) && cfg.EnableService {
// allow conversion from archival to pruned
err = ds.Put(ctx, previousModeKey, pruned)
if err != nil {
return false, fmt.Errorf("share/availability/full: failed to updated pruning mode in "+
"datastore: %w", err)
}
return true, nil
}

// no changes in pruning mode
return false, nil
}

// detectFirstRun is a temporary function that serves to assist migration to the refactored pruner
// implementation (v0.21.0). It checks if the node has been run with pruning enabled before by checking
// if the pruner service ran before, and disallows running as an archival node in the case it has.
//
// TODO @renaynay: remove this function after a few releases.
func detectFirstRun(ctx context.Context, cfg *Config, ds datastore.Datastore, lastPrunedHeight uint64) error {
exists, err := ds.Has(ctx, previousModeKey)
if err != nil {
return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
"datastore: %w", err)
}
if exists {
return nil
}

if !cfg.EnableService {
if lastPrunedHeight > 1 {
return fullavail.ErrDisallowRevertToArchival
}

return ds.Put(ctx, previousModeKey, archival)
}

return ds.Put(ctx, previousModeKey, pruned)
}
123 changes: 123 additions & 0 deletions nodebuilder/pruner/migration_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package pruner

import (
"context"
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
)

// TestDisallowRevertArchival tests that a node that has been previously run
// with full pruning cannot convert back into an "archival" node
func TestDisallowRevertArchival(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

// create a pruned node instance (non-archival) for the first time
cfg := &Config{EnableService: true}
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
nsWrapped := namespace.Wrap(ds, storePrefix)
err := nsWrapped.Put(ctx, previousModeKey, pruned)
require.NoError(t, err)

convert, err := convertFromArchivalToPruned(ctx, cfg, nsWrapped)
assert.NoError(t, err)
assert.False(t, convert)
// ensure availability impl recorded the pruned run
prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, pruned, prevMode)

// now change to archival mode
cfg.EnableService = false

// ensure failure
convert, err = convertFromArchivalToPruned(ctx, cfg, nsWrapped)
assert.Error(t, err)
assert.ErrorIs(t, err, fullavail.ErrDisallowRevertToArchival)
assert.False(t, convert)

// ensure the node can still run in pruned mode
cfg.EnableService = true
convert, err = convertFromArchivalToPruned(ctx, cfg, nsWrapped)
assert.NoError(t, err)
assert.False(t, convert)
}

// TestAllowConversionFromArchivalToPruned tests that a node that has been previously run
// in archival mode can convert to a pruned node
func TestAllowConversionFromArchivalToPruned(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
nsWrapped := namespace.Wrap(ds, storePrefix)
err := nsWrapped.Put(ctx, previousModeKey, archival)
require.NoError(t, err)

cfg := &Config{EnableService: false}

convert, err := convertFromArchivalToPruned(ctx, cfg, nsWrapped)
assert.NoError(t, err)
assert.False(t, convert)

cfg.EnableService = true

convert, err = convertFromArchivalToPruned(ctx, cfg, nsWrapped)
assert.NoError(t, err)
assert.True(t, convert)

prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, pruned, prevMode)
}

func TestDetectFirstRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

t.Run("FirstRunArchival", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
nsWrapped := namespace.Wrap(ds, storePrefix)

cfg := &Config{EnableService: false}

err := detectFirstRun(ctx, cfg, nsWrapped, 1)
assert.NoError(t, err)

prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, archival, prevMode)
})

t.Run("FirstRunPruned", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
nsWrapped := namespace.Wrap(ds, storePrefix)

cfg := &Config{EnableService: true}

err := detectFirstRun(ctx, cfg, nsWrapped, 1)
assert.NoError(t, err)

prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, pruned, prevMode)
})

t.Run("RevertToArchivalNotAllowed", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
nsWrapped := namespace.Wrap(ds, storePrefix)

cfg := &Config{EnableService: false}

err := detectFirstRun(ctx, cfg, nsWrapped, 500)
assert.Error(t, err)
assert.ErrorIs(t, err, fullavail.ErrDisallowRevertToArchival)
})
}
11 changes: 8 additions & 3 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package pruner
import (
"context"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

Expand Down Expand Up @@ -102,20 +104,23 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
func convertToPruned() fx.Option {
return fx.Invoke(func(
ctx context.Context,
fa *fullavail.ShareAvailability,
cfg *Config,
ds datastore.Batching,
p *pruner.Service,
) error {
ds = namespace.Wrap(ds, storePrefix)

lastPrunedHeight, err := p.LastPruned(ctx)
if err != nil {
return err
}

err = fullavail.DetectFirstRun(ctx, fa, lastPrunedHeight)
err = detectFirstRun(ctx, cfg, ds, lastPrunedHeight)
if err != nil {
return err
}

convert, err := fa.ConvertFromArchivalToPruned(ctx)
convert, err := convertFromArchivalToPruned(ctx, cfg, ds)
if err != nil {
return err
}
Expand Down
3 changes: 1 addition & 2 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,10 +242,9 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
fx.Provide(func(
s *store.Store,
getter shwap.Getter,
ds datastore.Batching,
opts []full.Option,
) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, ds, opts...)
return full.NewShareAvailability(s, getter, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
Expand Down
71 changes: 1 addition & 70 deletions share/availability/full/availability.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
package full

import (
"bytes"
"context"
"errors"
"fmt"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"

"github.com/celestiaorg/celestia-node/header"
Expand All @@ -25,10 +22,7 @@ var ErrDisallowRevertToArchival = errors.New(
"node has been run with pruner enabled before, it is not safe to convert to an archival" +
"Run with --experimental-pruning enabled or consider re-initializing the store")

var (
log = logging.Logger("share/full")
storePrefix = datastore.NewKey("full_avail")
)
var log = logging.Logger("share/full")

// ShareAvailability implements share.Availability using the full data square
// recovery technique. It is considered "full" because it is required
Expand All @@ -37,8 +31,6 @@ type ShareAvailability struct {
store *store.Store
getter shwap.Getter

ds datastore.Datastore

storageWindow time.Duration
archival bool
}
Expand All @@ -47,7 +39,6 @@ type ShareAvailability struct {
func NewShareAvailability(
store *store.Store,
getter shwap.Getter,
ds datastore.Datastore,
opts ...Option,
) *ShareAvailability {
p := defaultParams()
Expand All @@ -58,7 +49,6 @@ func NewShareAvailability(
return &ShareAvailability{
store: store,
getter: getter,
ds: namespace.Wrap(ds, storePrefix),
storageWindow: availability.StorageWindow,
archival: p.archival,
}
Expand Down Expand Up @@ -127,62 +117,3 @@ func (fa *ShareAvailability) Prune(ctx context.Context, eh *header.ExtendedHeade
log.Debugf("removing block %s at height %d", eh.DAH.String(), eh.Height())
return fa.store.RemoveODSQ4(ctx, eh.Height(), eh.DAH.Hash())
}

var (
previousModeKey = datastore.NewKey("previous_run")
pruned = []byte("pruned")
archival = []byte("archival")
)

// ConvertFromArchivalToPruned ensures that a node has not been run with pruning enabled before
// cannot revert to archival mode. It returns true only if the node is converting to
// pruned mode for the first time.
func (fa *ShareAvailability) ConvertFromArchivalToPruned(ctx context.Context) (bool, error) {
prevMode, err := fa.ds.Get(ctx, previousModeKey)
if err != nil {
return false, err
}

if bytes.Equal(prevMode, pruned) && fa.archival {
return false, ErrDisallowRevertToArchival
}

if bytes.Equal(prevMode, archival) && !fa.archival {
// allow conversion from archival to pruned
err = fa.ds.Put(ctx, previousModeKey, pruned)
if err != nil {
return false, fmt.Errorf("share/availability/full: failed to updated pruning mode in "+
"datastore: %w", err)
}
return true, nil
}

// no changes in pruning mode
return false, nil
}

// DetectFirstRun is a temporary function that serves to assist migration to the refactored pruner
// implementation (v0.21.0). It checks if the node has been run with pruning enabled before by checking
// if the pruner service ran before, and disallows running as an archival node in the case it has.
//
// TODO @renaynay: remove this function after a few releases.
func DetectFirstRun(ctx context.Context, fa *ShareAvailability, lastPrunedHeight uint64) error {
exists, err := fa.ds.Has(ctx, previousModeKey)
if err != nil {
return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
"datastore: %w", err)
}
if exists {
return nil
}

if fa.archival {
if lastPrunedHeight > 1 {
return ErrDisallowRevertToArchival
}

return fa.ds.Put(ctx, previousModeKey, archival)
}

return fa.ds.Put(ctx, previousModeKey, pruned)
}
Loading

0 comments on commit 5c2b9a9

Please sign in to comment.