refactor(share/availability/full): Full availability stores key with
information about the nodes previous run (archival or pruned) and is
checked during startup
renaynay committed Jan 14, 2025
1 parent 9bf0217 commit f370346
Showing 11 changed files with 366 additions and 128 deletions.
37 changes: 33 additions & 4 deletions nodebuilder/pruner/module.go
@@ -3,7 +3,6 @@ package pruner
import (
"context"

"github.com/ipfs/go-datastore"
logging "github.com/ipfs/go-log/v2"
"go.uber.org/fx"

@@ -33,9 +32,6 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
// This is necessary to invoke the pruner service as independent thanks to a
// quirk in FX.
fx.Invoke(func(_ *pruner.Service) {}),
fx.Invoke(func(ctx context.Context, ds datastore.Batching, p pruner.Pruner) error {
return pruner.DetectPreviousRun(ctx, ds, p.Kind())
}),
)

baseComponents := fx.Options(
@@ -67,6 +63,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
baseComponents,
fx.Supply(fullAvailOpts),
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
convertToPruned(),
)
case node.Bridge:
coreOpts := make([]core.Option, 0)
@@ -83,6 +80,7 @@ func ConstructModule(tp node.Type, cfg *Config) fx.Option {
fx.Provide(func(fa *fullavail.ShareAvailability) pruner.Pruner { return fa }),
fx.Supply(coreOpts),
fx.Supply(fullAvailOpts),
convertToPruned(),
)
default:
panic("unknown node type")
@@ -98,3 +96,34 @@ func advertiseArchival(tp node.Type, pruneCfg *Config) fx.Option {
return opt
})
}

// convertToPruned checks whether the node is being converted from an archival
// node to a pruned node.
func convertToPruned() fx.Option {
return fx.Invoke(func(
ctx context.Context,
fa *fullavail.ShareAvailability,
p *pruner.Service,
) error {
lastPrunedHeight, err := p.LastPruned(ctx)
if err != nil {
return err
}

err = fullavail.DetectFirstRun(ctx, fa, lastPrunedHeight)
if err != nil {
return err
}

convert, err := fa.ConvertFromArchivalToPruned(ctx)
if err != nil {
return err
}

if convert {
return p.ClearCheckpoint(ctx)
}

return nil
})
}
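
convertToPruned above delegates to fullavail.DetectFirstRun and fa.ConvertFromArchivalToPruned, whose implementations live in share/availability/full and are not among the hunks rendered on this page. The standalone Go sketch below illustrates the previous-run marker logic those helpers imply, folding both into a single function for brevity. The datastore namespace ("full_avail"), the "previous_run" key, and the "archival"/"pruned" values are taken from the tests further down; the function shape, names, and error text are assumptions for illustration, not the committed implementation.

package main

import (
    "context"
    "errors"
    "fmt"

    "github.com/ipfs/go-datastore"
    "github.com/ipfs/go-datastore/namespace"
    ds_sync "github.com/ipfs/go-datastore/sync"
)

// errDisallowRevertToArchival stands in for the sentinel the tests reference as
// full_avail.ErrDisallowRevertToArchival; the message here is illustrative.
var errDisallowRevertToArchival = errors.New(
    "node was previously run with pruning enabled; reverting to archival mode is not allowed")

// checkPreviousRun records the node's mode on first run. On later runs it
// compares the requested mode against the stored marker: a pruned -> archival
// transition is rejected, while an archival -> pruned transition overrides the
// marker and reports that a conversion happened (so the caller can reset the
// pruner checkpoint, as convertToPruned does above).
func checkPreviousRun(ctx context.Context, ds datastore.Datastore, pruning bool) (converted bool, err error) {
    store := namespace.Wrap(ds, datastore.NewKey("full_avail"))
    key := datastore.NewKey("previous_run")

    mode := []byte("archival")
    if pruning {
        mode = []byte("pruned")
    }

    prev, err := store.Get(ctx, key)
    switch {
    case errors.Is(err, datastore.ErrNotFound):
        // first run: persist the current mode and continue
        return false, store.Put(ctx, key, mode)
    case err != nil:
        return false, fmt.Errorf("loading previous run mode: %w", err)
    }

    switch {
    case string(prev) == "pruned" && !pruning:
        return false, errDisallowRevertToArchival
    case string(prev) == "archival" && pruning:
        // archival -> pruned conversion: override the stored marker
        return true, store.Put(ctx, key, mode)
    default:
        return false, nil
    }
}

func main() {
    ctx := context.Background()
    ds := ds_sync.MutexWrap(datastore.NewMapDatastore())

    // first run as archival, then restart with pruning enabled
    _, _ = checkPreviousRun(ctx, ds, false)
    converted, err := checkPreviousRun(ctx, ds, true)
    fmt.Println(converted, err) // true <nil>
}
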
9 changes: 7 additions & 2 deletions nodebuilder/share/module.go
@@ -239,8 +239,13 @@ func availabilityComponents(tp node.Type, cfg *Config) fx.Option {
)
case node.Bridge, node.Full:
return fx.Options(
fx.Provide(func(s *store.Store, getter shwap.Getter, opts []full.Option) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, opts...)
fx.Provide(func(
s *store.Store,
getter shwap.Getter,
ds datastore.Batching,
opts []full.Option,
) *full.ShareAvailability {
return full.NewShareAvailability(s, getter, ds, opts...)
}),
fx.Provide(func(avail *full.ShareAvailability) share.Availability {
return avail
99 changes: 96 additions & 3 deletions nodebuilder/tests/prune_test.go
@@ -3,9 +3,12 @@ package tests
import (
"bytes"
"context"
"encoding/json"
"testing"
"time"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/libp2p/go-libp2p/core/host"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@@ -19,8 +22,8 @@ import (
"github.com/celestiaorg/celestia-node/nodebuilder/das"
"github.com/celestiaorg/celestia-node/nodebuilder/node"
"github.com/celestiaorg/celestia-node/nodebuilder/tests/swamp"
"github.com/celestiaorg/celestia-node/pruner"
"github.com/celestiaorg/celestia-node/share"
full_avail "github.com/celestiaorg/celestia-node/share/availability/full"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/peers"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrex_getter"
"github.com/celestiaorg/celestia-node/share/shwap/p2p/shrex/shrexeds"
@@ -169,7 +172,7 @@ func TestArchivalBlobSync(t *testing.T) {
}
}

func TestConvertFromPrunedToArchival(t *testing.T) {
func TestDisallowConvertFromPrunedToArchival(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)
@@ -189,6 +192,96 @@ func TestConvertFromPrunedToArchival(t *testing.T) {
err = store.PutConfig(archivalCfg)
require.NoError(t, err)
_, err = sw.NewNodeWithStore(nt, store)
require.ErrorIs(t, err, pruner.ErrDisallowRevertToArchival, nt.String())
assert.Error(t, err)
assert.ErrorIs(t, full_avail.ErrDisallowRevertToArchival, err)
}
}

func TestDisallowConvertToArchivalViaLastPrunedCheck(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

var cp struct {
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

for _, nt := range []node.Type{node.Bridge, node.Full} {
archivalCfg := nodebuilder.DefaultConfig(nt)

store := nodebuilder.MockStore(t, archivalCfg)
ds, err := store.Datastore()
require.NoError(t, err)

cp.LastPrunedHeight = 500
cp.FailedHeaders = make(map[uint64]struct{})
bin, err := json.Marshal(cp)
require.NoError(t, err)

prunerStore := namespace.Wrap(ds, datastore.NewKey("pruner"))
err = prunerStore.Put(ctx, datastore.NewKey("checkpoint"), bin)
require.NoError(t, err)

_, err = sw.NewNodeWithStore(nt, store)
require.Error(t, err)
assert.ErrorIs(t, full_avail.ErrDisallowRevertToArchival, err)
}
}

func TestConvertFromArchivalToPruned(t *testing.T) {
sw := swamp.NewSwamp(t, swamp.WithBlockTime(time.Second))
ctx, cancel := context.WithTimeout(context.Background(), time.Second*30)
t.Cleanup(cancel)

var cp struct {
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

for _, nt := range []node.Type{node.Bridge, node.Full} {
archivalCfg := nodebuilder.DefaultConfig(nt)

store := nodebuilder.MockStore(t, archivalCfg)
ds, err := store.Datastore()
require.NoError(t, err)

// the archival node has trimmed up to height 500
fullAvailStore := namespace.Wrap(ds, datastore.NewKey("full_avail"))
err = fullAvailStore.Put(ctx, datastore.NewKey("previous_run"), []byte("archival"))
require.NoError(t, err)

cp.LastPrunedHeight = 500
cp.FailedHeaders = make(map[uint64]struct{})
bin, err := json.Marshal(cp)
require.NoError(t, err)

prunerStore := namespace.Wrap(ds, datastore.NewKey("pruner"))
err = prunerStore.Put(ctx, datastore.NewKey("checkpoint"), bin)
require.NoError(t, err)

archivalNode := sw.MustNewNodeWithStore(nt, store)
err = archivalNode.Start(ctx)
require.NoError(t, err)
err = archivalNode.Stop(ctx)
require.NoError(t, err)

// convert to pruned node
pruningCfg := nodebuilder.DefaultConfig(nt)
pruningCfg.Pruner.EnableService = true
err = store.PutConfig(pruningCfg)
require.NoError(t, err)
pruningNode, err := sw.NewNodeWithStore(nt, store)
assert.NoError(t, err)
err = pruningNode.Start(ctx)
assert.NoError(t, err)
require.NoError(t, pruningNode.Stop(ctx))

// expect that the checkpoint has been overridden
bin, err = prunerStore.Get(ctx, datastore.NewKey("checkpoint"))
require.NoError(t, err)
err = json.Unmarshal(bin, &cp)
require.NoError(t, err)
assert.Equal(t, uint64(1), cp.LastPrunedHeight)
}
}
40 changes: 2 additions & 38 deletions pruner/checkpoint.go
@@ -7,18 +7,11 @@ import (
"fmt"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"

"github.com/celestiaorg/celestia-node/header"
)

var (
// ErrDisallowRevertToArchival is returned when a node has been run with pruner enabled before and
// launching it with archival mode.
ErrDisallowRevertToArchival = errors.New(
"node has been run with pruner enabled before, it is not safe to convert to an archival" +
"Run with --experimental-pruning enabled or consider re-initializing the store")

storePrefix = datastore.NewKey("pruner")
checkpointKey = datastore.NewKey("checkpoint")
errCheckpointNotFound = errors.New("checkpoint not found")
@@ -27,46 +20,17 @@ var (
// checkpoint contains information related to the state of the
// pruner service that is periodically persisted to disk.
type checkpoint struct {
PrunerKind string `json:"pruner_kind"`
LastPrunedHeight uint64 `json:"last_pruned_height"`
FailedHeaders map[uint64]struct{} `json:"failed"`
}

func newCheckpoint(prunerKind string) *checkpoint {
func newCheckpoint() *checkpoint {
return &checkpoint{
PrunerKind: prunerKind,
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{},
}
}

// DetectPreviousRun ensures that a node that has been run with "full" pruning
// mode previously cannot revert back to an "archival" one. This check should
// only be performed when a node is either a Full or Bridge node.
func DetectPreviousRun(ctx context.Context, ds datastore.Datastore, expectedKind string) error {
wrappedDs := namespace.Wrap(ds, storePrefix)

cp, err := getCheckpoint(ctx, wrappedDs)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
return nil
}
return fmt.Errorf("failed to load checkpoint: %w", err)
}

if cp.PrunerKind != expectedKind {
// do not allow reversion back to archival mode
if cp.PrunerKind == "full" {
return ErrDisallowRevertToArchival
}
// allow conversion from archival to full by overriding previous checkpoint
log.Infow("overriding checkpoint to enable full pruning mode...")
cp = newCheckpoint(expectedKind)
return storeCheckpoint(ctx, wrappedDs, cp)
}
return nil
}

// storeCheckpoint persists the checkpoint to disk.
func storeCheckpoint(ctx context.Context, ds datastore.Datastore, c *checkpoint) error {
bin, err := json.Marshal(c)
@@ -101,7 +65,7 @@ func (s *Service) loadCheckpoint(ctx context.Context) error {
cp, err := getCheckpoint(ctx, s.ds)
if err != nil {
if errors.Is(err, errCheckpointNotFound) {
s.checkpoint = newCheckpoint(s.pruner.Kind())
s.checkpoint = newCheckpoint()
return storeCheckpoint(ctx, s.ds, s.checkpoint)
}
return err
63 changes: 0 additions & 63 deletions pruner/checkpoint_test.go
@@ -5,17 +5,14 @@ import (
"testing"

"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
ds_sync "github.com/ipfs/go-datastore/sync"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestStoreCheckpoint(t *testing.T) {
ctx := context.Background()
ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
c := &checkpoint{
PrunerKind: "test",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}
@@ -27,63 +24,3 @@ func TestStoreCheckpoint(t *testing.T) {
require.NoError(t, err)
require.Equal(t, c, c2)
}

// TestDisallowRevertArchival tests that a node that has been previously run
// with full pruning cannot convert back into an "archival" node.
func TestDisallowRevertArchival(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "full",
LastPrunedHeight: 1,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "archival")
assert.Error(t, err)
assert.ErrorIs(t, err, ErrDisallowRevertToArchival)

// ensure no false positives
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

// ensure checkpoint is retrievable after
cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
require.NotNil(t, cp)
assert.Equal(t, cp.LastPrunedHeight, c.LastPrunedHeight)
}

func TestCheckpointOverride(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)

ds := ds_sync.MutexWrap(datastore.NewMapDatastore())
namespaceWrapped := namespace.Wrap(ds, storePrefix)
c := &checkpoint{
PrunerKind: "archival",
LastPrunedHeight: 600,
FailedHeaders: map[uint64]struct{}{1: {}},
}

err := storeCheckpoint(ctx, namespaceWrapped, c)
require.NoError(t, err)

// give the unwrapped ds here as this is expected to run
// before pruner service is constructed
err = DetectPreviousRun(ctx, ds, "full")
assert.NoError(t, err)

cp, err := getCheckpoint(ctx, namespaceWrapped)
require.NoError(t, err)
assert.Equal(t, "full", cp.PrunerKind)
assert.Equal(t, uint64(1), cp.LastPrunedHeight)
}
1 change: 0 additions & 1 deletion pruner/pruner.go
@@ -10,5 +10,4 @@ import (
// from the node's datastore.
type Pruner interface {
Prune(context.Context, *header.ExtendedHeader) error
Kind() string
}
13 changes: 13 additions & 0 deletions pruner/service.go
@@ -94,6 +94,19 @@ func (s *Service) Stop(ctx context.Context) error {
}
}

func (s *Service) LastPruned(ctx context.Context) (uint64, error) {
err := s.loadCheckpoint(ctx)
if err != nil {
return 0, err
}
return s.checkpoint.LastPrunedHeight, nil
}

func (s *Service) ClearCheckpoint(ctx context.Context) error {
s.checkpoint = newCheckpoint()
return storeCheckpoint(ctx, s.ds, s.checkpoint)
}

// run prunes blocks older than the availability window periodically until the
// pruner service is stopped.
func (s *Service) run() {