Skip to content

Commit

Permalink
[command] add currentheight to job list (#1131)
Browse files Browse the repository at this point in the history
* Add currentHeight for walk, watch and fill.

* Add schedule.Reporter

---------

Co-authored-by: Terry <[email protected]>
  • Loading branch information
Terryhung and Terry authored Feb 14, 2023
1 parent 582edce commit 10608e8
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 31 deletions.
7 changes: 5 additions & 2 deletions chain/gap/fill.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/filecoin-project/lily/chain/indexer/integrated"
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
)

Expand All @@ -25,16 +26,18 @@ type Filler struct {
minHeight, maxHeight int64
tasks []string
done chan struct{}
report *schedule.Reporter
}

func NewFiller(node lens.API, db *storage.Database, name string, minHeight, maxHeight int64, tasks []string) *Filler {
func NewFiller(node lens.API, db *storage.Database, name string, minHeight, maxHeight int64, tasks []string, r *schedule.Reporter) *Filler {
return &Filler{
DB: db,
node: node,
name: name,
maxHeight: maxHeight,
minHeight: minHeight,
tasks: tasks,
report: r,
}
}

Expand Down Expand Up @@ -69,7 +72,7 @@ func (g *Filler) Run(ctx context.Context) error {
runStart := time.Now()

log.Infow("filling gap", "height", heights, "reporter", g.name)

g.report.UpdateCurrentHeight(height)
ts, err := g.node.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(height), types.EmptyTSK)
if err != nil {
return err
Expand Down
6 changes: 5 additions & 1 deletion chain/walk/walker.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,20 @@ import (

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/schedule"
)

var log = logging.Logger("lily/chain/walk")

func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64) *Walker {
func NewWalker(obs indexer.Indexer, node lens.API, name string, tasks []string, minHeight, maxHeight int64, r *schedule.Reporter) *Walker {
return &Walker{
node: node,
obs: obs,
name: name,
tasks: tasks,
minHeight: minHeight,
maxHeight: maxHeight,
report: r,
}
}

Expand All @@ -36,6 +38,7 @@ type Walker struct {
minHeight int64 // limit persisting to tipsets equal to or above this height
maxHeight int64 // limit persisting to tipsets equal to or below this height}
done chan struct{}
report *schedule.Reporter
}

// Run starts walking the chain history and continues until the context is done or
Expand Down Expand Up @@ -96,6 +99,7 @@ func (c *Walker) WalkChain(ctx context.Context, node lens.API, ts *types.TipSet)
default:
}
log.Infow("walk tipset", "height", ts.Height(), "reporter", c.name)
c.report.UpdateCurrentHeight(int64(ts.Height()))
if success, err := c.obs.TipSet(ctx, ts, indexer.WithIndexerType(indexer.Walk), indexer.WithTasks(c.tasks)); err != nil {
span.RecordError(err)
return fmt.Errorf("notify tipset: %w", err)
Expand Down
4 changes: 3 additions & 1 deletion chain/walk/walker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/model/blocks"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
"github.com/filecoin-project/lily/testutil"
)
Expand Down Expand Up @@ -54,7 +55,8 @@ func TestWalker(t *testing.T) {
require.NoError(t, err, "NewManager")

t.Logf("initializing indexer")
idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()))
reporter := &schedule.Reporter{}
idx := NewWalker(im, nodeAPI, t.Name(), []string{tasktype.BlocksTask}, 0, int64(head.Height()), reporter)

t.Logf("indexing chain")
err = idx.WalkChain(ctx, nodeAPI, head)
Expand Down
7 changes: 5 additions & 2 deletions chain/watch/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/metrics"
"github.com/filecoin-project/lily/schedule"
)

