diff --git a/chain/gap/fill.go b/chain/gap/fill.go index 475e5b581..1751917ed 100644 --- a/chain/gap/fill.go +++ b/chain/gap/fill.go @@ -55,7 +55,7 @@ func (g *Filler) Run(ctx context.Context) error { return err } - index, err := integrated.NewManager(g.DB, tipset.NewBuilder(taskAPI, g.name), g.name) + index, err := integrated.NewManager(g.DB, tipset.NewBuilder(taskAPI, g.name)) if err != nil { return err } diff --git a/chain/indexer/exporter.go b/chain/indexer/exporter.go index a9931a746..996393aa5 100644 --- a/chain/indexer/exporter.go +++ b/chain/indexer/exporter.go @@ -41,6 +41,9 @@ type ModelResult struct { // - if data with height N and SR1 is being persisted and a request to persist data with the same values is made, allow it // - if data with height N and SR2 is being persisted and a request to persist data with height N and SR1 is made, block func (me *ModelExporter) ExportResult(ctx context.Context, strg model.Storage, height int64, results []*ModelResult) error { + if len(results) == 0 { + return nil + } // lock exporting based on height only allowing a single height to be persisted simultaneously heightKey := strconv.FormatInt(height, 10) me.heightKeyMu.LockKey(heightKey) diff --git a/chain/indexer/integrated/manager.go b/chain/indexer/integrated/manager.go index 75bdc4187..6a745a02f 100644 --- a/chain/indexer/integrated/manager.go +++ b/chain/indexer/integrated/manager.go @@ -49,11 +49,11 @@ func WithExporter(e Exporter) ManagerOpt { } // NewManager returns a default Manager. Any provided ManagerOpt's will override Manager's default values. -func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, name string, opts ...ManagerOpt) (*Manager, error) { +func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, opts ...ManagerOpt) (*Manager, error) { im := &Manager{ storage: strg, window: 0, - name: name, + name: idxBuilder.Name(), } for _, opt := range opts { @@ -63,7 +63,7 @@ func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, name strin im.indexBuilder = idxBuilder if im.exporter == nil { - im.exporter = indexer.NewModelExporter(name) + im.exporter = indexer.NewModelExporter(idxBuilder.Name()) } return im, nil } @@ -132,11 +132,9 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index } - if len(modelResults) > 0 { - // synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database. - if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil { - return err - } + // synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database. + if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil { + return err } return nil diff --git a/chain/indexer/integrated/manager_test.go b/chain/indexer/integrated/manager_test.go index cdf966554..217772050 100644 --- a/chain/indexer/integrated/manager_test.go +++ b/chain/indexer/integrated/manager_test.go @@ -16,7 +16,7 @@ import ( visormodel "github.com/filecoin-project/lily/model/visor" ) -func TestManagerStatusOK(t *testing.T) { +func TestManagerWithResultStatusOK(t *testing.T) { ctx := context.Background() // mock index builder and mock indexer @@ -29,7 +29,7 @@ func TestManagerStatusOK(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index @@ -81,7 +81,7 @@ func TestManagerStatusOK(t *testing.T) { require.True(t, success) } -func TestManagerStatusInfo(t *testing.T) { +func TestManagerWithResultStatusInfo(t *testing.T) { ctx := context.Background() // mock index builder and mock indexer @@ -94,7 +94,7 @@ func TestManagerStatusInfo(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index @@ -146,7 +146,7 @@ func TestManagerStatusInfo(t *testing.T) { require.True(t, success) } -func TestManagerStatusOKAndError(t *testing.T) { +func TestManagerWithResultStatusContainOKAndError(t *testing.T) { ctx := context.Background() // mock index builder and mock indexer @@ -159,7 +159,7 @@ func TestManagerStatusOKAndError(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index @@ -236,7 +236,7 @@ func TestManagerStatusOKAndError(t *testing.T) { } -func TestManagerStatusOKAndSkip(t *testing.T) { +func TestManagerWithResultStatusContainOKAndSkip(t *testing.T) { ctx := context.Background() // mock index builder and mock indexer @@ -249,7 +249,7 @@ func TestManagerStatusOKAndSkip(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index @@ -326,7 +326,7 @@ func TestManagerStatusOKAndSkip(t *testing.T) { } -func TestManagerStatusError(t *testing.T) { +func TestManagerWithResultStatusError(t *testing.T) { ctx := context.Background() // mock index builder and mock indexer @@ -339,7 +339,7 @@ func TestManagerStatusError(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index @@ -391,7 +391,7 @@ func TestManagerStatusError(t *testing.T) { require.False(t, success) } -func TestManagerStatusSkip(t *testing.T) { +func TestManagerWithResultStatusSkip(t *testing.T) { ctx := context.Background() // mock index builder and mock indexer @@ -404,7 +404,7 @@ func TestManagerStatusSkip(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index @@ -469,7 +469,7 @@ func TestManagerFatalError(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index @@ -482,6 +482,7 @@ func TestManagerFatalError(t *testing.T) { // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + mExporter.On("ExportResult", mock.Anything, fStorage, tsHeight, []*indexer.ModelResult(nil)).Return(nil) // send a fatal error to the index manager errChan <- fmt.Errorf("fatal error") @@ -493,7 +494,7 @@ func TestManagerFatalError(t *testing.T) { require.False(t, success) } -func TestManagerFatalErrorAndOkReport(t *testing.T) { +func TestManagerFatalErrorAndResultStatusOk(t *testing.T) { ctx := context.Background() // mock index builder and mock indexer @@ -506,7 +507,7 @@ func TestManagerFatalErrorAndOkReport(t *testing.T) { mExporter := new(test.MockExporter) // create new index manager with mocks values - manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter)) + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) require.NoError(t, err) // a fake tipset to index diff --git a/chain/indexer/integrated/testing/builder.go b/chain/indexer/integrated/testing/builder.go index bfc98697b..86f6c8358 100644 --- a/chain/indexer/integrated/testing/builder.go +++ b/chain/indexer/integrated/testing/builder.go @@ -16,6 +16,10 @@ type MockIndexBuilder struct { mock.Mock } +func (t *MockIndexBuilder) Name() string { + return "mockindexbuilder" +} + func (t *MockIndexBuilder) WithTasks(tasks []string) tipset.IndexerBuilder { return t } diff --git a/chain/indexer/integrated/tipset/builder.go b/chain/indexer/integrated/tipset/builder.go index 110612ded..cc8e6e8ac 100644 --- a/chain/indexer/integrated/tipset/builder.go +++ b/chain/indexer/integrated/tipset/builder.go @@ -11,6 +11,7 @@ import ( type IndexerBuilder interface { WithTasks(tasks []string) IndexerBuilder Build() (Indexer, error) + Name() string } type Indexer interface { @@ -29,6 +30,10 @@ type Builder struct { name string } +func (b *Builder) Name() string { + return b.name +} + func (b *Builder) add(cb func(ti *TipSetIndexer)) { b.options = append(b.options, cb) } diff --git a/chain/walk/walker_test.go b/chain/walk/walker_test.go index 2c66c5441..3617092ff 100644 --- a/chain/walk/walker_test.go +++ b/chain/walk/walker_test.go @@ -64,7 +64,7 @@ func TestWalker(t *testing.T) { taskAPI, err := datasource.NewDataSource(nodeAPI) require.NoError(t, err) - im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), t.Name(), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) require.NoError(t, err, "NewManager") t.Logf("initializing indexer") diff --git a/chain/watch/watcher_test.go b/chain/watch/watcher_test.go index f77f0b1b8..99d643945 100644 --- a/chain/watch/watcher_test.go +++ b/chain/watch/watcher_test.go @@ -72,7 +72,7 @@ func TestWatcher(t *testing.T) { taskAPI, err := datasource.NewDataSource(nodeAPI) require.NoError(t, err) - im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), t.Name(), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) require.NoError(t, err, "NewManager") t.Logf("initializing indexer") idx := NewWatcher(nil, im, t.Name(), WithConfidence(0), WithConcurrentWorkers(1), WithBufferSize(5), WithTasks(tasktype.BlocksTask)) diff --git a/lens/lily/impl.go b/lens/lily/impl.go index 5a4f0a7b2..b49947499 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -96,7 +96,7 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker return nil, err } - im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name)) if err != nil { return nil, err } @@ -140,7 +140,7 @@ func (m *LilyNodeAPI) LilyIndex(_ context.Context, cfg *LilyIndexConfig) (interf } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } @@ -204,7 +204,7 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) + idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } @@ -292,7 +292,7 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) + idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err }