Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: implement index manager tests #982

Merged
merged 2 commits into from
Jun 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion chain/gap/fill.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lily/chain/datasource"
"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/integrated"
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/storage"
)
Expand Down Expand Up @@ -54,7 +55,7 @@ func (g *Filler) Run(ctx context.Context) error {
return err
}

index, err := integrated.NewManager(taskAPI, g.DB, g.name)
index, err := integrated.NewManager(g.DB, tipset.NewBuilder(taskAPI, g.name))
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions chain/indexer/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ModelResult struct {
// - if data with height N and SR1 is being persisted and a request to persist data with the same values is made, allow it
// - if data with height N and SR2 is being persisted and a request to persist data with height N and SR1 is made, block
func (me *ModelExporter) ExportResult(ctx context.Context, strg model.Storage, height int64, results []*ModelResult) error {
if len(results) == 0 {
return nil
}
// lock exporting based on height only allowing a single height to be persisted simultaneously
heightKey := strconv.FormatInt(height, 10)
me.heightKeyMu.LockKey(heightKey)
Expand Down
40 changes: 0 additions & 40 deletions chain/indexer/integrated/builder.go

This file was deleted.

62 changes: 39 additions & 23 deletions chain/indexer/integrated/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,13 @@ import (
"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
"github.com/filecoin-project/lily/model"
visormodel "github.com/filecoin-project/lily/model/visor"
"github.com/filecoin-project/lily/tasks"
)

var log = logging.Logger("lily/index/manager")
Expand All @@ -22,9 +24,8 @@ type Exporter interface {

// Manager manages the execution of an Indexer. It may be used to index TipSets both serially or in parallel.
type Manager struct {
api tasks.DataSource
storage model.Storage
indexBuilder *Builder
indexBuilder tipset.IndexerBuilder
exporter Exporter
window time.Duration
name string
Expand All @@ -41,23 +42,28 @@ func WithWindow(w time.Duration) ManagerOpt {
}
}

func WithExporter(e Exporter) ManagerOpt {
return func(m *Manager) {
m.exporter = e
}
}

// NewManager returns a default Manager. Any provided ManagerOpt's will override Manager's default values.
func NewManager(api tasks.DataSource, strg model.Storage, name string, opts ...ManagerOpt) (*Manager, error) {
func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, opts ...ManagerOpt) (*Manager, error) {
im := &Manager{
api: api,
storage: strg,
window: 0,
name: name,
name: idxBuilder.Name(),
}

for _, opt := range opts {
opt(im)
}

im.indexBuilder = NewBuilder(api, name)
im.indexBuilder = idxBuilder

if im.exporter == nil {
im.exporter = indexer.NewModelExporter(name)
im.exporter = indexer.NewModelExporter(idxBuilder.Name())
}
return im, nil
}
Expand Down Expand Up @@ -104,20 +110,17 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index
return true, nil
}

var modelResults []*indexer.ModelResult
success := true
// collect all the results, recording if any of the tasks were skipped or errored
for res := range taskResults {
select {
case fatal := <-taskErrors:
lg.Errorw("fatal indexer error", "error", fatal)
return false, fatal
default:
success := atomic.NewBool(true)
grp, ctx := errgroup.WithContext(ctx)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

introduction of errgroup fixes deadlock by allowing both channels to be read from simultaneously. Prior to this taskErrors was only read when a taskResults was received. If there wasn't a taskResult then the tipset indexer deadlocked writing to the taskErrors channel since nothing was reading from it.

grp.Go(func() error {
var modelResults []*indexer.ModelResult
// collect all the results, recording if any of the tasks were skipped or errored
for res := range taskResults {
for _, report := range res.Report {
if report.Status != visormodel.ProcessingStatusOK &&
report.Status != visormodel.ProcessingStatusInfo {
lg.Warnw("task failed", "task", res.Name, "status", report.Status, "errors", report.ErrorsDetected, "info", report.StatusInformation)
success = false
success.Store(false)
} else {
lg.Infow("task success", "task", res.Name, "status", report.Status, "duration", report.CompletedAt.Sub(report.StartedAt))
}
Expand All @@ -128,13 +131,26 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index
})

}
}

// synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database.
if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil {
// synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database.
if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil {
return err
}

return nil
})

grp.Go(func() error {
for fatal := range taskErrors {
success.Store(false)
return fatal
}
return nil
})

if err := grp.Wait(); err != nil {
return false, err
}

lg.Infow("index tipset complete", "success", success)
return success, nil
return success.Load(), nil
}
Loading