Skip to content

Commit

Permalink
feat(share/availability/full): Introduce Q4 trimming for archival nod…
Browse files Browse the repository at this point in the history
…es (#4028)
  • Loading branch information
renaynay authored and cmwaters committed Jan 24, 2025
1 parent f4044ae commit d5038d9
Show file tree
Hide file tree
Showing 13 changed files with 467 additions and 136 deletions.
59 changes: 59 additions & 0 deletions nodebuilder/pruner/migration_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pruner

import (
"context"
"fmt"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"

fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
)

// TODO @renaynay: remove this file after a few releases -- this utility serves as a temporary solution
// to detect if the node has been run with pruning enabled before on previous version(s), and disallow
// running as an archival node.

var (
storePrefix = datastore.NewKey("full_avail")
previousModeKey = datastore.NewKey("previous_mode")
)

// detectFirstRun is a temporary function that serves to assist migration to the refactored pruner
// implementation (v0.21.0). It checks if the node has been run with pruning enabled before by checking
// if the pruner service ran before, and disallows running as an archival node in the case it has.
func detectFirstRun(ctx context.Context, cfg *Config, ds datastore.Datastore, lastPrunedHeight uint64) error {
ds = namespace.Wrap(ds, storePrefix)

exists, err := ds.Has(ctx, previousModeKey)
if err != nil {
return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
"datastore: %w", err)
}
if exists {
// node has already been run on current version, no migration is necessary
return nil
}

isArchival := !cfg.EnableService

// if the node has been pruned before on a previous version, it cannot revert
// to archival mode
if isArchival && lastPrunedHeight > 1 {
return fullavail.ErrDisallowRevertToArchival
}

return recordFirstRun(ctx, ds, isArchival)
}

// recordFirstRun exists to assist migration to new pruner implementation (v0.21.0) by recording
// the first run of the pruner service in the full availability's datastore. It assumes the datastore
// is already namespace-wrapped.
func recordFirstRun(ctx context.Context, ds datastore.Datastore, isArchival bool) error {
mode := []byte("pruned")
if isArchival {
mode = []byte("archival")
}

return ds.Put(ctx, previousModeKey, mode)
}
60 changes: 60 additions & 0 deletions nodebuilder/pruner/migration_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package pruner

import (
"context"
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
)

func TestDetectFirstRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

t.Run("FirstRunArchival", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

cfg := &Config{EnableService: false}

err := detectFirstRun(ctx, cfg, ds, 1)
assert.NoError(t, err)

nsWrapped := namespace.Wrap(ds, storePrefix)
prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, []byte("archival"), prevMode)
})

t.Run("FirstRunPruned", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

cfg := &Config{EnableService: true}

err := detectFirstRun(ctx, cfg, ds, 1)
assert.NoError(t, err)

nsWrapped := namespace.Wrap(ds, storePrefix)
prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, []byte("pruned"), prevMode)
})

t.Run("RevertToArchivalNotAllowed", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

// create archival node instance over a node that has been pruned before
// (height 500)
cfg := &Config{EnableService: false}
lastPrunedHeight := uint64(500)

err := detectFirstRun(ctx, cfg, ds, lastPrunedHeight)
assert.Error(t, err)
assert.ErrorIs(t, err, fullavail.ErrDisallowRevertToArchival)
})
}
117 changes: 64 additions & 53 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@ package pruner

import (
"context"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share/availability"
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
Expand All @@ -23,12 +20,6 @@ import (
var log = logging.Logger("module/pruner")

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
advertiseArchival(tp, cfg),
)

prunerService := fx.Options(
fx.Provide(fx.Annotate(
newPrunerService,
Expand All @@ -44,49 +35,53 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Invoke(func(_ *pruner.Service) {}),
)

baseComponents := fx.Options(
fx.Supply(cfg),
// TODO @renaynay: move this to share module construction
fx.Supply(modshare.Window(availability.StorageWindow)),
advertiseArchival(tp, cfg),
prunerService,
)

switch tp {
case node.Light:
// LNs enforce pruning by default
return fx.Module("prune",
baseComponents,
prunerService,
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
// note this provide exists in pruner module to avoid cyclical imports
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
)
case node.Full:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
convertToPruned(),
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
fx.Supply([]core.Option{}),
)
coreOpts := make([]core.Option, 0)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
coreOpts = []core.Option{core.WithArchivalMode()}
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply([]core.Option{core.WithArchivalMode()}),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
convertToPruned(),
)
default:
panic("unknown node type")
Expand All @@ -103,23 +98,39 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
})
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() modshare.Window {
return modshare.Window(availability.StorageWindow)
})
case node.Full, node.Bridge:
return fx.Provide(func() modshare.Window {
if pruneEnabled {
return modshare.Window(availability.StorageWindow)
}
// implicitly disable pruning by setting the window to 0
return modshare.Window(time.Duration(0))
})
default:
panic("unknown node type")
}
// convertToPruned checks if the node is being converted to an archival node
// to a pruned node.
func convertToPruned() fx.Option {
return fx.Invoke(func(
ctx context.Context,
cfg *Config,
ds datastore.Batching,
p *pruner.Service,
) error {
lastPrunedHeight, err := p.LastPruned(ctx)
if err != nil {
return err
}

err = detectFirstRun(ctx, cfg, ds, lastPrunedHeight)
if err != nil {
return err
}

isArchival := !cfg.EnableService
convert, err := fullavail.ConvertFromArchivalToPruned(ctx, ds, isArchival)
if err != nil {
return err
}

// if we convert the node from archival to pruned, we need to reset the checkpoint
// to ensure the node goes back and deletes *all* blocks older than the
// availability window, as archival "pruning" only trims the .q4 file,
// but retains the ODS.
if convert {
return p.ResetCheckpoint(ctx)
}

return nil
})
}
8 changes: 7 additions & 1 deletion nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,13 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
)
case node.Bridge, node.Full:
return fx.Options(
fx.Provide(full.NewShareAvailability),
fx.Provide(func(
s *store.Store,
getter shwap.Getter,
opts []full.Option,
) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
Expand Down
Loading

0 comments on commit d5038d9

Please sign in to comment.