Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lily): Consider partially completed heights w ERRORs and SKIPs #791

Merged
merged 5 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 45 additions & 26 deletions chain/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"context"
"time"

"github.com/deckarep/golang-set"
mapset "github.com/deckarep/golang-set"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lily/lens"
"github.com/filecoin-project/lily/model/visor"
Expand Down Expand Up @@ -151,9 +151,10 @@ func (g *GapIndexer) findEpochGapsAndNullRounds(ctx context.Context, node GapInd
`
SELECT s.i AS missing_epoch
FROM generate_series(?, ?) s(i)
WHERE NOT EXISTS (SELECT 1 FROM visor_processing_reports WHERE height = s.i);
WHERE NOT EXISTS (SELECT 1 FROM visor_processing_reports WHERE height = s.i AND status = ?)
;
`,
g.minHeight, g.maxHeight)
g.minHeight, g.maxHeight, visor.ProcessingStatusOK)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -213,31 +214,49 @@ func (g *GapIndexer) findTaskEpochGaps(ctx context.Context) (visor.GapReportList
&result,
`
select height, task
from (
select distinct vpr.task, vpr.height, vpr.status_information
from visor_processing_reports vpr
right join(
select height
from (
select task_height_count.height, count(task_height_count.height) cheight
from (
select distinct(task) as task, height
from visor_processing_reports
group by height, task
order by height desc
) task_height_count
group by task_height_count.height
) task_count_per_height
where task_count_per_height.cheight != ?
) incomplete
on vpr.height = incomplete.height
where vpr.status_information != ?
or vpr.status_information is null
) incomplete_heights_and_their_completed_tasks
where height >= ? and height <= ?
from (

-- incomplete_heights_and_their_completed_tasks: for every incomplete
-- heightin incomplete_task_counts we will take all processing_reports
-- with the same height and return height,tasks which are 'OK'
-- which provides all incomplete heights and their completed tasks
select distinct vpr.task, vpr.height
from visor_processing_reports vpr
right join(

-- incomplete_task_counts: count of all tasks by height,status
-- and filter for status = 'OK' and all task counts which
-- don't equal len(AllTasks)
select incomplete_task_counts.height, status, count(incomplete_task_counts.height) cheight
from (

-- all unique height,tasks,status within range of interest
select task, height, status
from visor_processing_reports
where height >= ?2 and height <= ?3
group by height, task, status
order by height desc, task

) incomplete_task_counts
where status = ?4
group by incomplete_task_counts.height, incomplete_task_counts.status
having count(incomplete_task_counts.height) != ?0

) incomplete_heights

on vpr.height = incomplete_heights.height
where (vpr.status_information != ?1
or vpr.status_information is null)
and vpr.status = ?4
) incomplete_heights_and_their_completed_tasks
order by height desc
;
`,
len(AllTasks), visor.ProcessingStatusInformationNullRound, g.minHeight, g.maxHeight,
len(AllTasks), // arg 0
visor.ProcessingStatusInformationNullRound, // arg 1
g.minHeight, // arg 2
g.maxHeight, // arg 3
visor.ProcessingStatusOK, // arg 4
)
if err != nil {
return nil, err
Expand Down
68 changes: 68 additions & 0 deletions chain/find_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func TestFind(t *testing.T) {
require.NoError(t, err)

expected := makeGapReportList(tsh2, ActorStatesMinerTask, ActorStatesInitTask)
expected = append(expected, makeGapReportList(fakeTipset(t, 8), ActorStatesMinerTask)...)
assertGapReportsEqual(t, expected, actual)
})

Expand Down Expand Up @@ -319,6 +320,62 @@ func TestFind(t *testing.T) {
expected := makeGapReportList(tsh2, MessagesTask, ActorStatesInitTask)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#775) for each task at epoch 2 there exists an ERROR", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
errorEpochTasksVPR(t, db, 2, AllTasks...)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

tsh2 := fakeTipset(t, 2)
mlens := new(MockedFindLens)
mlens.On("ChainGetTipSetByHeight", mock.Anything, tsh2.Height(), types.EmptyTSK).
Return(tsh2, nil)

actual, _, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findEpochGapsAndNullRounds(ctx, mlens)
require.NoError(t, err)

expected := makeGapReportList(tsh2, AllTasks...)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#775) for each task at epoch 2 there exists an ERROR _and_ an OK on some tasks", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
// error on some tasks
errorEpochTasksVPR(t, db, 2, ActorStatesInitTask, ActorStatesMinerTask)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findTaskEpochGaps(ctx)
require.NoError(t, err)

// only expect gaps at height 2
expected := makeGapReportList(fakeTipset(t, 2), ActorStatesInitTask, ActorStatesMinerTask)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#773) for each task at epoch 2 there exists a SKIP and an OK", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
skipEpochSkippedVRP(t, db, 2, AllTasks...)
appendOKAtEpochVPR(t, db, 2, AllTasks...)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findTaskEpochGaps(ctx)
require.NoError(t, err)

// no gaps should be found since the epoch has OK's for all tasks; the SKIPS are ignored.
require.Len(t, actual, 0)
})
}

type assertFields struct {
Expand Down Expand Up @@ -421,6 +478,17 @@ func skipEpochSkippedVRP(tb testing.TB, db *pg.DB, epoch int, tasks ...string) {
}
}

func appendOKAtEpochVPR(tb testing.TB, db *pg.DB, epoch int, tasks ...string) {
for _, task := range tasks {
qsrt := fmt.Sprintf(`
insert into public.visor_processing_reports(height, state_root, reporter, task, started_at, completed_at, status, status_information, errors_detected)
values(%d, concat(%d, '_state_root'), '%s_appendok', '%s', '2021-01-01 00:00:00.000000 +00:00', '2021-01-21 00:00:00.000000 +00:00', 'OK',null, null);
`, epoch, epoch, tb.Name(), task)
_, err := db.Exec(qsrt)
require.NoError(tb, err)
}
}

func truncateVPR(tb testing.TB, db *pg.DB) {
_, err := db.Exec(`TRUNCATE TABLE visor_processing_reports`)
require.NoError(tb, err, "visor_processing_report")
Expand Down