Skip to content

Commit

Permalink
Test fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
dhaidashenko committed Jan 22, 2025
1 parent 0fbe244 commit f729a34
Show file tree
Hide file tree
Showing 4 changed files with 68 additions and 15 deletions.
3 changes: 2 additions & 1 deletion integration-tests/smoke/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ func TestEventLoader(t *testing.T) {
orm := logpoller.NewMockORM(t) // TODO: replace with real DB, when available
programPubKey, err := solana.PublicKeyFromBase58(programPubKey)
require.NoError(t, err)
orm.EXPECT().SelectFilters(mock.Anything).Return([]logpoller.Filter{{IsBackfilled: false, Address: logpoller.PublicKey(programPubKey)}}, nil).Once()
orm.EXPECT().SelectFilters(mock.Anything).Return([]logpoller.Filter{{ID: 1, IsBackfilled: false, Address: logpoller.PublicKey(programPubKey)}}, nil).Once()
orm.EXPECT().MarkFilterBackfilled(mock.Anything, mock.Anything).Return(nil).Once()
orm.EXPECT().GetLatestBlock(mock.Anything).Return(0, sql.ErrNoRows)
orm.EXPECT().SelectSeqNums(mock.Anything).Return(map[int64]int64{1: 0}, nil).Once()
lp := logpoller.NewWithCustomProcessor(logger.TestSugared(t), orm, cl, parser.ProcessBlocks)

require.NoError(t, lp.Start(ctx))
Expand Down
57 changes: 57 additions & 0 deletions pkg/solana/client/mocks/reader_writer.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pkg/solana/logpoller/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"sync/atomic"

"github.com/gagliardetto/solana-go"

"github.com/smartcontractkit/chainlink-common/pkg/codec/encodings/binary"
"github.com/smartcontractkit/chainlink-common/pkg/logger"

Expand Down Expand Up @@ -388,6 +389,9 @@ func (fl *filters) LoadFilters(ctx context.Context) error {
fl.lggr.Debugw("Loading filters from db")
fl.filtersMutex.Lock()
defer fl.filtersMutex.Unlock()
if fl.loadedFilters.Load() {
return nil
}
// reset filters' indexes to ensure we do not have partial data from the previous run
fl.filtersByID = make(map[int64]*Filter)
fl.filtersByName = make(map[string]int64)
Expand Down
19 changes: 5 additions & 14 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,13 @@ func NewWithCustomProcessor(lggr logger.SugaredLogger, orm ORM, client RPCClient
}

func (lp *Service) start(_ context.Context) error {
lp.eng.Go(func(ctx context.Context) {
lp.eng.GoTick(services.NewTicker(time.Second), func(ctx context.Context) {
err := lp.run(ctx)
if err != nil {
lp.lggr.Errorw("log poller iteration failed - retrying", "err", err)
}
})
lp.eng.Go(lp.backgroundWorkerRun)
lp.eng.GoTick(services.NewTicker(time.Minute), lp.backgroundWorkerRun)
return nil
}

Expand Down Expand Up @@ -377,17 +377,8 @@ func appendBuffered(ch <-chan Block, max int, blocks []Block) []Block {
}

func (lp *Service) backgroundWorkerRun(ctx context.Context) {
pruneFilters := services.NewTicker(time.Minute)
defer pruneFilters.Stop()
for {
select {
case <-ctx.Done():
return
case <-pruneFilters.C:
err := lp.filters.PruneFilters(ctx)
if err != nil {
lp.lggr.Errorw("Failed to prune filters", "err", err)
}
}
err := lp.filters.PruneFilters(ctx)
if err != nil {
lp.lggr.Errorw("Failed to prune filters", "err", err)
}
}

0 comments on commit f729a34

Please sign in to comment.