diff --git a/chain/indexer.go b/chain/indexer.go index 305e72dc5..67925e415 100644 --- a/chain/indexer.go +++ b/chain/indexer.go @@ -88,7 +88,7 @@ func NewTipSetIndexer(o lens.APIOpener, d model.Storage, window time.Duration, n case BlocksTask: tsi.processors[BlocksTask] = blocks.NewTask() case MessagesTask: - tsi.messageProcessors[MessagesTask] = messages.NewTask(o) + tsi.messageProcessors[MessagesTask] = messages.NewTask() case ChainEconomicsTask: tsi.processors[ChainEconomicsTask] = chaineconomics.NewTask(o) case ActorStatesRawTask: diff --git a/tasks/actorstate/task.go b/tasks/actorstate/task.go index 446797334..fd11dda6c 100644 --- a/tasks/actorstate/task.go +++ b/tasks/actorstate/task.go @@ -3,6 +3,7 @@ package actorstate import ( "context" "fmt" + "sync" "time" "github.com/filecoin-project/go-address" @@ -19,9 +20,11 @@ import ( // A Task processes the extraction of actor state according the allowed types in its extracter map. type Task struct { - node lens.API - opener lens.APIOpener - closer lens.APICloser + nodeMu sync.Mutex // guards mutations to node, opener and closer + node lens.API + opener lens.APIOpener + closer lens.APICloser + extracterMap ActorExtractorMap } @@ -34,14 +37,18 @@ func NewTask(opener lens.APIOpener, extracterMap ActorExtractorMap) *Task { } func (t *Task) ProcessActors(ctx context.Context, ts *types.TipSet, pts *types.TipSet, candidates map[string]types.Actor) (model.Persistable, *visormodel.ProcessingReport, error) { + // t.node is used only by goroutines started by this method + t.nodeMu.Lock() if t.node == nil { node, closer, err := t.opener.Open(ctx) if err != nil { + t.nodeMu.Unlock() return nil, nil, xerrors.Errorf("unable to open lens: %w", err) } t.node = node t.closer = closer } + t.nodeMu.Unlock() log.Debugw("processing actor state changes", "height", ts.Height(), "parent_height", pts.Height()) @@ -150,8 +157,18 @@ func (t *Task) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pt if !ok { res.SkippedParse = true } else { + // get reference to the lens api, which may have been closed due to a failure elsewhere + t.nodeMu.Lock() + nodeAPI := t.node + t.nodeMu.Unlock() + + if nodeAPI == nil { + res.Error = xerrors.Errorf("failed to extract parsed actor state: no connection to api") + return + } + // Parse state - data, err := extracter.Extract(ctx, info, t.node) + data, err := extracter.Extract(ctx, info, nodeAPI) if err != nil { res.Error = xerrors.Errorf("failed to extract parsed actor state: %w", err) return @@ -161,6 +178,8 @@ func (t *Task) runActorStateExtraction(ctx context.Context, ts *types.TipSet, pt } func (t *Task) Close() error { + t.nodeMu.Lock() + defer t.nodeMu.Unlock() if t.closer != nil { t.closer() t.closer = nil diff --git a/tasks/chaineconomics/task.go b/tasks/chaineconomics/task.go index 9c2beb9f6..4f3cf6224 100644 --- a/tasks/chaineconomics/task.go +++ b/tasks/chaineconomics/task.go @@ -2,6 +2,7 @@ package chaineconomics import ( "context" + "sync" "github.com/filecoin-project/lotus/chain/types" logging "github.com/ipfs/go-log/v2" @@ -15,6 +16,7 @@ import ( var log = logging.Logger("chaineconomics") type Task struct { + nodeMu sync.Mutex // guards mutations to node, opener and closer node lens.API opener lens.APIOpener closer lens.APICloser @@ -27,6 +29,10 @@ func NewTask(opener lens.APIOpener) *Task { } func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persistable, *visormodel.ProcessingReport, error) { + // We use p.node continually through this method so take a broad lock + p.nodeMu.Lock() + defer p.nodeMu.Unlock() + if p.node == nil { node, closer, err := p.opener.Open(ctx) if err != nil { @@ -55,6 +61,9 @@ func (p *Task) ProcessTipSet(ctx context.Context, ts *types.TipSet) (model.Persi } func (p *Task) Close() error { + p.nodeMu.Lock() + defer p.nodeMu.Unlock() + if p.closer != nil { p.closer() p.closer = nil diff --git a/tasks/messages/message.go b/tasks/messages/message.go index 5a2de539d..74753e06b 100644 --- a/tasks/messages/message.go +++ b/tasks/messages/message.go @@ -28,27 +28,13 @@ import ( var log = logging.Logger("messages") type Task struct { - node lens.API - opener lens.APIOpener - closer lens.APICloser } -func NewTask(opener lens.APIOpener) *Task { - return &Task{ - opener: opener, - } +func NewTask() *Task { + return &Task{} } func (p *Task) ProcessMessages(ctx context.Context, ts *types.TipSet, pts *types.TipSet, emsgs []*lens.ExecutedMessage) (model.Persistable, *visormodel.ProcessingReport, error) { - if p.node == nil { - node, closer, err := p.opener.Open(ctx) - if err != nil { - return nil, nil, xerrors.Errorf("unable to open lens: %w", err) - } - p.node = node - p.closer = closer - } - report := &visormodel.ProcessingReport{ Height: int64(pts.Height()), StateRoot: pts.ParentState().String(), @@ -258,11 +244,6 @@ func (p *Task) parseMessageParams(m *types.Message, destCode cid.Cid) (string, s } func (p *Task) Close() error { - if p.closer != nil { - p.closer() - p.closer = nil - } - p.node = nil return nil } diff --git a/tasks/msapprovals/msapprovals.go b/tasks/msapprovals/msapprovals.go index eda684fce..890f56858 100644 --- a/tasks/msapprovals/msapprovals.go +++ b/tasks/msapprovals/msapprovals.go @@ -5,6 +5,7 @@ package msapprovals import ( "bytes" "context" + "sync" "github.com/filecoin-project/lotus/chain/actors/builtin/multisig" "github.com/filecoin-project/lotus/chain/types" @@ -27,6 +28,7 @@ const ( ) type Task struct { + nodeMu sync.Mutex // guards mutations to node, opener and closer node lens.API opener lens.APIOpener closer lens.APICloser @@ -39,6 +41,10 @@ func NewTask(opener lens.APIOpener) *Task { } func (p *Task) ProcessMessages(ctx context.Context, ts *types.TipSet, pts *types.TipSet, emsgs []*lens.ExecutedMessage) (model.Persistable, *visormodel.ProcessingReport, error) { + // We use p.node continually through this method so take a broad lock + p.nodeMu.Lock() + defer p.nodeMu.Unlock() + // TODO: refactor this boilerplate into a helper if p.node == nil { node, closer, err := p.opener.Open(ctx) @@ -169,6 +175,9 @@ func (p *Task) ProcessMessages(ctx context.Context, ts *types.TipSet, pts *types } func (p *Task) Close() error { + p.nodeMu.Lock() + defer p.nodeMu.Unlock() + if p.closer != nil { p.closer() p.closer = nil