Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/availability/full): Introduce Q4 trimming for archival nodes #4028

Merged
merged 8 commits into from
Jan 20, 2025
59 changes: 59 additions & 0 deletions nodebuilder/pruner/migration_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package pruner

import (
"context"
"fmt"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"

fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
)

// TODO @renaynay: remove this file after a few releases -- this utility serves as a temporary solution
// to detect if the node has been run with pruning enabled before on previous version(s), and disallow
// running as an archival node.

var (
storePrefix = datastore.NewKey("full_avail")
previousModeKey = datastore.NewKey("previous_mode")
)

// detectFirstRun is a temporary function that serves to assist migration to the refactored pruner
// implementation (v0.21.0). It checks if the node has been run with pruning enabled before by checking
// if the pruner service ran before, and disallows running as an archival node in the case it has.
func detectFirstRun(ctx context.Context, cfg *Config, ds datastore.Datastore, lastPrunedHeight uint64) error {
ds = namespace.Wrap(ds, storePrefix)

exists, err := ds.Has(ctx, previousModeKey)
if err != nil {
return fmt.Errorf("share/availability/full: failed to check previous pruned run in "+
"datastore: %w", err)
}
if exists {
// node has already been run on current version, no migration is necessary
return nil
}

isArchival := !cfg.EnableService

// if the node has been pruned before on a previous version, it cannot revert
// to archival mode
if isArchival && lastPrunedHeight > 1 {
return fullavail.ErrDisallowRevertToArchival
}

return recordFirstRun(ctx, ds, isArchival)
}

// recordFirstRun exists to assist migration to new pruner implementation (v0.21.0) by recording
// the first run of the pruner service in the full availability's datastore. It assumes the datastore
// is already namespace-wrapped.
func recordFirstRun(ctx context.Context, ds datastore.Datastore, isArchival bool) error {
mode := []byte("pruned")
if isArchival {
mode = []byte("archival")
}

return ds.Put(ctx, previousModeKey, mode)
}
60 changes: 60 additions & 0 deletions nodebuilder/pruner/migration_utils_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package pruner

import (
"context"
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
)

func TestDetectFirstRun(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

t.Run("FirstRunArchival", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

cfg := &Config{EnableService: false}

err := detectFirstRun(ctx, cfg, ds, 1)
assert.NoError(t, err)

nsWrapped := namespace.Wrap(ds, storePrefix)
prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, []byte("archival"), prevMode)
})

t.Run("FirstRunPruned", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

cfg := &Config{EnableService: true}

err := detectFirstRun(ctx, cfg, ds, 1)
assert.NoError(t, err)

nsWrapped := namespace.Wrap(ds, storePrefix)
prevMode, err := nsWrapped.Get(ctx, previousModeKey)
require.NoError(t, err)
assert.Equal(t, []byte("pruned"), prevMode)
})

t.Run("RevertToArchivalNotAllowed", func(t *testing.T) {
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

// create archival node instance over a node that has been pruned before
// (height 500)
cfg := &Config{EnableService: false}
lastPrunedHeight := uint64(500)

err := detectFirstRun(ctx, cfg, ds, lastPrunedHeight)
assert.Error(t, err)
assert.ErrorIs(t, err, fullavail.ErrDisallowRevertToArchival)
})
}
117 changes: 64 additions & 53 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@ package pruner

import (
"context"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share/availability"
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
Expand All @@ -23,12 +20,6 @@ import (
var log = logging.Logger("module/pruner")

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
advertiseArchival(tp, cfg),
)

prunerService := fx.Options(
fx.Provide(fx.Annotate(
newPrunerService,
Expand All @@ -44,49 +35,53 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Invoke(func(_ *pruner.Service) {}),
)

baseComponents := fx.Options(
fx.Supply(cfg),
// TODO @renaynay: move this to share module construction
fx.Supply(modshare.Window(availability.StorageWindow)),
renaynay marked this conversation as resolved.
Show resolved Hide resolved
advertiseArchival(tp, cfg),
prunerService,
)

switch tp {
case node.Light:
// LNs enforce pruning by default
return fx.Module("prune",
baseComponents,
prunerService,
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
// note this provide exists in pruner module to avoid cyclical imports
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
)
case node.Full:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
convertToPruned(),
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
fx.Supply([]core.Option{}),
)
coreOpts := make([]core.Option, 0)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
coreOpts = []core.Option{core.WithArchivalMode()}
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply([]core.Option{core.WithArchivalMode()}),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
convertToPruned(),
)
default:
panic("unknown node type")
Expand All @@ -103,23 +98,39 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
})
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() modshare.Window {
return modshare.Window(availability.StorageWindow)
})
case node.Full, node.Bridge:
return fx.Provide(func() modshare.Window {
if pruneEnabled {
return modshare.Window(availability.StorageWindow)
}
// implicitly disable pruning by setting the window to 0
return modshare.Window(time.Duration(0))
})
default:
panic("unknown node type")
}
// convertToPruned checks if the node is being converted to an archival node
// to a pruned node.
func convertToPruned() fx.Option {
return fx.Invoke(func(
ctx context.Context,
cfg *Config,
ds datastore.Batching,
p *pruner.Service,
) error {
lastPrunedHeight, err := p.LastPruned(ctx)
if err != nil {
return err
}

err = detectFirstRun(ctx, cfg, ds, lastPrunedHeight)
if err != nil {
return err
}

isArchival := !cfg.EnableService
convert, err := fullavail.ConvertFromArchivalToPruned(ctx, ds, isArchival)
if err != nil {
return err
}

// if we convert the node from archival to pruned, we need to reset the checkpoint
// to ensure the node goes back and deletes *all* blocks older than the
// availability window, as archival "pruning" only trims the .q4 file,
// but retains the ODS.
if convert {
walldiss marked this conversation as resolved.
Show resolved Hide resolved
return p.ResetCheckpoint(ctx)
}

return nil
})
}
8 changes: 7 additions & 1 deletion nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,13 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
)
case node.Bridge, node.Full:
return fx.Options(
fx.Provide(full.NewShareAvailability),
fx.Provide(func(
s *store.Store,
getter shwap.Getter,
opts []full.Option,
) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
Expand Down
Loading
Loading