
feat(share/availability/full): Introduce Q4 trimming for archival nodes #4028

Merged
merged 8 commits on Jan 20, 2025
112 changes: 58 additions & 54 deletions nodebuilder/pruner/module.go
@@ -2,18 +2,14 @@ package pruner

import (
"context"
"time"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

"github.com/celestiaorg/celestia-node/core"
"github.com/celestiaorg/celestia-node/libs/fxutil"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
modshare "github.com/celestiaorg/celestia-node/nodebuilder/share"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/pruner/full"
"github.com/celestiaorg/celestia-node/share/availability"
fullavail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/availability/light"
@@ -23,12 +19,6 @@ import (
var log = logging.Logger("module/pruner")

func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents := fx.Options(
fx.Supply(cfg),
availWindow(tp, cfg.EnableService),
advertiseArchival(tp, cfg),
)

prunerService := fx.Options(
fx.Provide(fx.Annotate(
newPrunerService,
@@ -44,49 +34,53 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Invoke(func(_ *pruner.Service) {}),
)

baseComponents := fx.Options(
fx.Supply(cfg),
// TODO @renaynay: move this to share module construction
fx.Supply(modshare.Window(availability.StorageWindow)),
advertiseArchival(tp, cfg),
prunerService,
)

switch tp {
case node.Light:
// LNs enforce pruning by default
return fx.Module("prune",
baseComponents,
prunerService,
// TODO(@walldiss @renaynay): remove conversion after Availability and Pruner interfaces are merged
// note this provide exists in pruner module to avoid cyclical imports
fx.Provide(func(la *light.ShareAvailability) pruner.Pruner { return la }),
)
case node.Full:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
convertToPruned(),
)
case node.Bridge:
if cfg.EnableService {
return fx.Module("prune",
baseComponents,
prunerService,
fxutil.ProvideAs(full.NewPruner, new(pruner.Pruner)),
fx.Supply([]fullavail.Option{}),
fx.Supply([]core.Option{}),
)
coreOpts := make([]core.Option, 0)
fullAvailOpts := make([]fullavail.Option, 0)

if !cfg.EnableService {
// populate archival mode opts
coreOpts = []core.Option{core.WithArchivalMode()}
fullAvailOpts = []fullavail.Option{fullavail.WithArchivalMode()}
}

return fx.Module("prune",
baseComponents,
fx.Invoke(func(ctx context.Context, ds datastore.Batching) error {
return pruner.DetectPreviousRun(ctx, ds)
}),
fx.Supply([]fullavail.Option{fullavail.WithArchivalMode()}),
fx.Supply([]core.Option{core.WithArchivalMode()}),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
convertToPruned(),
)
default:
panic("unknown node type")
@@ -103,23 +97,33 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
})
}

func availWindow(tp node.Type, pruneEnabled bool) fx.Option {
switch tp {
case node.Light:
// light nodes are still subject to sampling within window
// even if pruning is not enabled.
return fx.Provide(func() modshare.Window {
return modshare.Window(availability.StorageWindow)
})
case node.Full, node.Bridge:
return fx.Provide(func() modshare.Window {
if pruneEnabled {
return modshare.Window(availability.StorageWindow)
}
// implicitly disable pruning by setting the window to 0
return modshare.Window(time.Duration(0))
})
default:
panic("unknown node type")
}
// convertToPruned checks whether the node is being converted from an
// archival node to a pruned node.
func convertToPruned() fx.Option {
return fx.Invoke(func(
ctx context.Context,
fa *fullavail.ShareAvailability,
p *pruner.Service,
) error {
lastPrunedHeight, err := p.LastPruned(ctx)
if err != nil {
return err
}

err = fullavail.DetectFirstRun(ctx, fa, lastPrunedHeight)
if err != nil {
return err
}

convert, err := fa.ConvertFromArchivalToPruned(ctx)
if err != nil {
return err
}

if convert {
return p.ClearCheckpoint(ctx)
}

return nil
})
}
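For readers unfamiliar with the fx wiring above: archival behaviour is now toggled purely by supplying a possibly-empty []fullavail.Option slice, which a provider consumes at construction time. Below is a minimal, self-contained sketch of that pattern; Availability, Option, and WithArchivalMode are stand-ins for illustration, not the real celestia-node types.

package main

import (
	"fmt"

	"go.uber.org/fx"
)

// Availability stands in for fullavail.ShareAvailability.
type Availability struct{ archival bool }

// Option stands in for fullavail.Option: a functional option applied
// at construction time.
type Option func(*Availability)

// WithArchivalMode mirrors the role of fullavail.WithArchivalMode.
func WithArchivalMode() Option {
	return func(a *Availability) { a.archival = true }
}

// NewAvailability consumes the supplied option slice, like the
// availability provider in the share module.
func NewAvailability(opts []Option) *Availability {
	a := &Availability{}
	for _, opt := range opts {
		opt(a)
	}
	return a
}

func main() {
	pruneEnabled := false // stands in for cfg.EnableService
	opts := make([]Option, 0)
	if !pruneEnabled {
		opts = append(opts, WithArchivalMode())
	}

	app := fx.New(
		fx.Supply(opts), // the module does fx.Supply(fullAvailOpts)
		fx.Provide(NewAvailability),
		// invocations run during fx.New, so this prints immediately
		fx.Invoke(func(a *Availability) {
			fmt.Println("archival mode:", a.archival)
		}),
	)
	if err := app.Err(); err != nil {
		panic(err)
	}
}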
9 changes: 8 additions & 1 deletion nodebuilder/share/module.go
@@ -239,7 +239,14 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
)
case node.Bridge, node.Full:
return fx.Options(
fx.Provide(full.NewShareAvailability),
fx.Provide(func(
s *store.Store,
getter shwap.Getter,
ds datastore.Batching,
opts []full.Option,
) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, ds, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
}),
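full.NewShareAvailability takes variadic options, while the container injects a concrete []full.Option value; the closure above simply expands the injected slice into the variadic call. A stripped-down illustration of that adapter pattern (Thing and ThingOption are hypothetical stand-ins):

package main

import "fmt"

// Thing and ThingOption stand in for full.ShareAvailability and
// full.Option.
type Thing struct{ archival bool }

type ThingOption func(*Thing)

// NewThing is variadic, like full.NewShareAvailability.
func NewThing(opts ...ThingOption) *Thing {
	t := &Thing{}
	for _, opt := range opts {
		opt(t)
	}
	return t
}

// adapter mirrors the fx.Provide closure: it accepts the slice the
// container injects and expands it at the call site.
func adapter(opts []ThingOption) *Thing {
	return NewThing(opts...)
}

func main() {
	archival := func(t *Thing) { t.archival = true }
	fmt.Println(adapter([]ThingOption{archival}).archival) // true
}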
105 changes: 94 additions & 11 deletions nodebuilder/tests/prune_test.go
@@ -1,13 +1,14 @@
//go:build pruning || integration

package tests

import (
"bytes"
"context"
"encoding/json"
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p/core/host"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -21,8 +22,8 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
full_avail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds"
@@ -41,11 +42,6 @@ import (
// spin up 3 pruning FNs, connect
// spin up 1 LN that syncs historic blobs
func TestArchivalBlobSync(t *testing.T) {
if testing.Short() {
// TODO: https://github.com/celestiaorg/celestia-node/issues/3636
t.Skip()
}

const (
blocks = 50
btime = time.Millisecond * 300
@@ -176,12 +172,12 @@ func TestArchivalBlobSync(t *testing.T) {
}
}

func TestConvertFromPrunedToArchival(t *testing.T) {
func TestDisallowConvertFromPrunedToArchival(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

// Light nodes are allowed to disable pruning if they wish
// Light nodes have pruning enabled by default
for _, nt := range []node.Type{node.Bridge, node.Full} {
pruningCfg := nodebuilder.DefaultConfig(nt)
pruningCfg.Pruner.EnableService = true
@@ -196,6 +192,93 @@ func TestConvertFromPrunedToArchival(t *testing.T) {
err = store.PutConfig(archivalCfg)
require.NoError(t, err)
_, err = sw.NewNodeWithStore(nt, store)
require.ErrorIs(t, err, pruner.ErrDisallowRevertToArchival, nt.String())
assert.Error(t, err)
assert.ErrorIs(t, err, full_avail.ErrDisallowRevertToArchival)
}
}

func TestDisallowConvertToArchivalViaLastPrunedCheck(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

var cp struct {
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

for _, nt := range []node.Type{node.Bridge, node.Full} {
archivalCfg := nodebuilder.DefaultConfig(nt)

store := nodebuilder.MockStore(t, archivalCfg)
ds, err := store.Datastore()
require.NoError(t, err)

cp.LastPrunedHeight = 500
cp.FailedHeaders = make(map[uint64]struct{})
bin, err := json.Marshal(cp)
require.NoError(t, err)

prunerStore := namespace.Wrap(ds, datastore.NewKey("pruner"))
err = prunerStore.Put(ctx, datastore.NewKey("checkpoint"), bin)
require.NoError(t, err)

_, err = sw.NewNodeWithStore(nt, store)
require.Error(t, err)
assert.ErrorIs(t, err, full_avail.ErrDisallowRevertToArchival)
}
}

func TestConvertFromArchivalToPruned(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

var cp struct {
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

for _, nt := range []node.Type{node.Bridge, node.Full} {
archivalCfg := nodebuilder.DefaultConfig(nt)

store := nodebuilder.MockStore(t, archivalCfg)
ds, err := store.Datastore()
require.NoError(t, err)

// the archival node has trimmed up to height 500
fullAvailStore := namespace.Wrap(ds, datastore.NewKey("full_avail"))
err = fullAvailStore.Put(ctx, datastore.NewKey("previous_run"), []byte("archival"))
require.NoError(t, err)

cp.LastPrunedHeight = 500
cp.FailedHeaders = make(map[uint64]struct{})
bin, err := json.Marshal(cp)
require.NoError(t, err)

prunerStore := namespace.Wrap(ds, datastore.NewKey("pruner"))
err = prunerStore.Put(ctx, datastore.NewKey("checkpoint"), bin)
require.NoError(t, err)

archivalNode := sw.MustNewNodeWithStore(nt, store)
err = archivalNode.Start(ctx)
require.NoError(t, err)
err = archivalNode.Stop(ctx)
require.NoError(t, err)

// convert to pruned node
pruningCfg := nodebuilder.DefaultConfig(nt)
pruningCfg.Pruner.EnableService = true
err = store.PutConfig(pruningCfg)
require.NoError(t, err)
_, err = sw.NewNodeWithStore(nt, store)
assert.NoError(t, err)

// expect that the checkpoint has been overridden
bin, err = prunerStore.Get(ctx, datastore.NewKey("checkpoint"))
require.NoError(t, err)
err = json.Unmarshal(bin, &cp)
require.NoError(t, err)
assert.Equal(t, uint64(1), cp.LastPrunedHeight)
}
}
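The tests above manipulate the pruner's persisted state directly: the checkpoint is plain JSON stored under the "checkpoint" key of a "pruner"-namespaced datastore. A self-contained sketch of that round trip, assuming only the key layout and JSON shape visible in the tests:

package main

import (
	"context"
	"encoding/json"
	"fmt"

	"github.com/ipfs/go-datastore"
	"github.com/ipfs/go-datastore/namespace"
)

// checkpoint mirrors the JSON shape the tests marshal.
type checkpoint struct {
	LastPrunedHeight uint64              `json:"last_pruned_height"`
	FailedHeaders    map[uint64]struct{} `json:"failed"`
}

func main() {
	ctx := context.Background()
	ds := datastore.NewMapDatastore()

	// The pruner keeps its checkpoint under the "pruner" namespace.
	prunerStore := namespace.Wrap(ds, datastore.NewKey("pruner"))

	cp := checkpoint{LastPrunedHeight: 500, FailedHeaders: map[uint64]struct{}{}}
	bin, err := json.Marshal(cp)
	if err != nil {
		panic(err)
	}
	if err := prunerStore.Put(ctx, datastore.NewKey("checkpoint"), bin); err != nil {
		panic(err)
	}

	// On startup a node can read this back to learn whether, and how
	// far, a previous pruning run progressed.
	bin, err = prunerStore.Get(ctx, datastore.NewKey("checkpoint"))
	if err != nil {
		panic(err)
	}
	var got checkpoint
	if err := json.Unmarshal(bin, &got); err != nil {
		panic(err)
	}
	fmt.Println("last pruned height:", got.LastPrunedHeight)
}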
20 changes: 0 additions & 20 deletions pruner/archival/pruner.go

This file was deleted.
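With archival behaviour now expressed through fullavail options, the dedicated archival pruner became dead code. Judging by the 20 deleted lines it was a small stub; a plausible reconstruction — an assumption for illustration, not the verbatim deleted file — is a no-op implementation of the pruner.Pruner interface:

package archival

import (
	"context"

	"github.com/celestiaorg/celestia-node/header"
)

// Pruner implements a no-op pruner.Pruner: archival nodes retain all
// historical blocks, so "pruning" a header does nothing.
type Pruner struct{}

func NewPruner() *Pruner { return &Pruner{} }

func (p *Pruner) Prune(context.Context, *header.ExtendedHeader) error {
	return nil
}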
