diff --git a/model/derived/gasoutputs.go b/model/derived/gasoutputs.go index e6e66d807..d33a5db24 100644 --- a/model/derived/gasoutputs.go +++ b/model/derived/gasoutputs.go @@ -65,3 +65,8 @@ func (l GasOutputsList) PersistWithTx(ctx context.Context, tx *pg.Tx) error { } return nil } + +type ProcessingGasOutputs struct { + Height int64 + GasOutputs +} diff --git a/storage/sql.go b/storage/sql.go index c8eb3754e..1859b5869 100644 --- a/storage/sql.go +++ b/storage/sql.go @@ -435,14 +435,14 @@ func (d *Database) FindActors(ctx context.Context, claimUntil time.Time, batchSi return actors, nil } -func (d *Database) MarkActorComplete(ctx context.Context, head string, code string, completedAt time.Time, errorsDetected string) error { +func (d *Database) MarkActorComplete(ctx context.Context, height int64, head string, code string, completedAt time.Time, errorsDetected string) error { if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { _, err := tx.ExecContext(ctx, ` UPDATE visor_processing_actors SET claimed_until = null, completed_at = ?, errors_detected = ? - WHERE head = ? AND code = ? + WHERE height = ? AND head = ? AND code = ? `, completedAt, useNullIfEmpty(errorsDetected), head, code) if err != nil { return err @@ -521,8 +521,8 @@ func useNullIfEmpty(s string) *string { } // LeaseGasOutputsMessages leases a set of messages that have receipts for gas output processing. minHeight and maxHeight define an inclusive range of heights to process. -func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) (derived.GasOutputsList, error) { - var list derived.GasOutputsList +func (d *Database) LeaseGasOutputsMessages(ctx context.Context, claimUntil time.Time, batchSize int, minHeight, maxHeight int64) ([]*derived.ProcessingGasOutputs, error) { + var list []*derived.ProcessingGasOutputs if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { _, err := tx.QueryContext(ctx, &list, ` @@ -561,15 +561,15 @@ SELECT * FROM leased; return list, nil } -func (d *Database) MarkGasOutputsMessagesComplete(ctx context.Context, cid string, completedAt time.Time, errorsDetected string) error { +func (d *Database) MarkGasOutputsMessagesComplete(ctx context.Context, height int64, cid string, completedAt time.Time, errorsDetected string) error { if err := d.DB.RunInTransaction(ctx, func(tx *pg.Tx) error { _, err := tx.ExecContext(ctx, ` UPDATE visor_processing_messages SET gas_outputs_claimed_until = null, gas_outputs_completed_at = ?, gas_outputs_errors_detected = ? - WHERE cid = ? -`, completedAt, useNullIfEmpty(errorsDetected), cid) + WHERE height = ? AND cid = ? +`, completedAt, useNullIfEmpty(errorsDetected), height, cid) if err != nil { return err } diff --git a/storage/sql_test.go b/storage/sql_test.go index e2a36c105..4d1da8cd3 100644 --- a/storage/sql_test.go +++ b/storage/sql_test.go @@ -329,7 +329,7 @@ func TestMarkActorComplete(t *testing.T) { t.Run("with error message", func(t *testing.T) { completedAt := testutil.KnownTime.Add(time.Minute * 1) - err = d.MarkActorComplete(ctx, "head1", "codeB", completedAt, "message") + err = d.MarkActorComplete(ctx, 1, "head1", "codeB", completedAt, "message") require.NoError(t, err) // Check the database contains the updated row @@ -341,7 +341,7 @@ func TestMarkActorComplete(t *testing.T) { t.Run("without error message", func(t *testing.T) { completedAt := testutil.KnownTime.Add(time.Minute * 2) - err = d.MarkActorComplete(ctx, "head1", "codeB", completedAt, "") + err = d.MarkActorComplete(ctx, 1, "head1", "codeB", completedAt, "") require.NoError(t, err) // Check the database contains the updated row with a null errors_detected column @@ -713,7 +713,7 @@ func TestMarkGasOutputsMessagesComplete(t *testing.T) { t.Run("with error message", func(t *testing.T) { completedAt := testutil.KnownTime.Add(time.Minute * 1) - err = d.MarkGasOutputsMessagesComplete(ctx, "cid1", completedAt, "message") + err = d.MarkGasOutputsMessagesComplete(ctx, 1, "cid1", completedAt, "message") require.NoError(t, err) // Check the database contains the updated row @@ -725,7 +725,7 @@ func TestMarkGasOutputsMessagesComplete(t *testing.T) { t.Run("without error message", func(t *testing.T) { completedAt := testutil.KnownTime.Add(time.Minute * 2) - err = d.MarkGasOutputsMessagesComplete(ctx, "cid1", completedAt, "") + err = d.MarkGasOutputsMessagesComplete(ctx, 1, "cid1", completedAt, "") require.NoError(t, err) // Check the database contains the updated row with a null errors_detected column diff --git a/tasks/actorstate/actorstate.go b/tasks/actorstate/actorstate.go index bfa855ec6..a2ba523d9 100644 --- a/tasks/actorstate/actorstate.go +++ b/tasks/actorstate/actorstate.go @@ -267,7 +267,7 @@ func (p *ActorStateProcessor) processBatch(ctx context.Context, node lens.API) ( info, err := NewActorInfo(actor) if err != nil { errorLog.Errorw("unmarshal actor", "error", err.Error()) - if err := p.storage.MarkActorComplete(ctx, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil { + if err := p.storage.MarkActorComplete(ctx, actor.Height, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil { errorLog.Errorw("failed to mark actor complete", "error", err.Error()) } continue @@ -275,14 +275,14 @@ func (p *ActorStateProcessor) processBatch(ctx context.Context, node lens.API) ( if err := p.processActor(ctx, node, info); err != nil { errorLog.Errorw("process actor", "error", err.Error()) - if err := p.storage.MarkActorComplete(ctx, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil { + if err := p.storage.MarkActorComplete(ctx, actor.Height, actor.Head, actor.Code, p.clock.Now(), err.Error()); err != nil { errorLog.Errorw("failed to mark actor complete", "error", err.Error()) } return false, xerrors.Errorf("process actor: %w", err) } - if err := p.storage.MarkActorComplete(ctx, actor.Head, actor.Code, p.clock.Now(), ""); err != nil { + if err := p.storage.MarkActorComplete(ctx, actor.Height, actor.Head, actor.Code, p.clock.Now(), ""); err != nil { errorLog.Errorw("failed to mark actor complete", "error", err.Error()) } } diff --git a/tasks/message/gasoutputs.go b/tasks/message/gasoutputs.go index 535a0523d..217831948 100644 --- a/tasks/message/gasoutputs.go +++ b/tasks/message/gasoutputs.go @@ -91,16 +91,16 @@ func (p *GasOutputsProcessor) processBatch(ctx context.Context, node lens.API) ( errorLog := log.With("cid", item.Cid) - if err := p.processItem(ctx, node, item); err != nil { + if err := p.processItem(ctx, node, &item.GasOutputs); err != nil { // Any errors are likely to be problems using the lens, mark this tipset as failed and exit this batch errorLog.Errorw("failed to process message", "error", err.Error()) - if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Cid, p.clock.Now(), err.Error()); err != nil { + if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), err.Error()); err != nil { errorLog.Errorw("failed to mark message complete", "error", err.Error()) } return false, xerrors.Errorf("process item: %w", err) } - if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Cid, p.clock.Now(), ""); err != nil { + if err := p.storage.MarkGasOutputsMessagesComplete(ctx, item.Height, item.Cid, p.clock.Now(), ""); err != nil { errorLog.Errorw("failed to mark message complete", "error", err.Error()) } }