Skip to content

Commit

Permalink
test: implement index manager tests (#982)
Browse files Browse the repository at this point in the history
* test: implement index manager tests

- fix deadlock on fatal errors
  • Loading branch information
frrist authored Jun 16, 2022
1 parent d0712f5 commit 56fcf12
Show file tree
Hide file tree
Showing 13 changed files with 793 additions and 79 deletions.
3 changes: 2 additions & 1 deletion chain/gap/fill.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lily/chain/datasource"
"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/integrated"
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/storage"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ func (g *Filler) Run(ctx context.Context) error {
return err
}

index, err := integrated.NewManager(taskAPI, g.DB, g.name)
index, err := integrated.NewManager(g.DB, tipset.NewBuilder(taskAPI, g.name))
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions chain/indexer/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ModelResult struct {
// - if data with height N and SR1 is being persisted and a request to persist data with the same values is made, allow it
// - if data with height N and SR2 is being persisted and a request to persist data with height N and SR1 is made, block
func (me *ModelExporter) ExportResult(ctx context.Context, strg model.Storage, height int64, results []*ModelResult) error {
if len(results) == 0 {
return nil
}
// lock exporting based on height only allowing a single height to be persisted simultaneously
heightKey := strconv.FormatInt(height, 10)
me.heightKeyMu.LockKey(heightKey)
Expand Down
40 changes: 0 additions & 40 deletions chain/indexer/integrated/builder.go

This file was deleted.

62 changes: 39 additions & 23 deletions chain/indexer/integrated/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
"github.com/filecoin-project/lily/model"
visormodel "github.com/filecoin-project/lily/model/visor"
"github.com/filecoin-project/lily/tasks"
)

var log = logging.Logger("lily/index/manager")
Expand All @@ -22,9 +24,8 @@ type Exporter interface {

// Manager manages the execution of an Indexer. It may be used to index TipSets both serially or in parallel.
type Manager struct {
api tasks.DataSource
storage model.Storage
indexBuilder *Builder
indexBuilder tipset.IndexerBuilder
exporter Exporter
window time.Duration
name string
Expand All @@ -41,23 +42,28 @@ func WithWindow(w time.Duration) ManagerOpt {
}
}

func WithExporter(e Exporter) ManagerOpt {
return func(m *Manager) {
m.exporter = e
}
}

// NewManager returns a default Manager. Any provided ManagerOpt's will override Manager's default values.
func NewManager(api tasks.DataSource, strg model.Storage, name string, opts ...ManagerOpt) (*Manager, error) {
func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, opts ...ManagerOpt) (*Manager, error) {
im := &Manager{
api: api,
storage: strg,
window: 0,
name: name,
name: idxBuilder.Name(),
}

for _, opt := range opts {
opt(im)
}

im.indexBuilder = NewBuilder(api, name)
im.indexBuilder = idxBuilder

if im.exporter == nil {
im.exporter = indexer.NewModelExporter(name)
im.exporter = indexer.NewModelExporter(idxBuilder.Name())
}
return im, nil
}
Expand Down Expand Up @@ -104,20 +110,17 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index
return true, nil
}

var modelResults []*indexer.ModelResult
success := true
// collect all the results, recording if any of the tasks were skipped or errored
for res := range taskResults {
select {
case fatal := <-taskErrors:
lg.Errorw("fatal indexer error", "error", fatal)
return false, fatal
default:
success := atomic.NewBool(true)
grp, ctx := errgroup.WithContext(ctx)
grp.Go(func() error {
var modelResults []*indexer.ModelResult
// collect all the results, recording if any of the tasks were skipped or errored
for res := range taskResults {
for _, report := range res.Report {
if report.Status != visormodel.ProcessingStatusOK &&
report.Status != visormodel.ProcessingStatusInfo {
lg.Warnw("task failed", "task", res.Name, "status", report.Status, "errors", report.ErrorsDetected, "info", report.StatusInformation)
success = false
success.Store(false)
} else {
lg.Infow("task success", "task", res.Name, "status", report.Status, "duration", report.CompletedAt.Sub(report.StartedAt))
}
Expand All @@ -128,13 +131,26 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index
})

}
}

// synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database.
if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil {
// synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database.
if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil {
return err
}

return nil
})

grp.Go(func() error {
for fatal := range taskErrors {
success.Store(false)
return fatal
}
return nil
})

if err := grp.Wait(); err != nil {
return false, err
}

lg.Infow("index tipset complete", "success", success)
return success, nil
return success.Load(), nil
}
Loading

0 comments on commit 56fcf12

Please sign in to comment.