From e2bb246096ee4ef2701bd91a49cc0a5a70c23e0d Mon Sep 17 00:00:00 2001 From: Mike Greenberg <305332+placer14@users.noreply.github.com> Date: Thu, 2 Dec 2021 14:34:02 -0500 Subject: [PATCH] fix(lily): Consider partially completed heights w ERRORs and SKIPs (#791) * fix(gapfind): Find considers partially completed heights w ERRORs * fix(gapfind): Find will ignore SKIPs which also have OKs * fix(find): Find recognizes all heights w ERRORs as gaps * chore(find): Simplify query, document, and remove redundancy * chore: gofmt and sql bug fix Co-authored-by: Mike Greenberg --- chain/find.go | 71 +++++++++++++++++++++++++++++----------------- chain/find_test.go | 68 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 113 insertions(+), 26 deletions(-) diff --git a/chain/find.go b/chain/find.go index 531006295..34adfd9fa 100644 --- a/chain/find.go +++ b/chain/find.go @@ -4,7 +4,7 @@ import ( "context" "time" - "github.com/deckarep/golang-set" + mapset "github.com/deckarep/golang-set" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/lily/lens" "github.com/filecoin-project/lily/model/visor" @@ -151,9 +151,10 @@ func (g *GapIndexer) findEpochGapsAndNullRounds(ctx context.Context, node GapInd ` SELECT s.i AS missing_epoch FROM generate_series(?, ?) s(i) - WHERE NOT EXISTS (SELECT 1 FROM visor_processing_reports WHERE height = s.i); + WHERE NOT EXISTS (SELECT 1 FROM visor_processing_reports WHERE height = s.i AND status = ?) + ; `, - g.minHeight, g.maxHeight) + g.minHeight, g.maxHeight, visor.ProcessingStatusOK) if err != nil { return nil, nil, err } @@ -213,31 +214,49 @@ func (g *GapIndexer) findTaskEpochGaps(ctx context.Context) (visor.GapReportList &result, ` select height, task -from ( - select distinct vpr.task, vpr.height, vpr.status_information - from visor_processing_reports vpr - right join( - select height - from ( - select task_height_count.height, count(task_height_count.height) cheight - from ( - select distinct(task) as task, height - from visor_processing_reports - group by height, task - order by height desc - ) task_height_count - group by task_height_count.height - ) task_count_per_height - where task_count_per_height.cheight != ? - ) incomplete - on vpr.height = incomplete.height - where vpr.status_information != ? - or vpr.status_information is null - ) incomplete_heights_and_their_completed_tasks -where height >= ? and height <= ? + from ( + + -- incomplete_heights_and_their_completed_tasks: for every incomplete + -- heightin incomplete_task_counts we will take all processing_reports + -- with the same height and return height,tasks which are 'OK' + -- which provides all incomplete heights and their completed tasks + select distinct vpr.task, vpr.height + from visor_processing_reports vpr + right join( + + -- incomplete_task_counts: count of all tasks by height,status + -- and filter for status = 'OK' and all task counts which + -- don't equal len(AllTasks) + select incomplete_task_counts.height, status, count(incomplete_task_counts.height) cheight + from ( + + -- all unique height,tasks,status within range of interest + select task, height, status + from visor_processing_reports + where height >= ?2 and height <= ?3 + group by height, task, status + order by height desc, task + + ) incomplete_task_counts + where status = ?4 + group by incomplete_task_counts.height, incomplete_task_counts.status + having count(incomplete_task_counts.height) != ?0 + + ) incomplete_heights + + on vpr.height = incomplete_heights.height + where (vpr.status_information != ?1 + or vpr.status_information is null) + and vpr.status = ?4 +) incomplete_heights_and_their_completed_tasks order by height desc +; `, - len(AllTasks), visor.ProcessingStatusInformationNullRound, g.minHeight, g.maxHeight, + len(AllTasks), // arg 0 + visor.ProcessingStatusInformationNullRound, // arg 1 + g.minHeight, // arg 2 + g.maxHeight, // arg 3 + visor.ProcessingStatusOK, // arg 4 ) if err != nil { return nil, err diff --git a/chain/find_test.go b/chain/find_test.go index 360d42fc9..2016885e8 100644 --- a/chain/find_test.go +++ b/chain/find_test.go @@ -247,6 +247,7 @@ func TestFind(t *testing.T) { require.NoError(t, err) expected := makeGapReportList(tsh2, ActorStatesMinerTask, ActorStatesInitTask) + expected = append(expected, makeGapReportList(fakeTipset(t, 8), ActorStatesMinerTask)...) assertGapReportsEqual(t, expected, actual) }) @@ -319,6 +320,62 @@ func TestFind(t *testing.T) { expected := makeGapReportList(tsh2, MessagesTask, ActorStatesInitTask) assertGapReportsEqual(t, expected, actual) }) + + t.Run("(#775) for each task at epoch 2 there exists an ERROR", func(t *testing.T) { + truncateVPR(t, db) + initializeVPR(t, db, maxHeight, t.Name(), AllTasks...) + errorEpochTasksVPR(t, db, 2, AllTasks...) + + strg, err := storage.NewDatabaseFromDB(ctx, db, "public") + require.NoError(t, err, "NewDatabaseFromDB") + + tsh2 := fakeTipset(t, 2) + mlens := new(MockedFindLens) + mlens.On("ChainGetTipSetByHeight", mock.Anything, tsh2.Height(), types.EmptyTSK). + Return(tsh2, nil) + + actual, _, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks). + findEpochGapsAndNullRounds(ctx, mlens) + require.NoError(t, err) + + expected := makeGapReportList(tsh2, AllTasks...) + assertGapReportsEqual(t, expected, actual) + }) + + t.Run("(#775) for each task at epoch 2 there exists an ERROR _and_ an OK on some tasks", func(t *testing.T) { + truncateVPR(t, db) + initializeVPR(t, db, maxHeight, t.Name(), AllTasks...) + // error on some tasks + errorEpochTasksVPR(t, db, 2, ActorStatesInitTask, ActorStatesMinerTask) + + strg, err := storage.NewDatabaseFromDB(ctx, db, "public") + require.NoError(t, err, "NewDatabaseFromDB") + + actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks). + findTaskEpochGaps(ctx) + require.NoError(t, err) + + // only expect gaps at height 2 + expected := makeGapReportList(fakeTipset(t, 2), ActorStatesInitTask, ActorStatesMinerTask) + assertGapReportsEqual(t, expected, actual) + }) + + t.Run("(#773) for each task at epoch 2 there exists a SKIP and an OK", func(t *testing.T) { + truncateVPR(t, db) + initializeVPR(t, db, maxHeight, t.Name(), AllTasks...) + skipEpochSkippedVRP(t, db, 2, AllTasks...) + appendOKAtEpochVPR(t, db, 2, AllTasks...) + + strg, err := storage.NewDatabaseFromDB(ctx, db, "public") + require.NoError(t, err, "NewDatabaseFromDB") + + actual, err := NewGapIndexer(nil, strg, t.Name(), minHeight, maxHeight, AllTasks). + findTaskEpochGaps(ctx) + require.NoError(t, err) + + // no gaps should be found since the epoch has OK's for all tasks; the SKIPS are ignored. + require.Len(t, actual, 0) + }) } type assertFields struct { @@ -421,6 +478,17 @@ func skipEpochSkippedVRP(tb testing.TB, db *pg.DB, epoch int, tasks ...string) { } } +func appendOKAtEpochVPR(tb testing.TB, db *pg.DB, epoch int, tasks ...string) { + for _, task := range tasks { + qsrt := fmt.Sprintf(` + insert into public.visor_processing_reports(height, state_root, reporter, task, started_at, completed_at, status, status_information, errors_detected) + values(%d, concat(%d, '_state_root'), '%s_appendok', '%s', '2021-01-01 00:00:00.000000 +00:00', '2021-01-21 00:00:00.000000 +00:00', 'OK',null, null); + `, epoch, epoch, tb.Name(), task) + _, err := db.Exec(qsrt) + require.NoError(tb, err) + } +} + func truncateVPR(tb testing.TB, db *pg.DB) { _, err := db.Exec(`TRUNCATE TABLE visor_processing_reports`) require.NoError(tb, err, "visor_processing_report")