Skip to content

Commit

Permalink
Lily distributed worker pattern (#929)
Browse files Browse the repository at this point in the history
* feat: implement distributed indexer
- refactor indexer package to make boundaries clearer
- implement gap fill notifier to distribute work
- implement indexer option interface
- implement worker tracing across nodes
  • Loading branch information
frrist authored Apr 26, 2022
1 parent da4a88a commit a35ae36
Show file tree
Hide file tree
Showing 58 changed files with 2,447 additions and 1,049 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ actors-gen:

.PHONY: tasks-gen
tasks-gen:
go run ./chain/indexer/tablegen
go run ./chain/indexer/tasktype/tablegen
go fmt ./...

.PHONY: itest
Expand Down
5 changes: 1 addition & 4 deletions chain/tipset.go → chain/cache/tipset.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,14 @@
package chain
package cache

import (
"context"
"errors"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)

var log = logging.Logger("lily/chain")

var (
ErrCacheEmpty = errors.New("cache empty")
ErrAddOutOfOrder = errors.New("added tipset height lower than current head")
Expand Down
2 changes: 1 addition & 1 deletion chain/tipset_test.go → chain/cache/tipset_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package cache

import (
"context"
Expand Down
2 changes: 1 addition & 1 deletion chain/export/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
"gopkg.in/cheggaaa/pb.v1"
)

var log = logging.Logger("lily/chain")
var log = logging.Logger("lily/chain/export")

type ChainExporter struct {
store blockstore.Blockstore // blockstore chain is exported from
Expand Down
155 changes: 0 additions & 155 deletions chain/fill.go

This file was deleted.

98 changes: 98 additions & 0 deletions chain/gap/fill.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package gap

import (
"context"
"time"

"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/chain/types"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lily/chain/datasource"
"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/integrated"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/storage"
)

var log = logging.Logger("lily/chain/gap")

// Filler re-indexes the tasks recorded as gaps within a height range and
// marks them filled in the database once indexing succeeds.
type Filler struct {
	// DB is queried for the consolidated gaps, handed to the index manager,
	// and updated when a gap has been filled.
	DB *storage.Database
	// node is used to build the task data source and to look up tipsets by height.
	node lens.API
	// name is passed to the index manager and attached to log lines as "reporter".
	name string
	// minHeight is the lower bound passed to the gap query.
	minHeight uint64
	// maxHeight is the upper bound passed to the gap query.
	maxHeight uint64
	// tasks are the task names passed to the gap query.
	tasks []string
	// done is created at the start of each Run and closed when that Run returns.
	done chan struct{}
}

// NewFiller returns a Filler that uses node and db to fill gaps for the given
// tasks, with minHeight and maxHeight as the bounds of the gap query. name
// identifies the filler in log output. The done channel is not initialised
// here; Run creates it.
func NewFiller(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *Filler {
	filler := new(Filler)
	filler.DB = db
	filler.node = node
	filler.name = name
	filler.minHeight = minHeight
	filler.maxHeight = maxHeight
	filler.tasks = tasks
	return filler
}

// Run fills the gaps recorded in the database for the configured height range
// and tasks.
//
// It asks the database for the consolidated gaps, then for each returned
// height fetches the tipset at that height from the node, indexes it with only
// the tasks recorded as missing for that height, and marks those tasks as
// filled when indexing reports success.
//
// Run returns the first error encountered from the database, the data source
// or index manager constructors, the tipset lookup, or the indexer. If ctx is
// cancelled between heights it returns ctx.Err(). A tipset the indexer reports
// as not successfully indexed is logged and skipped; its gap is left in place.
func (g *Filler) Run(ctx context.Context) error {
	// init the done channel for each run since jobs may be started and stopped.
	g.done = make(chan struct{})
	defer close(g.done)

	gaps, heights, err := g.DB.ConsolidateGaps(ctx, g.minHeight, g.maxHeight, g.tasks...)
	if err != nil {
		return err
	}
	fillStart := time.Now()
	log.Infow("gap fill start", "start", fillStart.String(), "total_epoch_gaps", len(gaps), "from", g.minHeight, "to", g.maxHeight, "task", g.tasks, "reporter", g.name)

	taskAPI, err := datasource.NewDataSource(g.node)
	if err != nil {
		return err
	}

	index, err := integrated.NewManager(taskAPI, g.DB, g.name)
	if err != nil {
		return err
	}

	for _, height := range heights {
		// Stop promptly between heights if the job has been cancelled.
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
		}
		runStart := time.Now()
		// Only the tasks recorded as missing at this height are re-indexed.
		tasks := gaps[height]

		// Log the height being processed, not the full list of heights.
		log.Infow("filling gap", "height", height, "reporter", g.name)

		ts, err := g.node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK)
		if err != nil {
			return err
		}

		log.Infof("got tipset for height %d, tipset height %d", height, ts.Height())

		success, err := index.TipSet(ctx, ts, indexer.WithTasks(tasks))
		if err != nil {
			log.Errorw("fill indexing encountered fatal error", "height", height, "tipset", ts.Key().String(), "error", err, "tasks", tasks, "reporter", g.name)
			return err
		}
		if !success {
			// Leave the gap recorded so it can be retried by a later run.
			log.Errorw("fill indexing failed to successfully index tipset, skipping fill for tipset, gap remains", "height", height, "tipset", ts.Key().String(), "tasks", tasks, "reporter", g.name)
			continue
		}
		log.Infow("fill success", "epoch", ts.Height(), "tasks_filled", tasks, "duration", time.Since(runStart), "reporter", g.name)

		if err := g.DB.SetGapsFilled(ctx, height, tasks...); err != nil {
			return err
		}
	}
	log.Infow("gap fill complete", "duration", time.Since(fillStart), "total_epoch_gaps", len(gaps), "from", g.minHeight, "to", g.maxHeight, "task", g.tasks, "reporter", g.name)

	return nil
}

// Done returns the channel that is closed when the current Run returns.
// The channel is replaced on every call to Run and is nil before the first one.
func (g *Filler) Done() <-chan struct{} {
	var finished <-chan struct{} = g.done
	return finished
}
14 changes: 7 additions & 7 deletions chain/find.go → chain/gap/find.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package chain
package gap

import (
"context"
Expand All @@ -12,7 +12,7 @@ import (
"github.com/filecoin-project/lily/storage"
)

type GapIndexer struct {
type Finder struct {
DB *storage.Database
node lens.API
name string
Expand All @@ -21,8 +21,8 @@ type GapIndexer struct {
done chan struct{}
}

func NewGapIndexer(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *GapIndexer {
return &GapIndexer{
func NewFinder(node lens.API, db *storage.Database, name string, minHeight, maxHeight uint64, tasks []string) *Finder {
return &Finder{
DB: db,
node: node,
name: name,
Expand All @@ -38,7 +38,7 @@ type TaskHeight struct {
Status string
}

func (g *GapIndexer) Find(ctx context.Context) (visor.GapReportList, error) {
func (g *Finder) Find(ctx context.Context) (visor.GapReportList, error) {
log.Debug("finding task epoch gaps")
start := time.Now()
var result []TaskHeight
Expand Down Expand Up @@ -80,7 +80,7 @@ SELECT * FROM gap_find(?,?,?,?,?);
return out, nil
}

func (g *GapIndexer) Run(ctx context.Context) error {
func (g *Finder) Run(ctx context.Context) error {
// init the done channel for each run since jobs may be started and stopped.
g.done = make(chan struct{})
defer close(g.done)
Expand All @@ -101,6 +101,6 @@ func (g *GapIndexer) Run(ctx context.Context) error {
return g.DB.PersistBatch(ctx, gaps)
}

func (g *GapIndexer) Done() <-chan struct{} {
func (g *Finder) Done() <-chan struct{} {
return g.done
}
Loading

0 comments on commit a35ae36

Please sign in to comment.