Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(share/availability/full): Introduce Q4 trimming for archival nodes #4028

Merged
merged 8 commits into from
Jan 20, 2025
85 changes: 30 additions & 55 deletions nodebuilder/pruner/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,15 @@ package pruner

import (
"context"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share/availability"
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
Expand All @@ -23,12 +20,6 @@ import (
var log = logging.Logger("module/pruner")

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
advertiseArchival(tp, cfg),
)

prunerService := fx.Options(
fx.Provide(fx.Annotate(
newPrunerService,
Expand All @@ -42,51 +33,56 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// This is necessary to invoke the pruner service as independent thanks to a
// quirk in FX.
fx.Invoke(func(_ *pruner.Service) {}),
fx.Invoke(func(ctx context.Context, ds datastore.Batching, p pruner.Pruner) error {
return pruner.DetectPreviousRun(ctx, ds, p.Kind())
renaynay marked this conversation as resolved.
Show resolved Hide resolved
}),
)

baseComponents := fx.Options(
fx.Supply(cfg),
// TODO @renaynay: move this to share module construction
fx.Supply(modshare.Window(availability.StorageWindow)),
renaynay marked this conversation as resolved.
Show resolved Hide resolved
advertiseArchival(tp, cfg),
prunerService,
)

switch tp {
case node.Light:
// LNs enforce pruning by default
return fx.Module("prune",
baseComponents,
prunerService,
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
// note this provide exists in pruner module to avoid cyclical imports
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
)
case node.Full:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
fx.Supply([]core.Option{}),
)
coreOpts := make([]core.Option, 0)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
coreOpts = []core.Option{core.WithArchivalMode()}
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply([]core.Option{core.WithArchivalMode()}),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
)
default:
panic("unknown node type")
Expand All @@ -102,24 +98,3 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
return opt
})
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() modshare.Window {
return modshare.Window(availability.StorageWindow)
})
case node.Full, node.Bridge:
return fx.Provide(func() modshare.Window {
if pruneEnabled {
return modshare.Window(availability.StorageWindow)
}
// implicitly disable pruning by setting the window to 0
return modshare.Window(time.Duration(0))
})
default:
panic("unknown node type")
}
}
4 changes: 3 additions & 1 deletion nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,9 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
)
case node.Bridge, node.Full:
return fx.Options(
fx.Provide(full.NewShareAvailability),
fx.Provide(func(s *store.Store, getter shwap.Getter, opts []full.Option) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
Expand Down
9 changes: 1 addition & 8 deletions nodebuilder/tests/prune_test.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
//go:build pruning || integration

vgonkivs marked this conversation as resolved.
Show resolved Hide resolved
package tests

import (
Expand Down Expand Up @@ -41,11 +39,6 @@ import (
// spin up 3 pruning FNs, connect
// spin up 1 LN that syncs historic blobs
func TestArchivalBlobSync(t *testing.T) {
if testing.Short() {
// TODO: https://github.com/celestiaorg/celestia-node/issues/3636
t.Skip()
}

const (
blocks = 50
btime = time.Millisecond * 300
Expand Down Expand Up @@ -181,7 +174,7 @@ func TestConvertFromPrunedToArchival(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

// Light nodes are allowed to disable pruning in wish
// Light nodes have pruning enabled by default
for _, nt := range []node.Type{node.Bridge, node.Full} {
pruningCfg := nodebuilder.DefaultConfig(nt)
pruningCfg.Pruner.EnableService = true
Expand Down
20 changes: 0 additions & 20 deletions pruner/archival/pruner.go

This file was deleted.

42 changes: 31 additions & 11 deletions pruner/checkpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,44 @@ var (
// checkpoint contains information related to the state of the
// pruner service that is periodically persisted to disk.
type checkpoint struct {
PrunerKind string `json:"pruner_kind"`
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

// DetectPreviousRun checks if the pruner has run before by checking for the existence of a
// checkpoint.
func DetectPreviousRun(ctx context.Context, ds datastore.Datastore) error {
_, err := getCheckpoint(ctx, namespace.Wrap(ds, storePrefix))
if errors.Is(err, errCheckpointNotFound) {
return nil
func newCheckpoint(prunerKind string) *checkpoint {
return &checkpoint{
PrunerKind: prunerKind,
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{},
}
}

// DetectPreviousRun ensures that a node that has been run with "full" pruning
// mode previously cannot revert back to an "archival" one. This check should
// only be performed when a node is either a Full or Bridge node.
func DetectPreviousRun(ctx context.Context, ds datastore.Datastore, expectedKind string) error {
cristaloleg marked this conversation as resolved.
Show resolved Hide resolved
wrappedDs := namespace.Wrap(ds, storePrefix)

cp, err := getCheckpoint(ctx, wrappedDs)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
return nil
}
return fmt.Errorf("failed to load checkpoint: %w", err)
}
return ErrDisallowRevertToArchival

if cp.PrunerKind != expectedKind {
// do not allow reversion back to archival mode
if cp.PrunerKind == "full" {
renaynay marked this conversation as resolved.
Show resolved Hide resolved
return ErrDisallowRevertToArchival
}
// allow conversion from archival to full by overriding previous checkpoint
log.Infow("overriding checkpoint to enable full pruning mode...")
cp = newCheckpoint(expectedKind)
return storeCheckpoint(ctx, wrappedDs, cp)
}
return nil
}

// storeCheckpoint persists the checkpoint to disk.
Expand Down Expand Up @@ -78,10 +101,7 @@ func (s *Service) loadCheckpoint(ctx context.Context) error {
cp, err := getCheckpoint(ctx, s.ds)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
s.checkpoint = &checkpoint{
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{},
}
s.checkpoint = newCheckpoint(s.pruner.Kind())
return storeCheckpoint(ctx, s.ds, s.checkpoint)
}
return err
Expand Down
63 changes: 63 additions & 0 deletions pruner/checkpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ import (
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStoreCheckpoint(t *testing.T) {
ctx := context.Background()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
c := &checkpoint{
PrunerKind: "test",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}
Expand All @@ -24,3 +27,63 @@ func TestStoreCheckpoint(t *testing.T) {
require.NoError(t, err)
require.Equal(t, c, c2)
}

// TestDisallowRevertArchival tests that a node that has been previously run
// with full pruning cannot convert back into an "archival" node.
func TestDisallowRevertArchival(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "full",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "archival")
assert.Error(t, err)
assert.ErrorIs(t, err, ErrDisallowRevertToArchival)

// ensure no false positives
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

// ensure checkpoint is retrievable after
cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
require.NotNil(t, cp)
assert.Equal(t, cp.LastPrunedHeight, c.LastPrunedHeight)
}

func TestCheckpointOverride(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "archival",
LastPrunedHeight: 600,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
assert.Equal(t, "full", cp.PrunerKind)
assert.Equal(t, uint64(1), cp.LastPrunedHeight)
}
32 changes: 0 additions & 32 deletions pruner/full/pruner.go

This file was deleted.

1 change: 1 addition & 0 deletions pruner/pruner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,5 @@ import (
// from the node's datastore.
type Pruner interface {
Prune(context.Context, *header.ExtendedHeader) error
Kind() string
renaynay marked this conversation as resolved.
Show resolved Hide resolved
}
4 changes: 4 additions & 0 deletions pruner/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,10 @@ func (mp *mockPruner) Prune(_ context.Context, h *header.ExtendedHeader) error {
return nil
}

func (mp *mockPruner) Kind() string {
return "mock"
}

// TODO @renaynay @distractedm1nd: Deduplicate via headertest utility.
// https://github.com/celestiaorg/celestia-node/issues/3278.
type SpacedHeaderGenerator struct {
Expand Down
Loading
Loading