Skip to content

Commit

Permalink
fix: events: sqlite db improvements (#12090)
Browse files Browse the repository at this point in the history
* fix: events: sqlite db improvements

* fix unclosed multi-row query
* tune options to limit wal growth

Ref: #12089

* fix: events: use correct context for CollectEvents transaction

* fix: events: close prepared read statement

* fix: events: close initial query; handle lint failures
  • Loading branch information
rvagg authored Jun 14, 2024
1 parent 16efdf6 commit 2e6a5c3
Showing 1 changed file with 9 additions and 4 deletions.
13 changes: 9 additions & 4 deletions chain/events/filter/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,11 @@ var pragmas = []string{
"PRAGMA temp_store = memory",
"PRAGMA mmap_size = 30000000000",
"PRAGMA page_size = 32768",
"PRAGMA auto_vacuum = NONE", // not useful until we implement GC
"PRAGMA auto_vacuum = NONE",
"PRAGMA automatic_index = OFF",
"PRAGMA journal_mode = WAL",
"PRAGMA read_uncommitted = ON",
"PRAGMA wal_autocheckpoint = 256", // checkpoint @ 256 pages
"PRAGMA journal_size_limit = 0", // always reset journal and wal files
}

// Any changes to this schema should be matched for the `lotus-shed indexes backfill-events` command
Expand Down Expand Up @@ -438,6 +439,9 @@ func NewEventIndex(ctx context.Context, path string, chainStore *store.ChainStor
eventIndex := EventIndex{db: db}

q, err := db.QueryContext(ctx, "SELECT name FROM sqlite_master WHERE type='table' AND name='_meta';")
if q != nil {
defer func() { _ = q.Close() }()
}
if errors.Is(err, sql.ErrNoRows) || !q.Next() {
// empty database, create the schema
for _, ddl := range ddls {
Expand Down Expand Up @@ -521,7 +525,7 @@ func (ei *EventIndex) Close() error {
}

func (ei *EventIndex) CollectEvents(ctx context.Context, te *TipSetEvents, revert bool, resolver func(ctx context.Context, emitter abi.ActorID, ts *types.TipSet) (address.Address, bool)) error {
tx, err := ei.db.Begin()
tx, err := ei.db.BeginTx(ctx, nil)
if err != nil {
return xerrors.Errorf("begin transaction: %w", err)
}
Expand Down Expand Up @@ -743,6 +747,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
if err != nil {
return xerrors.Errorf("prepare prefill query: %w", err)
}
defer func() { _ = stmt.Close() }()

q, err := stmt.QueryContext(ctx, values...)
if err != nil {
Expand All @@ -751,6 +756,7 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
}
return xerrors.Errorf("exec prefill query: %w", err)
}
defer func() { _ = q.Close() }()

var ces []*CollectedEvent
var currentID int64 = -1
Expand Down Expand Up @@ -839,7 +845,6 @@ func (ei *EventIndex) prefillFilter(ctx context.Context, f *eventFilter, exclude
Codec: row.codec,
Value: row.value,
})

}

if ce != nil {
Expand Down

0 comments on commit 2e6a5c3

Please sign in to comment.