Skip to content

Commit

Permalink
fix: avoid deadlock in indexer when processor errors (#407)
Browse files Browse the repository at this point in the history
  • Loading branch information
iand authored Mar 3, 2021
1 parent ade9b43 commit 8f367a3
Showing 1 changed file with 14 additions and 3 deletions.
17 changes: 14 additions & 3 deletions chain/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) error {
if res.Error != nil {
llt.Errorw("task returned with error", "error", res.Error.Error())
// tell all the processors to close their connections to the lens, they can reopen when needed
if err := t.Close(); err != nil {
if err := t.closeProcessors(); err != nil {
log.Errorw("error received while closing tipset indexer", "error", err)
}
return res.Error
Expand Down Expand Up @@ -432,7 +432,7 @@ func (t *TipSetIndexer) runActorProcessor(ctx context.Context, p ActorProcessor,
}
}

func (t *TipSetIndexer) Close() error {
func (t *TipSetIndexer) closeProcessors() error {
if t.closer != nil {
t.closer()
t.closer = nil
Expand All @@ -444,14 +444,25 @@ func (t *TipSetIndexer) Close() error {
log.Errorw("error received while closing task processor", "error", err, "task", name)
}
}
for name, p := range t.messageProcessors {
if err := p.Close(); err != nil {
log.Errorw("error received while closing message task processor", "error", err, "task", name)
}
}
for name, p := range t.actorProcessors {
if err := p.Close(); err != nil {
log.Errorw("error received while closing actor task processor", "error", err, "task", name)
}
}

return nil
}

func (t *TipSetIndexer) Close() error {
// ensure there are no persist go routines left running
t.persistSlot <- struct{}{}
return nil

return t.closeProcessors()
}

// A TaskResult is either some data to persist or an error which indicates that the task did not complete. Partial
Expand Down

0 comments on commit 8f367a3

Please sign in to comment.