diff --git a/chain/store/store.go b/chain/store/store.go index 00a78500ef9..cf934f257dd 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -5,6 +5,7 @@ import ( "context" "encoding/binary" "encoding/json" + "errors" "io" "os" "strconv" @@ -59,6 +60,8 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation") var DefaultTipSetCacheSize = 8192 var DefaultMsgMetaCacheSize = 2048 +var ErrNotifeeDone = errors.New("notifee is done and should be removed") + func init() { if s := os.Getenv("LOTUS_CHAIN_TIPSET_CACHE"); s != "" { tscs, err := strconv.Atoi(s) @@ -404,11 +407,36 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo apply[i], apply[opp] = apply[opp], apply[i] } - for _, hcf := range notifees { - if err := hcf(revert, apply); err != nil { + var toremove map[int]struct{} + for i, hcf := range notifees { + err := hcf(revert, apply) + + switch err { + case nil: + + case ErrNotifeeDone: + if toremove == nil { + toremove = make(map[int]struct{}) + } + toremove[i] = struct{}{} + + default: log.Error("head change func errored (BAD): ", err) } } + + if len(toremove) > 0 { + newNotifees := make([]ReorgNotifee, 0, len(notifees)-len(toremove)) + for i, hcf := range notifees { + _, remove := toremove[i] + if remove { + continue + } + newNotifees = append(newNotifees, hcf) + } + notifees = newNotifees + } + case <-ctx.Done(): return } diff --git a/node/modules/services.go b/node/modules/services.go index e0a7c2edab3..011b8916313 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -2,6 +2,9 @@ package modules import ( "context" + "os" + "strconv" + "time" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" @@ -25,6 +28,7 @@ import ( "github.com/filecoin-project/lotus/chain/stmgr" "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/sub" + "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/journal" "github.com/filecoin-project/lotus/lib/peermgr" marketevents "github.com/filecoin-project/lotus/markets/loggers" @@ -34,6 +38,19 @@ import ( "github.com/filecoin-project/lotus/node/repo" ) +var pubsubMsgsSyncEpochs = 10 + +func init() { + if s := os.Getenv("LOTUS_MSGS_SYNC_EPOCHS"); s != "" { + val, err := strconv.Atoi(s) + if err != nil { + log.Errorf("failed to parse LOTUS_MSGS_SYNC_EPOCHS: %s", err) + return + } + pubsubMsgsSyncEpochs = val + } +} + func RunHello(mctx helpers.MetricsCtx, lc fx.Lifecycle, h host.Host, svc *hello.Service) error { h.SetStreamHandler(hello.ProtocolID, svc.HandleStream) @@ -82,14 +99,45 @@ func RunChainExchange(h host.Host, svc exchange.Server) { h.SetStreamHandler(exchange.ChainExchangeProtocolID, svc.HandleStream) // new } -func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { - ctx := helpers.LifecycleCtx(mctx, lc) +func waitForSync(stmgr *stmgr.StateManager, epochs int, subscribe func()) { + nearsync := time.Duration(epochs*int(build.BlockDelaySecs)) * time.Second - blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint - if err != nil { - panic(err) + // early check, are we synced at start up? + ts := stmgr.ChainStore().GetHeaviestTipSet() + timestamp := ts.MinTimestamp() + timestampTime := time.Unix(int64(timestamp), 0) + if build.Clock.Since(timestampTime) < nearsync { + subscribe() + return } + // we are not synced, subscribe to head changes and wait for sync + stmgr.ChainStore().SubscribeHeadChanges(func(rev, app []*types.TipSet) error { + if len(app) == 0 { + return nil + } + + latest := app[0].MinTimestamp() + for _, ts := range app[1:] { + timestamp := ts.MinTimestamp() + if timestamp > latest { + latest = timestamp + } + } + + latestTime := time.Unix(int64(latest), 0) + if build.Clock.Since(latestTime) < nearsync { + subscribe() + return store.ErrNotifeeDone + } + + return nil + }) +} + +func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, s *chain.Syncer, bserv dtypes.ChainBlockService, chain *store.ChainStore, stmgr *stmgr.StateManager, h host.Host, nn dtypes.NetworkName) { + ctx := helpers.LifecycleCtx(mctx, lc) + v := sub.NewBlockValidator( h.ID(), chain, stmgr, func(p peer.ID) { @@ -101,24 +149,43 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P panic(err) } - go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) -} - -func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) { - ctx := helpers.LifecycleCtx(mctx, lc) + log.Infof("subscribing to pubsub topic %s", build.BlocksTopic(nn)) - msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint:staticcheck + blocksub, err := ps.Subscribe(build.BlocksTopic(nn)) //nolint if err != nil { panic(err) } + go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) +} + +func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, stmgr *stmgr.StateManager, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName, bootstrapper dtypes.Bootstrapper) { + ctx := helpers.LifecycleCtx(mctx, lc) + v := sub.NewMessageValidator(h.ID(), mpool) if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil { panic(err) } - go sub.HandleIncomingMessages(ctx, mpool, msgsub) + subscribe := func() { + log.Infof("subscribing to pubsub topic %s", build.MessagesTopic(nn)) + + msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) //nolint + if err != nil { + panic(err) + } + + go sub.HandleIncomingMessages(ctx, mpool, msgsub) + } + + if bootstrapper { + subscribe() + return + } + + // wait until we are synced within 10 epochs -- env var can override + waitForSync(stmgr, pubsubMsgsSyncEpochs, subscribe) } func NewLocalDiscovery(lc fx.Lifecycle, ds dtypes.MetadataDS) (*discoveryimpl.Local, error) { diff --git a/node/test/builder.go b/node/test/builder.go index ea9a8222048..9e5ffc40df7 100644 --- a/node/test/builder.go +++ b/node/test/builder.go @@ -38,6 +38,7 @@ import ( lotusminer "github.com/filecoin-project/lotus/miner" "github.com/filecoin-project/lotus/node" "github.com/filecoin-project/lotus/node/modules" + "github.com/filecoin-project/lotus/node/modules/dtypes" testing2 "github.com/filecoin-project/lotus/node/modules/testing" "github.com/filecoin-project/lotus/node/repo" "github.com/filecoin-project/lotus/storage/mockstorage" @@ -403,6 +404,9 @@ func mockSbBuilderOpts(t *testing.T, fullOpts []test.FullNodeOpts, storage []tes node.Override(new(ffiwrapper.Verifier), mock.MockVerifier), + // so that we subscribe to pubsub topics immediately + node.Override(new(dtypes.Bootstrapper), dtypes.Bootstrapper(true)), + genesis, fullOpts[i].Opts(fulls),