Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(lily): Consider partially completed heights w ERRORs and SKIPs #791

Merged
merged 5 commits into from
Dec 2, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 33 additions & 23 deletions chain/find.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,31 +213,41 @@ func (g *GapIndexer) findTaskEpochGaps(ctx context.Context) (visor.GapReportList
&result,
`
select height, task
from (
select distinct vpr.task, vpr.height, vpr.status_information
from visor_processing_reports vpr
right join(
select height
from (
select task_height_count.height, count(task_height_count.height) cheight
from (
select distinct(task) as task, height
from visor_processing_reports
group by height, task
order by height desc
) task_height_count
group by task_height_count.height
) task_count_per_height
where task_count_per_height.cheight != ?
) incomplete
on vpr.height = incomplete.height
where vpr.status_information != ?
or vpr.status_information is null
) incomplete_heights_and_their_completed_tasks
where height >= ? and height <= ?
from (
select distinct vpr.task, vpr.height
from visor_processing_reports vpr
right join(
select height, cheight
from (
select task_height_count.height, status, count(task_height_count.height) cheight
from (
select task, height, status
from visor_processing_reports
where height >= ?2 and height <= ?3
group by height, task, status
order by height desc, task
) task_height_count
where status = ?5
group by task_height_count.height, task_height_count.status
) task_count_per_height
where task_count_per_height.cheight != ?0
and status = ?5
) incomplete
on vpr.height = incomplete.height
where (vpr.status_information != ?1
or vpr.status_information is null)
and vpr.status = ?5
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixes #773?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

Copy link
Contributor Author

@placer14 placer14 Dec 1, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added notes within the query to explain what's happening here.

and vpr.height >= ?2 and vpr.height <= ?3
) incomplete_heights_and_their_completed_tasks
order by height desc
;
`,
len(AllTasks), visor.ProcessingStatusInformationNullRound, g.minHeight, g.maxHeight,
len(AllTasks), // arg 0
visor.ProcessingStatusInformationNullRound, // arg 1
g.minHeight, // arg 2
g.maxHeight, // arg 3
visor.ProcessingStatusError, // arg 4
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Marks errors as gaps?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added notes within the query to explain what's happening here.

visor.ProcessingStatusOK, //arg 5
)
if err != nil {
return nil, err
Expand Down
47 changes: 47 additions & 0 deletions chain/find_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,7 @@ func TestFind(t *testing.T) {
require.NoError(t, err)

expected := makeGapReportList(tsh2, ActorStatesMinerTask, ActorStatesInitTask)
expected = append(expected, makeGapReportList(fakeTipset(t, 8), ActorStatesMinerTask)...)
assertGapReportsEqual(t, expected, actual)
})

Expand Down Expand Up @@ -319,6 +320,41 @@ func TestFind(t *testing.T) {
expected := makeGapReportList(tsh2, MessagesTask, ActorStatesInitTask)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#775) for each task at epoch 2 there exists an ERROR _and_ an OK on some tasks", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
// error on some tasks
errorEpochTasksVPR(t, db, 2, ActorStatesInitTask, ActorStatesMinerTask)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findTaskEpochGaps(ctx)
require.NoError(t, err)

// only expect gaps at height 2
expected := makeGapReportList(fakeTipset(t, 2), ActorStatesInitTask, ActorStatesMinerTask)
assertGapReportsEqual(t, expected, actual)
})

t.Run("(#773) for each task at epoch 2 there exists a SKIP and an OK", func(t *testing.T) {
truncateVPR(t, db)
initializeVPR(t, db, maxHeight, t.Name(), AllTasks...)
skipEpochSkippedVRP(t, db, 2, AllTasks...)
appendOKAtEpochVPR(t, db, 2, AllTasks...)

strg, err := storage.NewDatabaseFromDB(ctx, db, "public")
require.NoError(t, err, "NewDatabaseFromDB")

actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks).
findTaskEpochGaps(ctx)
require.NoError(t, err)

// no gaps should be found since the epoch has OK's for all tasks; the SKIPS are ignored.
require.Len(t, actual, 0)
})
}

type assertFields struct {
Expand Down Expand Up @@ -421,6 +457,17 @@ func skipEpochSkippedVRP(tb testing.TB, db *pg.DB, epoch int, tasks ...string) {
}
}

// appendOKAtEpochVPR inserts one processing report with status 'OK' into
// public.visor_processing_reports for each of the given tasks at the given
// epoch. Each row uses "<epoch>_state_root" as its state root and
// "<test name>_appendok" as its reporter, with fixed start/completion
// timestamps and null status_information / errors_detected.
//
// Values are bound as query parameters instead of being spliced into the SQL
// text, so test or task names containing quotes cannot corrupt the statement.
// Any insert failure aborts the test immediately.
func appendOKAtEpochVPR(tb testing.TB, db *pg.DB, epoch int, tasks ...string) {
	// Report failures at the caller's line rather than inside this helper.
	tb.Helper()

	// Placeholders, in order: height, state_root prefix, reporter, task.
	const insertOK = `
insert into public.visor_processing_reports(height, state_root, reporter, task, started_at, completed_at, status, status_information, errors_detected)
values(?, concat(?, '_state_root'), ?, ?, '2021-01-01 00:00:00.000000 +00:00', '2021-01-21 00:00:00.000000 +00:00', 'OK', null, null);
`
	// The reporter is the same for every task, so build it once.
	reporter := fmt.Sprintf("%s_appendok", tb.Name())

	for _, task := range tasks {
		_, err := db.Exec(insertOK, epoch, epoch, reporter, task)
		require.NoError(tb, err, "appending OK report for task %q at epoch %d", task, epoch)
	}
}

func truncateVPR(tb testing.TB, db *pg.DB) {
_, err := db.Exec(`TRUNCATE TABLE visor_processing_reports`)
require.NoError(tb, err, "visor_processing_report")
Expand Down