diff --git a/chain/gap/fill.go b/chain/gap/fill.go index 0cfb3b9f3..1751917ed 100644 --- a/chain/gap/fill.go +++ b/chain/gap/fill.go @@ -11,6 +11,7 @@ import ( "github.com/filecoin-project/lily/chain/datasource" "github.com/filecoin-project/lily/chain/indexer" "github.com/filecoin-project/lily/chain/indexer/integrated" + "github.com/filecoin-project/lily/chain/indexer/integrated/tipset" "github.com/filecoin-project/lily/lens" "github.com/filecoin-project/lily/storage" ) @@ -54,7 +55,7 @@ func (g *Filler) Run(ctx context.Context) error { return err } - index, err := integrated.NewManager(taskAPI, g.DB, g.name) + index, err := integrated.NewManager(g.DB, tipset.NewBuilder(taskAPI, g.name)) if err != nil { return err } diff --git a/chain/indexer/exporter.go b/chain/indexer/exporter.go index a9931a746..996393aa5 100644 --- a/chain/indexer/exporter.go +++ b/chain/indexer/exporter.go @@ -41,6 +41,9 @@ type ModelResult struct { // - if data with height N and SR1 is being persisted and a request to persist data with the same values is made, allow it // - if data with height N and SR2 is being persisted and a request to persist data with height N and SR1 is made, block func (me *ModelExporter) ExportResult(ctx context.Context, strg model.Storage, height int64, results []*ModelResult) error { + if len(results) == 0 { + return nil + } // lock exporting based on height only allowing a single height to be persisted simultaneously heightKey := strconv.FormatInt(height, 10) me.heightKeyMu.LockKey(heightKey) diff --git a/chain/indexer/integrated/builder.go b/chain/indexer/integrated/builder.go deleted file mode 100644 index ded780eb7..000000000 --- a/chain/indexer/integrated/builder.go +++ /dev/null @@ -1,40 +0,0 @@ -package integrated - -import "github.com/filecoin-project/lily/tasks" - -func NewBuilder(node tasks.DataSource, name string) *Builder { - return &Builder{api: node, name: name} -} - -type Builder struct { - options []func(ti *TipSetIndexer) - api tasks.DataSource - name string -} - -func (b *Builder) add(cb func(ti *TipSetIndexer)) { - b.options = append(b.options, cb) -} - -func (b *Builder) WithTasks(tasks []string) *Builder { - b.add(func(ti *TipSetIndexer) { - ti.taskNames = tasks - }) - return b -} - -func (b *Builder) Build() (*TipSetIndexer, error) { - ti := &TipSetIndexer{ - name: b.name, - node: b.api, - } - - for _, opt := range b.options { - opt(ti) - } - - if err := ti.init(); err != nil { - return nil, err - } - return ti, nil -} diff --git a/chain/indexer/integrated/manager.go b/chain/indexer/integrated/manager.go index 701e2dc42..6a745a02f 100644 --- a/chain/indexer/integrated/manager.go +++ b/chain/indexer/integrated/manager.go @@ -7,11 +7,13 @@ import ( "github.com/filecoin-project/lotus/chain/types" logging "github.com/ipfs/go-log/v2" "go.opentelemetry.io/otel" + "go.uber.org/atomic" + "golang.org/x/sync/errgroup" "github.com/filecoin-project/lily/chain/indexer" + "github.com/filecoin-project/lily/chain/indexer/integrated/tipset" "github.com/filecoin-project/lily/model" visormodel "github.com/filecoin-project/lily/model/visor" - "github.com/filecoin-project/lily/tasks" ) var log = logging.Logger("lily/index/manager") @@ -22,9 +24,8 @@ type Exporter interface { // Manager manages the execution of an Indexer. It may be used to index TipSets both serially or in parallel. type Manager struct { - api tasks.DataSource storage model.Storage - indexBuilder *Builder + indexBuilder tipset.IndexerBuilder exporter Exporter window time.Duration name string @@ -41,23 +42,28 @@ func WithWindow(w time.Duration) ManagerOpt { } } +func WithExporter(e Exporter) ManagerOpt { + return func(m *Manager) { + m.exporter = e + } +} + // NewManager returns a default Manager. Any provided ManagerOpt's will override Manager's default values. -func NewManager(api tasks.DataSource, strg model.Storage, name string, opts ...ManagerOpt) (*Manager, error) { +func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, opts ...ManagerOpt) (*Manager, error) { im := &Manager{ - api: api, storage: strg, window: 0, - name: name, + name: idxBuilder.Name(), } for _, opt := range opts { opt(im) } - im.indexBuilder = NewBuilder(api, name) + im.indexBuilder = idxBuilder if im.exporter == nil { - im.exporter = indexer.NewModelExporter(name) + im.exporter = indexer.NewModelExporter(idxBuilder.Name()) } return im, nil } @@ -104,20 +110,17 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index return true, nil } - var modelResults []*indexer.ModelResult - success := true - // collect all the results, recording if any of the tasks were skipped or errored - for res := range taskResults { - select { - case fatal := <-taskErrors: - lg.Errorw("fatal indexer error", "error", fatal) - return false, fatal - default: + success := atomic.NewBool(true) + grp, ctx := errgroup.WithContext(ctx) + grp.Go(func() error { + var modelResults []*indexer.ModelResult + // collect all the results, recording if any of the tasks were skipped or errored + for res := range taskResults { for _, report := range res.Report { if report.Status != visormodel.ProcessingStatusOK && report.Status != visormodel.ProcessingStatusInfo { lg.Warnw("task failed", "task", res.Name, "status", report.Status, "errors", report.ErrorsDetected, "info", report.StatusInformation) - success = false + success.Store(false) } else { lg.Infow("task success", "task", res.Name, "status", report.Status, "duration", report.CompletedAt.Sub(report.StartedAt)) } @@ -128,13 +131,26 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index }) } - } - // synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database. - if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil { + // synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database. + if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil { + return err + } + + return nil + }) + + grp.Go(func() error { + for fatal := range taskErrors { + success.Store(false) + return fatal + } + return nil + }) + + if err := grp.Wait(); err != nil { return false, err } - lg.Infow("index tipset complete", "success", success) - return success, nil + return success.Load(), nil } diff --git a/chain/indexer/integrated/manager_test.go b/chain/indexer/integrated/manager_test.go new file mode 100644 index 000000000..217772050 --- /dev/null +++ b/chain/indexer/integrated/manager_test.go @@ -0,0 +1,563 @@ +package integrated + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lily/chain/indexer" + test "github.com/filecoin-project/lily/chain/indexer/integrated/testing" + "github.com/filecoin-project/lily/chain/indexer/integrated/tipset" + "github.com/filecoin-project/lily/model" + visormodel "github.com/filecoin-project/lily/model/visor" +) + +func TestManagerWithResultStatusOK(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 1) + errChan := make(chan error) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + + // create some fake data and a processing report + data := &test.FakePersistable{} + report := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "task", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status of OK means indexing was successful + Status: visormodel.ProcessingStatusOK, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send report to index manager and close the channel to signal MockIndexer is done indexing data. + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: report, + } + close(resChan) + close(errChan) + + // mock exporter expects to receieve a result with data and report + mExporter.On("ExportResult", mock.Anything, fStorage, int64(ts1.Height()), []*indexer.ModelResult{ + { + Name: t.Name(), + Model: model.PersistableList{report, data}, + }, + }).Return(nil) + + success, err := manager.TipSet(ctx, ts1) + require.NoError(t, err) + require.True(t, success) +} + +func TestManagerWithResultStatusInfo(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 1) + errChan := make(chan error) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + + // create some fake data and a processing report + data := &test.FakePersistable{} + report := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "task", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status of Info means indexing was successful + Status: visormodel.ProcessingStatusInfo, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send report to index manager and close the channel to signal MockIndexer is done indexing data. + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: report, + } + close(resChan) + close(errChan) + + // mock exporter expects to receieve a result with data and report + mExporter.On("ExportResult", mock.Anything, fStorage, int64(ts1.Height()), []*indexer.ModelResult{ + { + Name: t.Name(), + Model: model.PersistableList{report, data}, + }, + }).Return(nil) + + success, err := manager.TipSet(ctx, ts1) + require.NoError(t, err) + require.True(t, success) +} + +func TestManagerWithResultStatusContainOKAndError(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 2) + errChan := make(chan error) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + + // create some fake data and a processing report + data := &test.FakePersistable{} + reportOK := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "taskOk", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status of OK means indexing was successful + Status: visormodel.ProcessingStatusOK, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send report to index manager + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: reportOK, + } + reportError := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "taskError", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status of Error means indexing was unsuccessful + Status: visormodel.ProcessingStatusError, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send to index manager + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: reportError, + } + close(resChan) + close(errChan) + + // mock exporter expects to receieve two results + mExporter.On("ExportResult", mock.Anything, fStorage, int64(ts1.Height()), []*indexer.ModelResult{ + { + Name: t.Name(), + Model: model.PersistableList{reportOK, data}, + }, + { + Name: t.Name(), + Model: model.PersistableList{reportError, data}, + }, + }).Return(nil) + + success, err := manager.TipSet(ctx, ts1) + require.NoError(t, err) + require.False(t, success) + +} + +func TestManagerWithResultStatusContainOKAndSkip(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 2) + errChan := make(chan error) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + + // create some fake data and a processing report + data := &test.FakePersistable{} + reportOK := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "taskOk", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status of OK means indexing was successful + Status: visormodel.ProcessingStatusOK, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send report to index manager + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: reportOK, + } + reportError := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "taskError", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status of Skip means indexing was unsuccessful + Status: visormodel.ProcessingStatusSkip, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send to index manager + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: reportError, + } + close(resChan) + close(errChan) + + // mock exporter expects to receieve two results + mExporter.On("ExportResult", mock.Anything, fStorage, int64(ts1.Height()), []*indexer.ModelResult{ + { + Name: t.Name(), + Model: model.PersistableList{reportOK, data}, + }, + { + Name: t.Name(), + Model: model.PersistableList{reportError, data}, + }, + }).Return(nil) + + success, err := manager.TipSet(ctx, ts1) + require.NoError(t, err) + require.False(t, success) + +} + +func TestManagerWithResultStatusError(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 1) + errChan := make(chan error) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + + // create some fake data and a processing report + data := &test.FakePersistable{} + report := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "task", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status Error means indexing was unsuccessful + Status: visormodel.ProcessingStatusError, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send report to index manager and close the channel to signal MockIndexer is done indexing data. + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: report, + } + close(resChan) + close(errChan) + + // mock exporter expects to receieve a result with data and report + mExporter.On("ExportResult", mock.Anything, fStorage, int64(ts1.Height()), []*indexer.ModelResult{ + { + Name: t.Name(), + Model: model.PersistableList{report, data}, + }, + }).Return(nil) + + success, err := manager.TipSet(ctx, ts1) + require.NoError(t, err) + require.False(t, success) +} + +func TestManagerWithResultStatusSkip(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 1) + errChan := make(chan error) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + + // create some fake data and a processing report + data := &test.FakePersistable{} + report := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "task", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status Skip means indexing was unsuccessful + Status: visormodel.ProcessingStatusSkip, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send report to index manager and close the channel to signal MockIndexer is done indexing data. + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: report, + } + close(resChan) + close(errChan) + + // mock exporter expects to receieve a result with data and report + mExporter.On("ExportResult", mock.Anything, fStorage, int64(ts1.Height()), []*indexer.ModelResult{ + { + Name: t.Name(), + Model: model.PersistableList{report, data}, + }, + }).Return(nil) + + success, err := manager.TipSet(ctx, ts1) + require.NoError(t, err) + require.False(t, success) +} + +func TestManagerFatalError(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 1) + errChan := make(chan error, 1) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + mExporter.On("ExportResult", mock.Anything, fStorage, tsHeight, []*indexer.ModelResult(nil)).Return(nil) + + // send a fatal error to the index manager + errChan <- fmt.Errorf("fatal error") + close(resChan) + close(errChan) + + success, err := manager.TipSet(ctx, ts1) + require.Error(t, err) + require.False(t, success) +} + +func TestManagerFatalErrorAndResultStatusOk(t *testing.T) { + ctx := context.Background() + + // mock index builder and mock indexer + mIdxBuilder := new(test.MockIndexBuilder) + mIdx := new(test.MockIndexer) + mIdxBuilder.MockIndexer = mIdx + + // fake storage and mock exporter + fStorage := new(test.FakeStorage) + mExporter := new(test.MockExporter) + + // create new index manager with mocks values + manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter)) + require.NoError(t, err) + + // a fake tipset to index + tsHeight := int64(1) + ts1 := test.MustFakeTipSet(t, tsHeight) + + // results channel and error channel MockIndexer will return. + resChan := make(chan *tipset.Result, 1) + errChan := make(chan error, 1) + + // expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above. + mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil) + + // create some fake data and a processing report + data := &test.FakePersistable{} + report := visormodel.ProcessingReportList{ + &visormodel.ProcessingReport{ + Height: tsHeight, + StateRoot: "stateroot", + Reporter: t.Name(), + Task: "task", + StartedAt: time.Unix(0, 0), + CompletedAt: time.Unix(0, 0), + // status of OK means indexing was successful + Status: visormodel.ProcessingStatusOK, + StatusInformation: "", + ErrorsDetected: nil, + }, + } + // send report to index manager and close the channel to signal MockIndexer is done indexing data. + resChan <- &tipset.Result{ + Name: t.Name(), + Data: data, + Report: report, + } + + // send a fatal error to the index manager + errChan <- fmt.Errorf("fatal error") + close(resChan) + close(errChan) + + // mock exporter expects to receieve a result with data and report + mExporter.On("ExportResult", mock.Anything, fStorage, int64(ts1.Height()), []*indexer.ModelResult{ + { + Name: t.Name(), + Model: model.PersistableList{report, data}, + }, + }).Return(nil) + + success, err := manager.TipSet(ctx, ts1) + require.Error(t, err) + require.False(t, success) +} diff --git a/chain/indexer/integrated/testing/builder.go b/chain/indexer/integrated/testing/builder.go new file mode 100644 index 000000000..86f6c8358 --- /dev/null +++ b/chain/indexer/integrated/testing/builder.go @@ -0,0 +1,64 @@ +package testing + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/types" + "github.com/stretchr/testify/mock" + + "github.com/filecoin-project/lily/chain/indexer" + "github.com/filecoin-project/lily/chain/indexer/integrated/tipset" + "github.com/filecoin-project/lily/model" +) + +type MockIndexBuilder struct { + MockIndexer *MockIndexer + mock.Mock +} + +func (t *MockIndexBuilder) Name() string { + return "mockindexbuilder" +} + +func (t *MockIndexBuilder) WithTasks(tasks []string) tipset.IndexerBuilder { + return t +} + +func (t *MockIndexBuilder) Build() (tipset.Indexer, error) { + return t.MockIndexer, nil +} + +type MockIndexer struct { + mock.Mock +} + +func (t *MockIndexer) TipSet(ctx context.Context, ts *types.TipSet) (chan *tipset.Result, chan error, error) { + args := t.Called(ctx, ts) + resChan := args.Get(0) + errChan := args.Get(1) + err := args.Error(2) + return resChan.(chan *tipset.Result), errChan.(chan error), err +} + +type MockExporter struct { + mock.Mock +} + +func (t *MockExporter) ExportResult(ctx context.Context, strg model.Storage, height int64, m []*indexer.ModelResult) error { + args := t.Called(ctx, strg, height, m) + return args.Error(0) +} + +type FakeStorage struct { +} + +func (t *FakeStorage) PersistBatch(ctx context.Context, ps ...model.Persistable) error { + return nil +} + +type FakePersistable struct { +} + +func (t *FakePersistable) Persist(ctx context.Context, s model.StorageBatch, version model.Version) error { + return nil +} diff --git a/chain/indexer/integrated/testing/types.go b/chain/indexer/integrated/testing/types.go new file mode 100644 index 000000000..12cea67da --- /dev/null +++ b/chain/indexer/integrated/testing/types.go @@ -0,0 +1,38 @@ +package testing + +import ( + "testing" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" + "github.com/stretchr/testify/require" +) + +var dummyCid cid.Cid + +func init() { + dummyCid, _ = cid.Parse("bafkqaaa") +} + +func MustFakeTipSet(t *testing.T, height int64) *types.TipSet { + minerAddr, err := address.NewFromString("t00") + require.NoError(t, err) + + ts, err := types.NewTipSet([]*types.BlockHeader{{ + Miner: minerAddr, + Height: abi.ChainEpoch(height), + ParentStateRoot: dummyCid, + Messages: dummyCid, + ParentMessageReceipts: dummyCid, + BlockSig: &crypto.Signature{Type: crypto.SigTypeBLS}, + BLSAggregate: &crypto.Signature{Type: crypto.SigTypeBLS}, + Timestamp: 1, + }}) + if err != nil { + t.Fatal(err) + } + return ts +} diff --git a/chain/indexer/integrated/tipset/builder.go b/chain/indexer/integrated/tipset/builder.go new file mode 100644 index 000000000..cc8e6e8ac --- /dev/null +++ b/chain/indexer/integrated/tipset/builder.go @@ -0,0 +1,62 @@ +package tipset + +import ( + "context" + + "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/lily/tasks" +) + +type IndexerBuilder interface { + WithTasks(tasks []string) IndexerBuilder + Build() (Indexer, error) + Name() string +} + +type Indexer interface { + TipSet(ctx context.Context, ts *types.TipSet) (chan *Result, chan error, error) +} + +var _ IndexerBuilder = (*Builder)(nil) + +func NewBuilder(node tasks.DataSource, name string) IndexerBuilder { + return &Builder{api: node, name: name} +} + +type Builder struct { + options []func(ti *TipSetIndexer) + api tasks.DataSource + name string +} + +func (b *Builder) Name() string { + return b.name +} + +func (b *Builder) add(cb func(ti *TipSetIndexer)) { + b.options = append(b.options, cb) +} + +func (b *Builder) WithTasks(tasks []string) IndexerBuilder { + b.add(func(ti *TipSetIndexer) { + ti.taskNames = tasks + }) + return b +} + +func (b *Builder) Build() (Indexer, error) { + ti := &TipSetIndexer{ + name: b.name, + node: b.api, + } + + for _, opt := range b.options { + opt(ti) + } + + if err := ti.init(); err != nil { + return nil, err + } + return ti, nil +} diff --git a/chain/indexer/integrated/tipset.go b/chain/indexer/integrated/tipset/tipset.go similarity index 99% rename from chain/indexer/integrated/tipset.go rename to chain/indexer/integrated/tipset/tipset.go index aae3ec22d..f512e97b5 100644 --- a/chain/indexer/integrated/tipset.go +++ b/chain/indexer/integrated/tipset/tipset.go @@ -1,4 +1,4 @@ -package integrated +package tipset import ( "context" @@ -6,6 +6,7 @@ import ( "time" "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" "go.opencensus.io/stats" "go.opencensus.io/tag" "go.opentelemetry.io/otel" @@ -60,6 +61,8 @@ import ( "github.com/filecoin-project/lily/tasks/msapprovals" ) +var log = logging.Logger("lily/integrated/tipset") + // TipSetIndexer extracts block, message and actor state data from a tipset and persists it to storage. Extraction // and persistence are concurrent. Extraction of the a tipset can proceed while data from the previous extraction is // being persisted. The indexer may be given a time window in which to complete data extraction. The name of the @@ -331,11 +334,12 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) (chan *Res var ( outCh = make(chan *Result, len(stateResults)) - errCh = make(chan error) + errCh = make(chan error, 1) ) go func() { defer func() { close(outCh) + close(errCh) defer span.End() }() @@ -365,8 +369,8 @@ func (t *TipSetIndexer) TipSet(ctx context.Context, ts *types.TipSet) (chan *Res } } return - // received a result default: + // received a result llt := log.With("height", current.Height(), "task", res.Task, "reporter", t.name) diff --git a/chain/walk/walker_test.go b/chain/walk/walker_test.go index b3cac13b5..3617092ff 100644 --- a/chain/walk/walker_test.go +++ b/chain/walk/walker_test.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/lily/chain/actors/builtin" "github.com/filecoin-project/lily/chain/datasource" "github.com/filecoin-project/lily/chain/indexer/integrated" + "github.com/filecoin-project/lily/chain/indexer/integrated/tipset" "github.com/filecoin-project/lily/chain/indexer/tasktype" "github.com/filecoin-project/lily/model/blocks" "github.com/filecoin-project/lily/storage" @@ -63,7 +64,7 @@ func TestWalker(t *testing.T) { taskAPI, err := datasource.NewDataSource(nodeAPI) require.NoError(t, err) - im, err := integrated.NewManager(taskAPI, strg, t.Name(), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) require.NoError(t, err, "NewManager") t.Logf("initializing indexer") diff --git a/chain/watch/watcher_test.go b/chain/watch/watcher_test.go index f05ee1f60..99d643945 100644 --- a/chain/watch/watcher_test.go +++ b/chain/watch/watcher_test.go @@ -23,6 +23,7 @@ import ( "github.com/filecoin-project/lily/chain/cache" "github.com/filecoin-project/lily/chain/datasource" "github.com/filecoin-project/lily/chain/indexer/integrated" + "github.com/filecoin-project/lily/chain/indexer/integrated/tipset" "github.com/filecoin-project/lily/chain/indexer/tasktype" "github.com/filecoin-project/lily/model/blocks" "github.com/filecoin-project/lily/storage" @@ -71,7 +72,7 @@ func TestWatcher(t *testing.T) { taskAPI, err := datasource.NewDataSource(nodeAPI) require.NoError(t, err) - im, err := integrated.NewManager(taskAPI, strg, t.Name(), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second)) require.NoError(t, err, "NewManager") t.Logf("initializing indexer") idx := NewWatcher(nil, im, t.Name(), WithConfidence(0), WithConcurrentWorkers(1), WithBufferSize(5), WithTasks(tasktype.BlocksTask)) diff --git a/go.mod b/go.mod index db5908966..b052c7ef7 100644 --- a/go.mod +++ b/go.mod @@ -68,6 +68,7 @@ require ( github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01 github.com/jedib0t/go-pretty/v6 v6.2.7 go.opentelemetry.io/otel/trace v1.3.0 + go.uber.org/atomic v1.9.0 ) require ( @@ -335,7 +336,6 @@ require ( github.com/zondax/ledger-go v0.12.1 // indirect go.opentelemetry.io/otel/metric v0.25.0 // indirect go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect - go.uber.org/atomic v1.9.0 // indirect go.uber.org/dig v1.12.0 // indirect go4.org v0.0.0-20200411211856-f5505b9728dd // indirect golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect diff --git a/lens/lily/impl.go b/lens/lily/impl.go index 12ebb8537..b49947499 100644 --- a/lens/lily/impl.go +++ b/lens/lily/impl.go @@ -26,6 +26,7 @@ import ( "github.com/filecoin-project/lily/chain/indexer/distributed" "github.com/filecoin-project/lily/chain/indexer/distributed/queue" "github.com/filecoin-project/lily/chain/indexer/integrated" + "github.com/filecoin-project/lily/chain/indexer/integrated/tipset" "github.com/filecoin-project/lily/chain/walk" "github.com/filecoin-project/lily/chain/watch" "github.com/filecoin-project/lily/lens" @@ -95,7 +96,7 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker return nil, err } - im, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name)) if err != nil { return nil, err } @@ -139,7 +140,7 @@ func (m *LilyNodeAPI) LilyIndex(_ context.Context, cfg *LilyIndexConfig) (interf } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - im, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) + im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } @@ -203,7 +204,7 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - idx, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) + idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } @@ -291,7 +292,7 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul } // instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage. - idx, err := integrated.NewManager(taskAPI, strg, cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window)) + idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window)) if err != nil { return nil, err } @@ -562,19 +563,19 @@ func (m *LilyNodeAPI) StateGetReceipt(ctx context.Context, msg cid.Cid, from typ return &ml.Receipt, nil } -func (m *LilyNodeAPI) LogList(ctx context.Context) ([]string, error) { +func (m *LilyNodeAPI) LogList(_ context.Context) ([]string, error) { return logging.GetSubsystems(), nil } -func (m *LilyNodeAPI) LogSetLevel(ctx context.Context, subsystem, level string) error { +func (m *LilyNodeAPI) LogSetLevel(_ context.Context, subsystem, level string) error { return logging.SetLogLevel(subsystem, level) } -func (m *LilyNodeAPI) LogSetLevelRegex(ctx context.Context, regex, level string) error { +func (m *LilyNodeAPI) LogSetLevelRegex(_ context.Context, regex, level string) error { return logging.SetLogLevelRegex(regex, level) } -func (m *LilyNodeAPI) Shutdown(ctx context.Context) error { +func (m *LilyNodeAPI) Shutdown(_ context.Context) error { m.ShutdownChan <- struct{}{} return nil } @@ -628,6 +629,6 @@ func (l *LogQueryHook) BeforeQuery(ctx context.Context, evt *pg.QueryEvent) (con return ctx, nil } -func (l *LogQueryHook) AfterQuery(ctx context.Context, event *pg.QueryEvent) error { +func (l *LogQueryHook) AfterQuery(_ context.Context, _ *pg.QueryEvent) error { return nil }