Skip to content

Commit

Permalink
fix(lily): Consider partially completed heights w ERRORs and SKIPs (#791)
Browse files Browse the repository at this point in the history

* fix(gapfind): Find considers partially completed heights w ERRORs

* fix(gapfind): Find will ignore SKIPs which also have OKs

* fix(find): Find recognizes all heights w ERRORs as gaps

* chore(find): Simplify query, document, and remove redundancy

* chore: gofmt and sql bug fix

Co-authored-by: Mike Greenberg <[email protected]>
  • Loading branch information
placer14 and placer14 authored Dec 2, 2021
1 parent d21a706 commit e2bb246
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 26 deletions.
71 changes: 45 additions & 26 deletions chain/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"github.com/deckarep/golang-set"
mapset "github.com/deckarep/golang-set"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/model/visor"
Expand Down Expand Up @@ -151,9 +151,10 @@ func (g *GapIndexer) findEpochGapsAndNullRounds(ctx context.Context, node GapInd
`
SELECT s.i AS missing_epoch
FROM generate_series(?, ?) s(i)
WHERE NOT EXISTS (SELECT 1 FROM visor_processing_reports WHERE height = s.i);
WHERE NOT EXISTS (SELECT 1 FROM visor_processing_reports WHERE height = s.i AND status = ?)
;
`,
g.minHeight, g.maxHeight)
g.minHeight, g.maxHeight, visor.ProcessingStatusOK)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -213,31 +214,49 @@ func (g *GapIndexer) findTaskEpochGaps(ctx context.Context) (visor.GapReportList
&result,
`
select height, task
from (
select distinct vpr.task, vpr.height, vpr.status_information
from visor_processing_reports vpr
right join(
select height
from (
select task_height_count.height, count(task_height_count.height) cheight
from (
select distinct(task) as task, height
from visor_processing_reports
group by height, task
order by height desc
) task_height_count
group by task_height_count.height
) task_count_per_height
where task_count_per_height.cheight != ?
) incomplete
on vpr.height = incomplete.height
where vpr.status_information != ?
or vpr.status_information is null
) incomplete_heights_and_their_completed_tasks
where height >= ? and height <= ?
from (
-- incomplete_heights_and_their_completed_tasks: for every incomplete
-- heightin incomplete_task_counts we will take all processing_reports
-- with the same height and return height,tasks which are 'OK'
-- which provides all incomplete heights and their completed tasks
select distinct vpr.task, vpr.height
from visor_processing_reports vpr
right join(
-- incomplete_task_counts: count of all tasks by height,status
-- and filter for status = 'OK' and all task counts which
-- don't equal len(AllTasks)
select incomplete_task_counts.height, status, count(incomplete_task_counts.height) cheight
from (
-- all unique height,tasks,status within range of interest
select task, height, status
from visor_processing_reports
where height >= ?2 and height <= ?3
group by height, task, status
order by height desc, task
) incomplete_task_counts
where status = ?4
group by incomplete_task_counts.height, incomplete_task_counts.status
having count(incomplete_task_counts.height) != ?0
) incomplete_heights
on vpr.height = incomplete_heights.height
where (vpr.status_information != ?1
or vpr.status_information is null)
and vpr.status = ?4
) incomplete_heights_and_their_completed_tasks
order by height desc
;
`,
len(AllTasks), visor.ProcessingStatusInformationNullRound, g.minHeight, g.maxHeight,
len(AllTasks), // arg 0
visor.ProcessingStatusInformationNullRound, // arg 1
g.minHeight, // arg 2
g.maxHeight, // arg 3
visor.ProcessingStatusOK, // arg 4
)
if err != nil {
return nil, err
Expand Down
68 changes: 68 additions & 0 deletions chain/find_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func TestFind(t *testing.T) {
require.NoError(t, err)

expected := makeGapReportList(tsh2, ActorStatesMinerTask, ActorStatesInitTask)
expected = append(expected, makeGapReportList(fakeTipset(t, 8), ActorStatesMinerTask)...)
assertGapReportsEqual(t, expected, actual)
})

Expand Down Expand Up @@ -319,6 +320,62 @@ func TestFind(t *testing.T) {
expected := makeGapReportList(tsh2, MessagesTask, ActorStatesInitTask)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#775) for each task at epoch 2 there exists an ERROR", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
errorEpochTasksVPR(t, db, 2, AllTasks...)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

tsh2 := fakeTipset(t, 2)
mlens := new(MockedFindLens)
mlens.On("ChainGetTipSetByHeight", mock.Anything, tsh2.Height(), types.EmptyTSK).
Return(tsh2, nil)

actual, _, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findEpochGapsAndNullRounds(ctx, mlens)
require.NoError(t, err)

expected := makeGapReportList(tsh2, AllTasks...)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#775) for each task at epoch 2 there exists an ERROR _and_ an OK on some tasks", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
// error on some tasks
errorEpochTasksVPR(t, db, 2, ActorStatesInitTask, ActorStatesMinerTask)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findTaskEpochGaps(ctx)
require.NoError(t, err)

// only expect gaps at height 2
expected := makeGapReportList(fakeTipset(t, 2), ActorStatesInitTask, ActorStatesMinerTask)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#773) for each task at epoch 2 there exists a SKIP and an OK", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
skipEpochSkippedVRP(t, db, 2, AllTasks...)
appendOKAtEpochVPR(t, db, 2, AllTasks...)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findTaskEpochGaps(ctx)
require.NoError(t, err)

// no gaps should be found since the epoch has OK's for all tasks; the SKIPS are ignored.
require.Len(t, actual, 0)
})
}

type assertFields struct {
Expand Down Expand Up @@ -421,6 +478,17 @@ func skipEpochSkippedVRP(tb testing.TB, db *pg.DB, epoch int, tasks ...string) {
}
}

// appendOKAtEpochVPR inserts one visor_processing_reports row with status 'OK'
// at the given epoch for every task in tasks. It only inserts; rows already
// present for that epoch are not modified.
//
// The reporter column is set to "<test name>_appendok" and state_root to
// "<epoch>_state_root". Any database error fails the test immediately.
func appendOKAtEpochVPR(tb testing.TB, db *pg.DB, epoch int, tasks ...string) {
	// Report failures at the calling test's line instead of inside this helper.
	tb.Helper()

	// Values are bound through driver placeholders rather than spliced into the
	// SQL text, so a test or task name containing a quote cannot break the
	// statement.
	const query = `
	insert into public.visor_processing_reports(height, state_root, reporter, task, started_at, completed_at, status, status_information, errors_detected)
	values(?, concat(?, '_state_root'), ?, ?, '2021-01-01 00:00:00.000000 +00:00', '2021-01-21 00:00:00.000000 +00:00', 'OK', null, null);
	`

	reporter := tb.Name() + "_appendok"
	for _, task := range tasks {
		_, err := db.Exec(query, epoch, epoch, reporter, task)
		require.NoError(tb, err)
	}
}

func truncateVPR(tb testing.TB, db *pg.DB) {
_, err := db.Exec(`TRUNCATE TABLE visor_processing_reports`)
require.NoError(tb, err, "visor_processing_report")
Expand Down

0 comments on commit e2bb246

Please sign in to comment.