Skip to content

Commit

Permalink
address feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
frrist committed Jun 16, 2022
1 parent 72c7ff3 commit e9515ea
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 30 deletions.
2 changes: 1 addition & 1 deletion chain/gap/fill.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func (g *Filler) Run(ctx context.Context) error {
return err
}

index, err := integrated.NewManager(g.DB, tipset.NewBuilder(taskAPI, g.name), g.name)
index, err := integrated.NewManager(g.DB, tipset.NewBuilder(taskAPI, g.name))
if err != nil {
return err
}
Expand Down
3 changes: 3 additions & 0 deletions chain/indexer/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type ModelResult struct {
// - if data with height N and SR1 is being persisted and a request to persist data with the same values is made, allow it
// - if data with height N and SR2 is being persisted and a request to persist data with height N and SR1 is made, block
func (me *ModelExporter) ExportResult(ctx context.Context, strg model.Storage, height int64, results []*ModelResult) error {
if len(results) == 0 {
return nil
}
// lock exporting based on height only allowing a single height to be persisted simultaneously
heightKey := strconv.FormatInt(height, 10)
me.heightKeyMu.LockKey(heightKey)
Expand Down
14 changes: 6 additions & 8 deletions chain/indexer/integrated/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func WithExporter(e Exporter) ManagerOpt {
}

// NewManager returns a default Manager. Any provided ManagerOpt's will override Manager's default values.
func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, name string, opts ...ManagerOpt) (*Manager, error) {
func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, opts ...ManagerOpt) (*Manager, error) {
im := &Manager{
storage: strg,
window: 0,
name: name,
name: idxBuilder.Name(),
}

for _, opt := range opts {
Expand All @@ -63,7 +63,7 @@ func NewManager(strg model.Storage, idxBuilder tipset.IndexerBuilder, name strin
im.indexBuilder = idxBuilder

if im.exporter == nil {
im.exporter = indexer.NewModelExporter(name)
im.exporter = indexer.NewModelExporter(idxBuilder.Name())
}
return im, nil
}
Expand Down Expand Up @@ -132,11 +132,9 @@ func (i *Manager) TipSet(ctx context.Context, ts *types.TipSet, options ...index

}

if len(modelResults) > 0 {
// synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database.
if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil {
return err
}
// synchronously export extracted data and its report. If datas at this height are currently being persisted this method will block to avoid deadlocking the database.
if err := i.exporter.ExportResult(ctx, i.storage, int64(ts.Height()), modelResults); err != nil {
return err
}

return nil
Expand Down
31 changes: 16 additions & 15 deletions chain/indexer/integrated/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
visormodel "github.com/filecoin-project/lily/model/visor"
)

func TestManagerStatusOK(t *testing.T) {
func TestManagerWithResultStatusOK(t *testing.T) {
ctx := context.Background()

// mock index builder and mock indexer
Expand All @@ -29,7 +29,7 @@ func TestManagerStatusOK(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand Down Expand Up @@ -81,7 +81,7 @@ func TestManagerStatusOK(t *testing.T) {
require.True(t, success)
}

func TestManagerStatusInfo(t *testing.T) {
func TestManagerWithResultStatusInfo(t *testing.T) {
ctx := context.Background()

// mock index builder and mock indexer
Expand All @@ -94,7 +94,7 @@ func TestManagerStatusInfo(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand Down Expand Up @@ -146,7 +146,7 @@ func TestManagerStatusInfo(t *testing.T) {
require.True(t, success)
}

func TestManagerStatusOKAndError(t *testing.T) {
func TestManagerWithResultStatusContainOKAndError(t *testing.T) {
ctx := context.Background()

// mock index builder and mock indexer
Expand All @@ -159,7 +159,7 @@ func TestManagerStatusOKAndError(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestManagerStatusOKAndError(t *testing.T) {

}

func TestManagerStatusOKAndSkip(t *testing.T) {
func TestManagerWithResultStatusContainOKAndSkip(t *testing.T) {
ctx := context.Background()

// mock index builder and mock indexer
Expand All @@ -249,7 +249,7 @@ func TestManagerStatusOKAndSkip(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand Down Expand Up @@ -326,7 +326,7 @@ func TestManagerStatusOKAndSkip(t *testing.T) {

}

func TestManagerStatusError(t *testing.T) {
func TestManagerWithResultStatusError(t *testing.T) {
ctx := context.Background()

// mock index builder and mock indexer
Expand All @@ -339,7 +339,7 @@ func TestManagerStatusError(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand Down Expand Up @@ -391,7 +391,7 @@ func TestManagerStatusError(t *testing.T) {
require.False(t, success)
}

func TestManagerStatusSkip(t *testing.T) {
func TestManagerWithResultStatusSkip(t *testing.T) {
ctx := context.Background()

// mock index builder and mock indexer
Expand All @@ -404,7 +404,7 @@ func TestManagerStatusSkip(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand Down Expand Up @@ -469,7 +469,7 @@ func TestManagerFatalError(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand All @@ -482,6 +482,7 @@ func TestManagerFatalError(t *testing.T) {

// expect index manager to pass MockedIndexer anything (ctx) and the tipset, returning the channels created above.
mIdxBuilder.MockIndexer.On("TipSet", mock.Anything, ts1).Return(resChan, errChan, nil)
mExporter.On("ExportResult", mock.Anything, fStorage, tsHeight, []*indexer.ModelResult(nil)).Return(nil)

// send a fatal error to the index manager
errChan <- fmt.Errorf("fatal error")
Expand All @@ -493,7 +494,7 @@ func TestManagerFatalError(t *testing.T) {
require.False(t, success)
}

func TestManagerFatalErrorAndOkReport(t *testing.T) {
func TestManagerFatalErrorAndResultStatusOk(t *testing.T) {
ctx := context.Background()

// mock index builder and mock indexer
Expand All @@ -506,7 +507,7 @@ func TestManagerFatalErrorAndOkReport(t *testing.T) {
mExporter := new(test.MockExporter)

// create new index manager with mocks values
manager, err := NewManager(fStorage, mIdxBuilder, t.Name(), WithExporter(mExporter))
manager, err := NewManager(fStorage, mIdxBuilder, WithExporter(mExporter))
require.NoError(t, err)

// a fake tipset to index
Expand Down
4 changes: 4 additions & 0 deletions chain/indexer/integrated/testing/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ type MockIndexBuilder struct {
mock.Mock
}

func (t *MockIndexBuilder) Name() string {
return "mockindexbuilder"
}

func (t *MockIndexBuilder) WithTasks(tasks []string) tipset.IndexerBuilder {
return t
}
Expand Down
5 changes: 5 additions & 0 deletions chain/indexer/integrated/tipset/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
type IndexerBuilder interface {
WithTasks(tasks []string) IndexerBuilder
Build() (Indexer, error)
Name() string
}

type Indexer interface {
Expand All @@ -29,6 +30,10 @@ type Builder struct {
name string
}

func (b *Builder) Name() string {
return b.name
}

func (b *Builder) add(cb func(ti *TipSetIndexer)) {
b.options = append(b.options, cb)
}
Expand Down
2 changes: 1 addition & 1 deletion chain/walk/walker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func TestWalker(t *testing.T) {
taskAPI, err := datasource.NewDataSource(nodeAPI)
require.NoError(t, err)

im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), t.Name(), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second))
im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second))
require.NoError(t, err, "NewManager")

t.Logf("initializing indexer")
Expand Down
2 changes: 1 addition & 1 deletion chain/watch/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestWatcher(t *testing.T) {

taskAPI, err := datasource.NewDataSource(nodeAPI)
require.NoError(t, err)
im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), t.Name(), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second))
im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second))
require.NoError(t, err, "NewManager")
t.Logf("initializing indexer")
idx := NewWatcher(nil, im, t.Name(), WithConfidence(0), WithConcurrentWorkers(1), WithBufferSize(5), WithTasks(tasktype.BlocksTask))
Expand Down
8 changes: 4 additions & 4 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
return nil, err
}

im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name)
im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -140,7 +140,7 @@ func (m *LilyNodeAPI) LilyIndex(_ context.Context, cfg *LilyIndexConfig) (interf
}

// instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage.
im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window))
im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -204,7 +204,7 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched
}

// instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage.
idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window))
idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -292,7 +292,7 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul
}

// instantiate an indexer to extract block, message, and actor state data from observed tipsets and persists it to the storage.
idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), cfg.JobConfig.Name, integrated.WithWindow(cfg.JobConfig.Window))
idx, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, cfg.JobConfig.Name), integrated.WithWindow(cfg.JobConfig.Window))
if err != nil {
return nil, err
}
Expand Down

0 comments on commit e9515ea

Please sign in to comment.