Skip to content

Commit

Permalink
Extend proposal builder to allow the use of a fallback active set.
Browse files Browse the repository at this point in the history
  • Loading branch information
fasmat committed Dec 19, 2023
1 parent d48328c commit 9294372
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 48 deletions.
2 changes: 1 addition & 1 deletion cmd/bootstrapper/bootstrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ var cmd = &cobra.Command{
Short: "generate bootstrapping data",
RunE: func(cmd *cobra.Command, args []string) error {
if len(args) == 0 {
return fmt.Errorf("epoch not specfiied")
return fmt.Errorf("epoch not specified")
}
var targetEpochs []types.EpochID
epochs := strings.Split(args[0], ",")
Expand Down
8 changes: 4 additions & 4 deletions cmd/bootstrapper/generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ func getActiveSet(ctx context.Context, endpoint string, epoch types.EpochID) ([]
if err != nil {
return nil, fmt.Errorf("epoch stream %v: %w", endpoint, err)
}
activeSet := make([]types.ATXID, 0, 10_000)
activeSet := make([]types.ATXID, 0, 300_000)
for {
resp, err := stream.Recv()
if errors.Is(err, io.EOF) {
Expand All @@ -227,7 +227,7 @@ func (g *Generator) GenUpdate(
) (string, error) {
as := make([]string, 0, len(activeSet))
for _, atx := range activeSet {
as = append(as, hex.EncodeToString(atx.Hash32().Bytes())) // no leading 0x
as = append(as, hex.EncodeToString(atx.Bytes())) // no leading 0x
}
var update bootstrap.Update
update.Version = SchemaVersion
Expand All @@ -243,11 +243,11 @@ func (g *Generator) GenUpdate(
}
data, err := json.Marshal(update)
if err != nil {
return "", fmt.Errorf("marshal data %v: %w", string(data), err)
return "", fmt.Errorf("marshal data: %w", err)
}
// make sure the data is valid
if err = bootstrap.ValidateSchema(data); err != nil {
return "", fmt.Errorf("invalid data %v: %w", string(data), err)
return "", fmt.Errorf("invalid data: %w", err)
}
filename := PersistedFilename(epoch, suffix)
err = afero.WriteFile(g.fs, filename, data, 0o600)
Expand Down
16 changes: 9 additions & 7 deletions cmd/bootstrapper/generator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,16 @@ func launchServer(tb testing.TB, cdb *datastore.CachedDB) (grpcserver.Config, fu
}
}

func verifyUpdate(t *testing.T, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) {
require.NoError(t, bootstrap.ValidateSchema(data))
func verifyUpdate(tb testing.TB, data []byte, epoch types.EpochID, expBeacon string, expAsSize int) {
tb.Helper()
require.NoError(tb, bootstrap.ValidateSchema(data))

var update bootstrap.Update
require.NoError(t, json.Unmarshal(data, &update))
require.Equal(t, SchemaVersion, update.Version)
require.EqualValues(t, epoch, update.Data.Epoch.ID)
require.Equal(t, expBeacon, update.Data.Epoch.Beacon)
require.Len(t, update.Data.Epoch.ActiveSet, expAsSize)
require.NoError(tb, json.Unmarshal(data, &update))
require.Equal(tb, SchemaVersion, update.Version)
require.EqualValues(tb, epoch, update.Data.Epoch.ID)
require.Equal(tb, expBeacon, update.Data.Epoch.Beacon)
require.Len(tb, update.Data.Epoch.ActiveSet, expAsSize)
}

func TestGenerator_Generate(t *testing.T) {
Expand Down
86 changes: 64 additions & 22 deletions miner/proposal_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,11 @@ type ProposalBuilder struct {

mu sync.Mutex
signers map[types.NodeID]*signerSession
shared sharedSession

fallbackMtx sync.Mutex
fallback map[types.EpochID][]types.ATXID

shared sharedSession
}

type signerSession struct {
Expand Down Expand Up @@ -252,6 +256,7 @@ func New(
syncer: syncer,
conState: conState,
signers: map[types.NodeID]*signerSession{},
fallback: map[types.EpochID][]types.ATXID{},
}
for _, opt := range opts {
opt(pb)
Expand Down Expand Up @@ -368,6 +373,20 @@ func (pb *ProposalBuilder) decideMeshHash(ctx context.Context, current types.Lay
return mesh
}

func (pb *ProposalBuilder) UpdateActiveSet(epoch types.EpochID, activeSet []types.ATXID) {
pb.logger.With().Info("received activeset update",
epoch,
log.Int("size", len(activeSet)),
)
pb.fallbackMtx.Lock()
defer pb.fallbackMtx.Unlock()
if _, ok := pb.fallback[epoch]; ok {
pb.logger.With().Debug("fallback active set already exists", epoch)
return
}

Check warning on line 386 in miner/proposal_builder.go

View check run for this annotation

Codecov / codecov/patch

miner/proposal_builder.go#L384-L386

Added lines #L384 - L386 were not covered by tests
pb.fallback[epoch] = activeSet
}

func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID) error {
if pb.shared.epoch != lid.GetEpoch() {
pb.shared = sharedSession{epoch: lid.GetEpoch()}
Expand All @@ -380,9 +399,7 @@ func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID
pb.shared.beacon = beacon
}
if pb.shared.active.set == nil {
weight, set, err := generateActiveSet(
pb.logger,
pb.cdb,
weight, set, err := pb.generateActiveSet(
pb.shared.epoch,
pb.clock.LayerToTime(pb.shared.epoch.FirstLayer()),
pb.cfg.goodAtxPercent,
Expand All @@ -391,6 +408,9 @@ func (pb *ProposalBuilder) initSharedData(ctx context.Context, lid types.LayerID
if err != nil {
return err
}
sort.Slice(set, func(i, j int) bool {
return bytes.Compare(set[i].Bytes(), set[j].Bytes()) < 0
})
pb.shared.active.set = set
pb.shared.active.weight = weight
}
Expand Down Expand Up @@ -710,26 +730,51 @@ func activesFromFirstBlock(
return totalWeight, set, nil
}

func generateActiveSet(
logger log.Log,
cdb *datastore.CachedDB,
func (pb *ProposalBuilder) fallbackActiveSet(targetEpoch types.EpochID) (uint64, []types.ATXID, error) {
pb.fallbackMtx.Lock()
defer pb.fallbackMtx.Unlock()
set, ok := pb.fallback[targetEpoch]
if !ok {
return 0, nil, fmt.Errorf("no fallback active set for epoch %d", targetEpoch)
}

var totalWeight uint64
for _, id := range set {
atx, err := pb.cdb.GetAtxHeader(id)
if err != nil {
return 0, nil, err
}

Check warning on line 746 in miner/proposal_builder.go

View check run for this annotation

Codecov / codecov/patch

miner/proposal_builder.go#L745-L746

Added lines #L745 - L746 were not covered by tests
totalWeight += atx.GetWeight()
}
return totalWeight, set, nil
}

func (pb *ProposalBuilder) generateActiveSet(
target types.EpochID,
epochStart time.Time,
goodAtxPercent int,
networkDelay time.Duration,
) (uint64, []types.ATXID, error) {
if totalWeight, set, err := pb.fallbackActiveSet(target); err == nil {
pb.logger.With().Info("using fallback active set",
target,
log.Int("size", len(set)),
)
return totalWeight, set, nil
}

var (
totalWeight uint64
set []types.ATXID
numOmitted = 0
)
if err := cdb.IterateEpochATXHeaders(target, func(header *types.ActivationTxHeader) error {
grade, err := gradeAtx(cdb, header.NodeID, header.Received, epochStart, networkDelay)
if err := pb.cdb.IterateEpochATXHeaders(target, func(header *types.ActivationTxHeader) error {
grade, err := gradeAtx(pb.cdb, header.NodeID, header.Received, epochStart, networkDelay)
if err != nil {
return err
}
if grade != good {
logger.With().Debug("atx omitted from active set",
pb.logger.With().Debug("atx omitted from active set",
header.ID,
log.Int("grade", int(grade)),
log.Stringer("smesher", header.NodeID),
Expand All @@ -752,26 +797,23 @@ func generateActiveSet(
// if the node is not synced during `targetEpoch-1`, it doesn't have the correct receipt timestamp
// for all the atx and malfeasance proof. this active set is not usable.
// TODO: change after timing info of ATXs and malfeasance proofs is sync'ed from peers as well
var err error
totalWeight, set, err = activesFromFirstBlock(cdb, target)
totalWeight, set, err := activesFromFirstBlock(pb.cdb, target)
if err != nil {
return 0, nil, err
}
logger.With().Info("miner not synced during prior epoch, active set from first block",
pb.logger.With().Info("miner not synced during prior epoch, active set from first block",
log.Int("all atx", total),
log.Int("num omitted", numOmitted),
log.Int("num block atx", len(set)),
)
} else {
logger.With().Info("active set selected for proposal using grades",
log.Int("num atx", len(set)),
log.Int("num omitted", numOmitted),
log.Int("min atx good pct", goodAtxPercent),
)
return totalWeight, set, nil
}
sort.Slice(set, func(i, j int) bool {
return bytes.Compare(set[i].Bytes(), set[j].Bytes()) < 0
})

pb.logger.With().Info("active set selected for proposal using grades",
log.Int("num atx", len(set)),
log.Int("num omitted", numOmitted),
log.Int("min atx good pct", goodAtxPercent),
)
return totalWeight, set, nil
}

Expand Down
66 changes: 55 additions & 11 deletions miner/proposal_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,20 @@ type aggHash struct {
}

type step struct {
lid types.LayerID
beacon types.Beacon
atxs []*types.VerifiedActivationTx
ballots []*types.Ballot
activeset types.ATXIDList
identitities []identity
blocks []*types.Block
hare []types.LayerID
aggHashes []aggHash
lid types.LayerID
beacon types.Beacon
atxs []*types.VerifiedActivationTx
ballots []*types.Ballot
activeset types.ATXIDList
identities []identity
blocks []*types.Block
hare []types.LayerID
aggHashes []aggHash

fallbackActiveSets []struct {
epoch types.EpochID
atxs types.ATXIDList
}

txs []types.TransactionID
latestComplete types.LayerID
Expand Down Expand Up @@ -271,6 +276,42 @@ func TestBuild(t *testing.T) {
},
},
},
{
desc: "activeset fallback with all ATXs from fallback available",
steps: []step{
{
lid: 15,
beacon: types.Beacon{1},
atxs: []*types.VerifiedActivationTx{
gatx(types.ATXID{1}, 2, signer.NodeID(), 1, genAtxWithNonce(777)),
gatx(types.ATXID{2}, 2, types.NodeID{2}, 1),
gatx(types.ATXID{3}, 2, types.NodeID{3}, 1),
gatx(types.ATXID{4}, 2, types.NodeID{4}, 1),
gatx(types.ATXID{5}, 2, types.NodeID{4}, 1),
gatx(types.ATXID{6}, 2, types.NodeID{4}, 1),
},
fallbackActiveSets: []struct {
epoch types.EpochID
atxs types.ATXIDList
}{
{3, types.ATXIDList{{1}, {2}, {5}, {6}}},
},
opinion: &types.Opinion{Hash: types.Hash32{1}},
txs: []types.TransactionID{{1}, {2}},
latestComplete: 14,
expectProposal: expectProposal(
signer, 15, types.ATXID{1}, types.Opinion{Hash: types.Hash32{1}},
expectEpochData(
gactiveset(types.ATXID{1}, types.ATXID{2}, types.ATXID{5}, types.ATXID{6}),
12,
types.Beacon{1},
),
expectTxs([]types.TransactionID{{1}, {2}}),
expectCounters(signer, 3, types.Beacon{1}, 777, 0, 6, 9),
),
},
},
},
{
desc: "min active weight",
opts: []Opt{WithMinimalActiveSetWeight([]types.EpochMinimalActiveWeight{{Weight: 1000}})},
Expand Down Expand Up @@ -460,7 +501,7 @@ func TestBuild(t *testing.T) {
gatx(types.ATXID{1}, 2, signer.NodeID(), 1, genAtxWithNonce(777)),
gatx(types.ATXID{2}, 2, types.NodeID{2}, 1),
},
identitities: []identity{{
identities: []identity{{
id: types.NodeID{2},
proof: types.MalfeasanceProof{Proof: types.Proof{
Type: types.HareEquivocation,
Expand Down Expand Up @@ -716,7 +757,7 @@ func TestBuild(t *testing.T) {
if step.beacon != types.EmptyBeacon {
require.NoError(t, beacons.Add(cdb, step.lid.GetEpoch(), step.beacon))
}
for _, iden := range step.identitities {
for _, iden := range step.identities {
require.NoError(
t,
identities.SetMalicious(
Expand Down Expand Up @@ -754,6 +795,9 @@ func TestBuild(t *testing.T) {
),
)
}
for _, activeSet := range step.fallbackActiveSets {
builder.UpdateActiveSet(activeSet.epoch, activeSet.atxs)
}
}
{
if step.opinion != nil {
Expand Down
14 changes: 11 additions & 3 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -1139,11 +1139,14 @@ func (app *App) listenToUpdates(ctx context.Context) {
app.errCh <- err
return nil
}
for update := range ch {
for {

Check warning on line 1142 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L1142

Added line #L1142 was not covered by tests
select {
case <-ctx.Done():
return nil
default:
case update, ok := <-ch:
if !ok {
return nil
}
if update.Data.Beacon != types.EmptyBeacon {
if err := app.beaconProtocol.UpdateBeacon(update.Data.Epoch, update.Data.Beacon); err != nil {
app.errCh <- err
Expand All @@ -1152,10 +1155,15 @@ func (app *App) listenToUpdates(ctx context.Context) {
}
if len(update.Data.ActiveSet) > 0 {
app.hOracle.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet)
app.proposalBuilder.UpdateActiveSet(update.Data.Epoch, update.Data.ActiveSet)

if err := app.fetcher.GetAtxs(ctx, update.Data.ActiveSet); err != nil {
app.errCh <- err
return nil
}

Check warning on line 1163 in node/node.go

View check run for this annotation

Codecov / codecov/patch

node/node.go#L1158-L1163

Added lines #L1158 - L1163 were not covered by tests
}
}
}
return nil
})
}

Expand Down

0 comments on commit 9294372

Please sign in to comment.