var log = logging.Logger("lily/chain/watch")
Expand Down Expand Up @@ -75,6 +76,7 @@ type Watcher struct {

// metric tracking
active int64 // must be accessed using atomic operations, updated automatically.
report *schedule.Reporter

// error handling
fatalMu sync.Mutex
Expand All @@ -91,7 +93,7 @@ var (
// NewWatcher creates a new Watcher. confidence sets the number of tipsets that will be held
// in a cache awaiting possible reversion. Tipsets will be written to the database when they are evicted from
// the cache due to incoming later tipsets.
func NewWatcher(api WatcherAPI, indexer indexer.Indexer, name string, opts ...WatcherOpt) *Watcher {
func NewWatcher(api WatcherAPI, indexer indexer.Indexer, name string, r *schedule.Reporter, opts ...WatcherOpt) *Watcher {
w := &Watcher{
api: api,
name: name,
Expand All @@ -101,6 +103,7 @@ func NewWatcher(api WatcherAPI, indexer indexer.Indexer, name string, opts ...Wa
confidence: WatcherDefaultConfidence,
poolSize: WatcherDefaultConcurrentWorkers,
tasks: WatcherDefaultTasks,
report: r,
}

for _, opt := range opts {
Expand Down Expand Up @@ -235,7 +238,7 @@ func (c *Watcher) indexTipSetAsync(ctx context.Context, ts *types.TipSet) error
log.Warnw("queuing worker in watcher pool", "waiting", c.pool.WaitingQueueSize(), "reporter", c.name)
}
log.Infow("submitting tipset for async indexing", "height", ts.Height(), "active", c.active, "reporter", c.name)

c.report.UpdateCurrentHeight(int64(ts.Height()))
ctx, span := otel.Tracer("").Start(ctx, "Watcher.indexTipSetAsync")
c.pool.Submit(func() {
atomic.AddInt64(&c.active, 1)
Expand Down
4 changes: 3 additions & 1 deletion chain/watch/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/filecoin-project/lily/chain/indexer/integrated/tipset"
"github.com/filecoin-project/lily/chain/indexer/tasktype"
"github.com/filecoin-project/lily/model/blocks"
"github.com/filecoin-project/lily/schedule"
"github.com/filecoin-project/lily/storage"
"github.com/filecoin-project/lily/testutil"
)
Expand Down Expand Up @@ -81,7 +82,8 @@ func TestWatcher(t *testing.T) {
im, err := integrated.NewManager(strg, tipset.NewBuilder(taskAPI, t.Name()), integrated.WithWindow(builtin.EpochDurationSeconds*time.Second))
require.NoError(t, err, "NewManager")
t.Logf("initializing indexer")
idx := NewWatcher(nil, im, t.Name(), WithConfidence(0), WithConcurrentWorkers(1), WithBufferSize(5), WithTasks(tasktype.BlocksTask))
reporter := &schedule.Reporter{}
idx := NewWatcher(nil, im, t.Name(), reporter, WithConfidence(0), WithConcurrentWorkers(1), WithBufferSize(5), WithTasks(tasktype.BlocksTask))
idx.cache = cache.NewTipSetCache(0)
// the watchers worker pool and cache are initialized in its Run method, since we don't call that here initialize them now.
idx.pool = workerpool.New(1)
Expand Down
54 changes: 32 additions & 22 deletions lens/lily/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,14 +220,15 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched
return nil, err
}

reporter := &schedule.Reporter{}
watchJob := watch.NewWatcher(wapi, idx, cfg.JobConfig.Name,
reporter,
watch.WithTasks(cfg.JobConfig.Tasks...),
watch.WithConfidence(cfg.Confidence),
watch.WithConcurrentWorkers(cfg.Workers),
watch.WithBufferSize(cfg.BufferSize),
)

res := m.Scheduler.Submit(&schedule.JobConfig{
jobConfig := &schedule.JobConfig{
Name: cfg.JobConfig.Name,
Type: "watch",
Params: map[string]string{
Expand All @@ -238,12 +239,14 @@ func (m *LilyNodeAPI) LilyWatch(_ context.Context, cfg *LilyWatchConfig) (*sched
"buffer": strconv.Itoa(cfg.BufferSize),
},
Tasks: cfg.JobConfig.Tasks,
Job: watchJob,
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
RestartDelay: cfg.JobConfig.RestartDelay,
})
Job: watchJob,
Reporter: reporter,
}

res := m.Scheduler.Submit(jobConfig)
return res, nil
}

Expand All @@ -258,14 +261,14 @@ func (m *LilyNodeAPI) LilyWatchNotify(_ context.Context, cfg *LilyWatchNotifyCon
return nil, err
}
idx := distributed.NewTipSetIndexer(queue.NewAsynq(notifier))

reporter := &schedule.Reporter{}
watchJob := watch.NewWatcher(wapi, idx, cfg.JobConfig.Name,
reporter,
watch.WithTasks(cfg.JobConfig.Tasks...),
watch.WithConfidence(cfg.Confidence),
watch.WithBufferSize(cfg.BufferSize),
)

res := m.Scheduler.Submit(&schedule.JobConfig{
jobConfig := &schedule.JobConfig{
Name: cfg.JobConfig.Name,
Type: "watch-notify",
Params: map[string]string{
Expand All @@ -274,12 +277,13 @@ func (m *LilyNodeAPI) LilyWatchNotify(_ context.Context, cfg *LilyWatchNotifyCon
"queue": cfg.Queue,
},
Tasks: cfg.JobConfig.Tasks,
Job: watchJob,
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
RestartDelay: cfg.JobConfig.RestartDelay,
})

Job: watchJob,
Reporter: reporter,
}
res := m.Scheduler.Submit(jobConfig)
return res, err
}

Expand Down Expand Up @@ -308,7 +312,8 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul
return nil, err
}

res := m.Scheduler.Submit(&schedule.JobConfig{
reporter := &schedule.Reporter{}
jobConfig := &schedule.JobConfig{
Name: cfg.JobConfig.Name,
Type: "walk",
Params: map[string]string{
Expand All @@ -318,12 +323,14 @@ func (m *LilyNodeAPI) LilyWalk(_ context.Context, cfg *LilyWalkConfig) (*schedul
"storage": cfg.JobConfig.Storage,
},
Tasks: cfg.JobConfig.Tasks,
Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To),
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
RestartDelay: cfg.JobConfig.RestartDelay,
})
Job: walk.NewWalker(idx, m, cfg.JobConfig.Name, cfg.JobConfig.Tasks, cfg.From, cfg.To, reporter),
Reporter: reporter,
}

res := m.Scheduler.Submit(jobConfig)
return res, nil
}

Expand All @@ -334,7 +341,8 @@ func (m *LilyNodeAPI) LilyWalkNotify(_ context.Context, cfg *LilyWalkNotifyConfi
}
idx := distributed.NewTipSetIndexer(queue.NewAsynq(notifier))

res := m.Scheduler.Submit(&schedule.JobConfig{
reporter := &schedule.Reporter{}
jobConfig := &schedule.JobConfig{
Name: cfg.WalkConfig.JobConfig.Name,
Type: "walk-notify",
Params: map[string]string{
Expand All @@ -343,12 +351,13 @@ func (m *LilyNodeAPI) LilyWalkNotify(_ context.Context, cfg *LilyWalkNotifyConfi
"queue": cfg.Queue,
},
Tasks: cfg.WalkConfig.JobConfig.Tasks,
Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To),
RestartOnFailure: cfg.WalkConfig.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.WalkConfig.JobConfig.RestartOnCompletion,
RestartDelay: cfg.WalkConfig.JobConfig.RestartDelay,
})

Job: walk.NewWalker(idx, m, cfg.WalkConfig.JobConfig.Name, cfg.WalkConfig.JobConfig.Tasks, cfg.WalkConfig.From, cfg.WalkConfig.To, reporter),
Reporter: reporter,
}
res := m.Scheduler.Submit(jobConfig)
return res, nil
}

Expand Down Expand Up @@ -397,8 +406,8 @@ func (m *LilyNodeAPI) LilyGapFill(_ context.Context, cfg *LilyGapFillConfig) (*s
if err != nil {
return nil, err
}

res := m.Scheduler.Submit(&schedule.JobConfig{
reporter := &schedule.Reporter{}
jobConfig := &schedule.JobConfig{
Name: cfg.JobConfig.Name,
Type: "fill",
Params: map[string]string{
Expand All @@ -407,12 +416,13 @@ func (m *LilyNodeAPI) LilyGapFill(_ context.Context, cfg *LilyGapFillConfig) (*s
"storage": cfg.JobConfig.Storage,
},
Tasks: cfg.JobConfig.Tasks,
Job: gap.NewFiller(m, db, cfg.JobConfig.Name, cfg.From, cfg.To, cfg.JobConfig.Tasks),
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
RestartDelay: cfg.JobConfig.RestartDelay,
})

Reporter: reporter,
Job: gap.NewFiller(m, db, cfg.JobConfig.Name, cfg.From, cfg.To, cfg.JobConfig.Tasks, reporter),
}
res := m.Scheduler.Submit(jobConfig)
return res, nil
}

Expand Down
20 changes: 18 additions & 2 deletions schedule/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ type JobConfig struct {

log *zap.SugaredLogger

// Reporter is a job report
Reporter *Reporter

// Name is a human readable name for the job for use in logging
Name string

Expand Down Expand Up @@ -79,6 +82,15 @@ type JobConfig struct {
EndedAt time.Time
}

type Reporter struct {
// Current Height is the current height of the job
CurrentHeight int64
}

func (r *Reporter) UpdateCurrentHeight(height int64) {
r.CurrentHeight = height
}

// Locker represents a general lock that a job may need to take before operating.
type Locker interface {
Lock(context.Context) error
Expand Down Expand Up @@ -347,6 +359,8 @@ type JobListResult struct {
Params map[string]string
StartedAt time.Time
EndedAt time.Time

Report *Reporter
}

var InvalidJobID = JobID(0)
Expand All @@ -363,7 +377,7 @@ func (s *Scheduler) Jobs() []JobListResult {
var out []JobListResult
for _, j := range s.jobs {
j.lk.Lock()
out = append(out, JobListResult{
result := JobListResult{
ID: j.id,
Name: j.Name,
Tasks: j.Tasks,
Expand All @@ -376,7 +390,9 @@ func (s *Scheduler) Jobs() []JobListResult {
Params: j.Params,
StartedAt: j.StartedAt,
EndedAt: j.EndedAt,
})
Report: j.Reporter,
}
out = append(out, result)
j.lk.Unlock()
}
sort.Slice(out, func(i, j int) bool {
Expand Down

0 comments on commit 10608e8

Please sign in to comment.