Skip to content

Commit

Permalink
miner: select active sets that were received 2 hours before start (#5289
Browse files Browse the repository at this point in the history
)

related: #5282

this should reduce variability in encoded active sets.
previous grader was using hare preround delay, which is 25s and not sufficient for atx to propagate across the network.
  • Loading branch information
dshulyak committed Nov 24, 2023
1 parent 5bf93f9 commit cb83ce8
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 27 deletions.
5 changes: 5 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,10 @@ type BaseConfig struct {
MinerGoodAtxsPercent int `mapstructure:"miner-good-atxs-percent"`

RegossipAtxInterval time.Duration `mapstructure:"regossip-atx-interval"`

// ATXGradeDelay is used to grade ATXs for selection in tortoise active set.
// See grading fuction in miner/proposals_builder.go
ATXGradeDelay time.Duration `mapstructure:"atx-grade-delay"`
}

type PublicMetrics struct {
Expand Down Expand Up @@ -193,6 +197,7 @@ func defaultBaseConfig() BaseConfig {
DatabaseSizeMeteringInterval: 10 * time.Minute,
DatabasePruneInterval: 30 * time.Minute,
NetworkHRP: "sm",
ATXGradeDelay: 10 * time.Second,
}
}

Expand Down
1 change: 1 addition & 0 deletions config/mainnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ func MainnetConfig() Config {
"https://poet-112.spacemesh.network",
},
RegossipAtxInterval: 2 * time.Hour,
ATXGradeDelay: 30 * time.Minute,
},
Genesis: &GenesisConfig{
GenesisTime: "2023-07-14T08:00:00Z",
Expand Down
1 change: 1 addition & 0 deletions config/presets/testnet.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func testnet() config.Config {
TickSize: 666514,
PoETServers: []string{},
RegossipAtxInterval: time.Hour,
ATXGradeDelay: 30 * time.Minute,
},
Genesis: &config.GenesisConfig{
GenesisTime: "2023-09-13T18:00:00Z",
Expand Down
171 changes: 144 additions & 27 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,7 +448,9 @@ func (app *App) Initialize() error {
// vote against all blocks in that layer. so it's important to make sure zdist takes longer than
// hare's max time duration to run consensus for a layer
maxHareRoundsPerLayer := 1 + app.Config.HARE.LimitIterations*hare.RoundsPerIteration // pre-round + 4 rounds per iteration
maxHareLayerDuration := app.Config.HARE.WakeupDelta + time.Duration(maxHareRoundsPerLayer)*app.Config.HARE.RoundDuration
maxHareLayerDuration := app.Config.HARE.WakeupDelta + time.Duration(
maxHareRoundsPerLayer,
)*app.Config.HARE.RoundDuration
if app.Config.LayerDuration*time.Duration(app.Config.Tortoise.Zdist) <= maxHareLayerDuration {
app.log.With().Error("incompatible params",
log.Uint32("tortoise_zdist", app.Config.Tortoise.Zdist),
Expand Down Expand Up @@ -549,7 +551,11 @@ func (app *App) initServices(ctx context.Context) error {
nipostValidatorLogger := app.addLogger(NipostValidatorLogger, lg)
postVerifiers := make([]activation.PostVerifier, 0, app.Config.SMESHING.VerifyingOpts.Workers)
lg.Debug("creating post verifier")
verifier, err := activation.NewPostVerifier(app.Config.POST, nipostValidatorLogger, verifying.WithPowFlags(app.Config.SMESHING.VerifyingOpts.Flags))
verifier, err := activation.NewPostVerifier(
app.Config.POST,
nipostValidatorLogger,
verifying.WithPowFlags(app.Config.SMESHING.VerifyingOpts.Flags),
)
lg.With().Debug("created post verifier", log.Err(err))
if err != nil {
return err
Expand All @@ -559,7 +565,13 @@ func (app *App) initServices(ctx context.Context) error {
}
app.postVerifier = activation.NewOffloadingPostVerifier(postVerifiers, nipostValidatorLogger)

validator := activation.NewValidator(poetDb, app.Config.POST, app.Config.SMESHING.Opts.Scrypt, nipostValidatorLogger, app.postVerifier)
validator := activation.NewValidator(
poetDb,
app.Config.POST,
app.Config.SMESHING.Opts.Scrypt,
nipostValidatorLogger,
app.postVerifier,
)
app.validator = validator

cfg := vm.DefaultConfig()
Expand Down Expand Up @@ -669,11 +681,23 @@ func (app *App) initServices(ctx context.Context) error {
// we can't have an epoch offset which is greater/equal than the number of layers in an epoch

if app.Config.HareEligibility.ConfidenceParam >= app.Config.BaseConfig.LayersPerEpoch {
return fmt.Errorf("confidence param should be smaller than layers per epoch. eligibility-confidence-param: %d. layers-per-epoch: %d",
app.Config.HareEligibility.ConfidenceParam, app.Config.BaseConfig.LayersPerEpoch)
return fmt.Errorf(
"confidence param should be smaller than layers per epoch. eligibility-confidence-param: %d. layers-per-epoch: %d",
app.Config.HareEligibility.ConfidenceParam,
app.Config.BaseConfig.LayersPerEpoch,
)
}

proposalListener := proposals.NewHandler(app.cachedDB, app.edVerifier, app.host, fetcherWrapped, beaconProtocol, msh, trtl, vrfVerifier, app.clock,
proposalListener := proposals.NewHandler(
app.cachedDB,
app.edVerifier,
app.host,
fetcherWrapped,
beaconProtocol,
msh,
trtl,
vrfVerifier,
app.clock,
proposals.WithLogger(app.addLogger(ProposalListenerLogger, lg)),
proposals.WithConfig(proposals.Config{
LayerSize: layerSize,
Expand All @@ -694,7 +718,15 @@ func (app *App) initServices(ctx context.Context) error {
app.addLogger(TxHandlerLogger, lg),
)

app.hOracle = eligibility.New(beaconProtocol, app.cachedDB, vrfVerifier, vrfSigner, app.Config.LayersPerEpoch, app.Config.HareEligibility, app.addLogger(HareOracleLogger, lg))
app.hOracle = eligibility.New(
beaconProtocol,
app.cachedDB,
vrfVerifier,
vrfSigner,
app.Config.LayersPerEpoch,
app.Config.HareEligibility,
app.addLogger(HareOracleLogger, lg),
)
// TODO: genesisMinerWeight is set to app.Config.SpaceToCommit, because PoET ticks are currently hardcoded to 1

bscfg := app.Config.Bootstrap
Expand All @@ -706,7 +738,16 @@ func (app *App) initServices(ctx context.Context) error {
bootstrap.WithLogger(app.addLogger(BootstrapLogger, lg)),
)

app.certifier = blocks.NewCertifier(app.cachedDB, app.hOracle, app.edSgn.NodeID(), app.edSgn, app.edVerifier, app.host, app.clock, beaconProtocol, trtl,
app.certifier = blocks.NewCertifier(
app.cachedDB,
app.hOracle,
app.edSgn.NodeID(),
app.edSgn,
app.edVerifier,
app.host,
app.clock,
beaconProtocol,
trtl,
blocks.WithCertContext(ctx),
blocks.WithCertConfig(blocks.CertConfig{
CommitteeSize: app.Config.HARE.N,
Expand Down Expand Up @@ -809,7 +850,7 @@ func (app *App) initServices(ctx context.Context) error {
miner.WithLayerPerEpoch(layersPerEpoch),
miner.WithMinimalActiveSetWeight(app.Config.Tortoise.MinimalActiveSetWeight),
miner.WithHdist(app.Config.Tortoise.Hdist),
miner.WithNetworkDelay(app.Config.HARE.WakeupDelta),
miner.WithNetworkDelay(app.Config.ATXGradeDelay),
miner.WithMinGoodAtxPercent(minerGoodAtxPct),
miner.WithLogger(app.addLogger(ProposalBuilderLogger, lg)),
)
Expand Down Expand Up @@ -892,8 +933,12 @@ func (app *App) initServices(ctx context.Context) error {
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(blockHandler.HandleSyncedBlock, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(proposalListener.HandleSyncedProposal, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(app.txHandler.HandleBlockTransaction, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(app.txHandler.HandleProposalTransaction, app.host, lg)),
fetch.ValidatorFunc(pubsub.DropPeerOnSyncValidationReject(malfeasanceHandler.HandleSyncedMalfeasanceProof, app.host, lg)),
fetch.ValidatorFunc(
pubsub.DropPeerOnSyncValidationReject(app.txHandler.HandleProposalTransaction, app.host, lg),
),
fetch.ValidatorFunc(
pubsub.DropPeerOnSyncValidationReject(malfeasanceHandler.HandleSyncedMalfeasanceProof, app.host, lg),
),
)

syncHandler := func(_ context.Context, _ p2p.Peer, _ []byte) error {
Expand All @@ -910,17 +955,36 @@ func (app *App) initServices(ctx context.Context) error {
}

if app.Config.Beacon.RoundsNumber > 0 {
app.host.Register(pubsub.BeaconWeakCoinProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleWeakCoinProposal), pubsub.WithValidatorInline(true))
app.host.Register(pubsub.BeaconProposalProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleProposal), pubsub.WithValidatorInline(true))
app.host.Register(pubsub.BeaconFirstVotesProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFirstVotes), pubsub.WithValidatorInline(true))
app.host.Register(pubsub.BeaconFollowingVotesProtocol, pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFollowingVotes), pubsub.WithValidatorInline(true))
app.host.Register(
pubsub.BeaconWeakCoinProtocol,
pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleWeakCoinProposal),
pubsub.WithValidatorInline(true),
)
app.host.Register(
pubsub.BeaconProposalProtocol,
pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleProposal),
pubsub.WithValidatorInline(true),
)
app.host.Register(
pubsub.BeaconFirstVotesProtocol,
pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFirstVotes),
pubsub.WithValidatorInline(true),
)
app.host.Register(
pubsub.BeaconFollowingVotesProtocol,
pubsub.ChainGossipHandler(syncHandler, beaconProtocol.HandleFollowingVotes),
pubsub.WithValidatorInline(true),
)
}
app.host.Register(pubsub.ProposalProtocol, pubsub.ChainGossipHandler(syncHandler, proposalListener.HandleProposal))
app.host.Register(pubsub.AtxProtocol, pubsub.ChainGossipHandler(atxSyncHandler, atxHandler.HandleGossipAtx))
app.host.Register(pubsub.TxProtocol, pubsub.ChainGossipHandler(syncHandler, app.txHandler.HandleGossipTransaction))
app.host.Register(pubsub.HareProtocol, pubsub.ChainGossipHandler(syncHandler, app.hare.GetHareMsgHandler()))
app.host.Register(pubsub.BlockCertify, pubsub.ChainGossipHandler(syncHandler, app.certifier.HandleCertifyMessage))
app.host.Register(pubsub.MalfeasanceProof, pubsub.ChainGossipHandler(atxSyncHandler, malfeasanceHandler.HandleMalfeasanceProof))
app.host.Register(
pubsub.MalfeasanceProof,
pubsub.ChainGossipHandler(atxSyncHandler, malfeasanceHandler.HandleMalfeasanceProof),
)

app.proposalBuilder = proposalBuilder
app.proposalListener = proposalListener
Expand Down Expand Up @@ -953,7 +1017,10 @@ func (app *App) launchStandalone(ctx context.Context) error {
return nil
}
if len(app.Config.PoETServers) != 1 {
return fmt.Errorf("to launch in a standalone mode provide single local address for poet: %v", app.Config.PoETServers)
return fmt.Errorf(
"to launch in a standalone mode provide single local address for poet: %v",
app.Config.PoETServers,
)
}
value := types.Beacon{}
genesis := app.Config.Genesis.GenesisID()
Expand Down Expand Up @@ -1045,7 +1112,11 @@ func (app *App) startServices(ctx context.Context) error {
if app.Config.SMESHING.Start {
coinbaseAddr, err := types.StringToAddress(app.Config.SMESHING.CoinbaseAccount)
if err != nil {
app.log.Panic("failed to parse CoinbaseAccount address on start `%s`: %v", app.Config.SMESHING.CoinbaseAccount, err)
app.log.Panic(
"failed to parse CoinbaseAccount address on start `%s`: %v",
app.Config.SMESHING.CoinbaseAccount,
err,
)
}
if err := app.atxBuilder.StartSmeshing(coinbaseAddr, app.Config.SMESHING.Opts); err != nil {
app.log.Panic("failed to start smeshing: %v", err)
Expand All @@ -1071,15 +1142,37 @@ func (app *App) initService(ctx context.Context, svc grpcserver.Service) (grpcse
case grpcserver.GlobalState:
return grpcserver.NewGlobalStateService(app.mesh, app.conState), nil
case grpcserver.Mesh:
return grpcserver.NewMeshService(app.cachedDB, app.mesh, app.conState, app.clock, app.Config.LayersPerEpoch, app.Config.Genesis.GenesisID(), app.Config.LayerDuration, app.Config.LayerAvgSize, uint32(app.Config.TxsPerProposal)), nil
return grpcserver.NewMeshService(
app.cachedDB,
app.mesh,
app.conState,
app.clock,
app.Config.LayersPerEpoch,
app.Config.Genesis.GenesisID(),
app.Config.LayerDuration,
app.Config.LayerAvgSize,
uint32(app.Config.TxsPerProposal),
), nil
case grpcserver.Node:
return grpcserver.NewNodeService(app.host, app.mesh, app.clock, app.syncer, cmd.Version, cmd.Commit), nil
case grpcserver.Admin:
return grpcserver.NewAdminService(app.db, app.Config.DataDir(), app.host), nil
case grpcserver.Smesher:
return grpcserver.NewSmesherService(app.postSetupMgr, app.atxBuilder, app.Config.API.SmesherStreamInterval, app.Config.SMESHING.Opts), nil
return grpcserver.NewSmesherService(
app.postSetupMgr,
app.atxBuilder,
app.Config.API.SmesherStreamInterval,
app.Config.SMESHING.Opts,
), nil
case grpcserver.Transaction:
return grpcserver.NewTransactionService(app.db, app.host, app.mesh, app.conState, app.syncer, app.txHandler), nil
return grpcserver.NewTransactionService(
app.db,
app.host,
app.mesh,
app.conState,
app.syncer,
app.txHandler,
), nil
case grpcserver.Activation:
return grpcserver.NewActivationService(app.cachedDB, types.ATXID(app.Config.Genesis.GoldenATX())), nil
}
Expand All @@ -1091,15 +1184,30 @@ func unaryGrpcLogStart(ctx context.Context, req any, _ *grpc.UnaryServerInfo, ha
return handler(ctx, req)
}

func streamingGrpcLogStart(srv any, stream grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
func streamingGrpcLogStart(
srv any,
stream grpc.ServerStream,
_ *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
ctxzap.Info(stream.Context(), "started streaming call")
return handler(srv, stream)
}

func (app *App) newGrpc(logger log.Log, endpoint string) *grpcserver.Server {
return grpcserver.New(endpoint, logger,
grpc.ChainStreamInterceptor(grpctags.StreamServerInterceptor(), grpczap.StreamServerInterceptor(logger.Zap()), streamingGrpcLogStart),
grpc.ChainUnaryInterceptor(grpctags.UnaryServerInterceptor(), grpczap.UnaryServerInterceptor(logger.Zap()), unaryGrpcLogStart),
return grpcserver.New(
endpoint,
logger,
grpc.ChainStreamInterceptor(
grpctags.StreamServerInterceptor(),
grpczap.StreamServerInterceptor(logger.Zap()),
streamingGrpcLogStart,
),
grpc.ChainUnaryInterceptor(
grpctags.UnaryServerInterceptor(),
grpczap.UnaryServerInterceptor(logger.Zap()),
unaryGrpcLogStart,
),
grpc.MaxSendMsgSize(app.Config.API.GrpcSendMsgSize),
grpc.MaxRecvMsgSize(app.Config.API.GrpcRecvMsgSize),
)
Expand Down Expand Up @@ -1335,9 +1443,18 @@ func (app *App) setupDBs(ctx context.Context, lg log.Log) error {
}
app.db = sqlDB
if app.Config.CollectMetrics && app.Config.DatabaseSizeMeteringInterval != 0 {
app.dbMetrics = dbmetrics.NewDBMetricsCollector(ctx, sqlDB, app.addLogger(StateDbLogger, lg), app.Config.DatabaseSizeMeteringInterval)
app.dbMetrics = dbmetrics.NewDBMetricsCollector(
ctx,
sqlDB,
app.addLogger(StateDbLogger, lg),
app.Config.DatabaseSizeMeteringInterval,
)
}
app.cachedDB = datastore.NewCachedDB(sqlDB, app.addLogger(CachedDBLogger, lg), datastore.WithConfig(app.Config.Cache))
app.cachedDB = datastore.NewCachedDB(
sqlDB,
app.addLogger(CachedDBLogger, lg),
datastore.WithConfig(app.Config.Cache),
)
return nil
}

Expand Down

0 comments on commit cb83ce8

Please sign in to comment.