Skip to content

Commit

Permalink
feat: prune ineffectual mempool txs (#6443)
Browse files Browse the repository at this point in the history
## Motivation

Adds support for eviction of ineffectual transactions from the mempool.
  • Loading branch information
acud committed Nov 15, 2024
1 parent 0f568d2 commit c4e59b6
Show file tree
Hide file tree
Showing 15 changed files with 241 additions and 14 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ See [RELEASE](./RELEASE.md) for workflow instructions.

* [#6422](https://github.com/spacemeshos/go-spacemesh/pull/6422) Further improved performance of the proposal building
process to avoid late proposals.
* [#6443](https://github.com/spacemeshos/go-spacemesh/pull/6443) Improve eviction of ineffectual transactions in the database
which will now show up as ineffectual when querying them from the API.

* [#6431](https://github.com/spacemeshos/go-spacemesh/pull/6431) Fix db-allow-schema-drift handling

Expand Down
4 changes: 4 additions & 0 deletions api/grpcserver/grpcserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,10 @@ func (t *ConStateAPIMock) GetStateRoot() (types.Hash32, error) {
return stateRoot, nil
}

func (t *ConStateAPIMock) HasEvicted(id types.TransactionID) (bool, error) {
panic("not implemented")
}

func (t *ConStateAPIMock) GetMeshTransaction(id types.TransactionID) (*types.MeshTransaction, error) {
tx, ok := t.returnTx[id]
if ok {
Expand Down
1 change: 1 addition & 0 deletions api/grpcserver/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type conservativeState interface {
GetMeshTransactions([]types.TransactionID) ([]*types.MeshTransaction, map[types.TransactionID]struct{})
GetTransactionsByAddress(types.LayerID, types.LayerID, types.Address) ([]*types.MeshTransaction, error)
Validation(raw types.RawTx) system.ValidationRequest
HasEvicted(tid types.TransactionID) (bool, error)
}

// syncer is the API to get sync status.
Expand Down
39 changes: 39 additions & 0 deletions api/grpcserver/mocks.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 9 additions & 1 deletion api/grpcserver/transaction_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,15 @@ func (s *TransactionService) getTransactionAndStatus(
case types.APPLIED:
state = pb.TransactionState_TRANSACTION_STATE_PROCESSED
default:
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
evicted, err := s.conState.HasEvicted(txID)
if err != nil {
return nil, state
}
if evicted {
state = pb.TransactionState_TRANSACTION_STATE_INEFFECTUAL
} else {
state = pb.TransactionState_TRANSACTION_STATE_UNSPECIFIED
}
}
return &tx.Transaction, state
}
Expand Down
8 changes: 4 additions & 4 deletions genvm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func (v *VM) Apply(

for _, reward := range rewardsResult {
if err := rewards.Add(tx, &reward); err != nil {
return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err)
return nil, nil, fmt.Errorf("add reward %w: %w", core.ErrInternal, err)
}
}

Expand All @@ -247,17 +247,17 @@ func (v *VM) Apply(
return true
})
if err != nil {
return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err)
return nil, nil, fmt.Errorf("iterate changed %w: %w", core.ErrInternal, err)
}
writesPerBlock.Observe(float64(total))

var hashSum types.Hash32
hasher.Sum(hashSum[:0])
if err := layers.UpdateStateHash(tx, layer, hashSum); err != nil {
return nil, nil, err
return nil, nil, fmt.Errorf("update state hash: %w", err)
}
if err := tx.Commit(); err != nil {
return nil, nil, fmt.Errorf("%w: %w", core.ErrInternal, err)
return nil, nil, fmt.Errorf("commit %w: %w", core.ErrInternal, err)
}
ss.IterateChanged(func(account *core.Account) bool {
if err := events.ReportAccountUpdate(account.Address); err != nil {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1
github.com/seehuhn/mt19937 v1.0.0
github.com/slok/go-http-metrics v0.13.0
github.com/spacemeshos/api/release/go v1.55.0
github.com/spacemeshos/api/release/go v1.56.0
github.com/spacemeshos/economics v0.1.4
github.com/spacemeshos/fixed v0.1.2
github.com/spacemeshos/go-scale v1.2.1
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -629,8 +629,8 @@ github.com/sourcegraph/annotate v0.0.0-20160123013949-f4cad6c6324d/go.mod h1:Udh
github.com/sourcegraph/conc v0.3.0 h1:OQTbbt6P72L20UqAkXXuLOj79LfEanQ+YQFNpLA9ySo=
github.com/sourcegraph/conc v0.3.0/go.mod h1:Sdozi7LEKbFPqYX2/J+iBAM6HpqSLTASQIKqDmF7Mt0=
github.com/sourcegraph/syntaxhighlight v0.0.0-20170531221838-bd320f5d308e/go.mod h1:HuIsMU8RRBOtsCgI77wP899iHVBQpCmg4ErYMZB+2IA=
github.com/spacemeshos/api/release/go v1.55.0 h1:IQ8PmQ1d7CwUiM1r3NH8uZ+JkEyNjSltiAuqEY6dn6o=
github.com/spacemeshos/api/release/go v1.55.0/go.mod h1:qM6GTS2QtUvxPNIJf+2ObH63bGXYrJnapgOd6l6pbpQ=
github.com/spacemeshos/api/release/go v1.56.0 h1:llBVijoO4I3mhHk0OtGJdTT/11I7ajo0CZp3x8h1EjA=
github.com/spacemeshos/api/release/go v1.56.0/go.mod h1:6o17nhNyXpbVeijAQqkZfL8Pe/IkMGAWMLSLZni0DOU=
github.com/spacemeshos/economics v0.1.4 h1:twlawrcQhYNqPgyDv08+24EL/OgUKz3d7q+PvJIAND0=
github.com/spacemeshos/economics v0.1.4/go.mod h1:6HKWKiKdxjVQcGa2z/wA0LR4M/DzKib856bP16yqNmQ=
github.com/spacemeshos/fixed v0.1.2 h1:pENQ8pXFAqin3f15ZLoOVVeSgcmcFJ0IFdFm4+9u4SM=
Expand Down
5 changes: 5 additions & 0 deletions sql/statesql/schema/migrations/0026_pruned_txs.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time INT NOT NULL,
PRIMARY KEY (id)
);
7 changes: 6 additions & 1 deletion sql/statesql/schema/schema.sql
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
PRAGMA user_version = 25;
PRAGMA user_version = 26;
CREATE TABLE accounts
(
address CHAR(24),
Expand Down Expand Up @@ -69,6 +69,11 @@ CREATE TABLE certificates
valid bool NOT NULL,
PRIMARY KEY (layer, block)
);
CREATE TABLE evicted_mempool (
id CHAR(32) NOT NULL,
time INT NOT NULL,
PRIMARY KEY (id)
);
CREATE TABLE identities
(
pubkey CHAR(32) PRIMARY KEY,
Expand Down
63 changes: 63 additions & 0 deletions sql/transactions/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,17 @@ func Has(db sql.Executor, id types.TransactionID) (bool, error) {
return rows > 0, nil
}

func HasEvicted(db sql.Executor, id types.TransactionID) (bool, error) {
rows, err := db.Exec("select 1 from evicted_mempool where id = ?1",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
}, nil)
if err != nil {
return false, fmt.Errorf("has evicted %s: %w", id, err)
}
return rows > 0, nil
}

// GetByAddress finds all transactions for an address.
func GetByAddress(db sql.Executor, from, to types.LayerID, address types.Address) ([]*types.MeshTransaction, error) {
var txs []*types.MeshTransaction
Expand Down Expand Up @@ -295,6 +306,58 @@ func GetAcctPendingFromNonce(db sql.Executor, address types.Address, from uint64
}, "get acct pending from nonce")
}

// GetAcctPendingToNonce get all pending transactions with nonce before `to` for the given address.
func GetAcctPendingToNonce(db sql.Executor, address types.Address, to uint64) ([]types.TransactionID, error) {
ids := make([]types.TransactionID, 0)
_, err := db.Exec(`select id from transactions
where principal = ?1 and nonce < ?2 and result is null
order by nonce asc, timestamp asc;`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, address.Bytes())
stmt.BindBytes(2, util.Uint64ToBytesBigEndian(to))
}, func(stmt *sql.Statement) bool {
id := types.TransactionID{}
stmt.ColumnBytes(0, id[:])
ids = append(ids, id)
return true
})
if err != nil {
return nil, fmt.Errorf("get acct pending to nonce %s: %w", address, err)
}
return ids, nil
}

func SetEvicted(db sql.Executor, id types.TransactionID) error {
if _, err := db.Exec("insert into evicted_mempool (id, time) values (?1, ?2) on conflict do nothing;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
stmt.BindInt64(2, time.Now().UnixNano())
}, nil); err != nil {
return fmt.Errorf("set evicted %s: %w", id, err)
}
return nil
}

func Delete(db sql.Executor, id types.TransactionID) error {
if _, err := db.Exec("delete from transactions where id = ?1;",
func(stmt *sql.Statement) {
stmt.BindBytes(1, id.Bytes())
}, nil); err != nil {
return fmt.Errorf("delete %s: %w", id, err)
}
return nil
}

func PruneEvicted(db sql.Executor, before time.Time) error {
if _, err := db.Exec("delete from evicted_mempool where time < ?1;",
func(stmt *sql.Statement) {
stmt.BindInt64(1, before.UnixNano())
}, nil); err != nil {
return fmt.Errorf("prune evicted %w", err)
}
return nil
}

// query MUST ensure that this order of fields tx, header, layer, block, timestamp, id.
func queryPending(
db sql.Executor,
Expand Down
70 changes: 70 additions & 0 deletions sql/transactions/transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,3 +562,73 @@ func TestTransactionInBlock(t *testing.T) {
_, _, err = transactions.TransactionInBlock(db, tid, lids[2])
require.ErrorIs(t, err, sql.ErrNotFound)
}

func TestTransactionEvictMempool(t *testing.T) {
principals := []types.Address{
{1},
{2},
{3},
}
txs := []types.Transaction{
{
RawTx: types.RawTx{ID: types.TransactionID{1}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 0},
},
{
RawTx: types.RawTx{ID: types.TransactionID{2}},
TxHeader: &types.TxHeader{Principal: principals[0], Nonce: 1},
},
{
RawTx: types.RawTx{ID: types.TransactionID{3}},
TxHeader: &types.TxHeader{Principal: principals[1], Nonce: 0},
},
}
db := statesql.InMemoryTest(t)
for _, tx := range txs {
require.NoError(t, transactions.Add(db, &tx, time.Time{}))
}
err := transactions.SetEvicted(db, types.TransactionID{1})
require.NoError(t, err)

err = transactions.Delete(db, types.TransactionID{1})
require.NoError(t, err)

pending, err := transactions.GetAcctPendingFromNonce(db, principals[0], 1)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[1].ID)

pending, err = transactions.GetAcctPendingFromNonce(db, principals[1], 0)
require.NoError(t, err)
require.Len(t, pending, 1)
require.Equal(t, pending[0].ID, txs[2].ID)

has, err := transactions.Has(db, txs[0].ID)
require.False(t, has)
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txs[0].ID)
require.True(t, has)
require.NoError(t, err)
}

func TestPruneEvicted(t *testing.T) {
txId := types.TransactionID{1}
db := statesql.InMemoryTest(t)
db.Exec(`insert into evicted_mempool (id, time) values (?1,?2);`,
func(stmt *sql.Statement) {
stmt.BindBytes(1, txId.Bytes())
stmt.BindInt64(2, time.Now().Add(-13*time.Hour).UnixNano())
}, nil)

has, err := transactions.HasEvicted(db, txId)
require.True(t, has)
require.NoError(t, err)

err = transactions.PruneEvicted(db, time.Now().Add(-12*time.Hour))
require.NoError(t, err)

has, err = transactions.HasEvicted(db, txId)
require.False(t, has)
require.NoError(t, err)
}
28 changes: 28 additions & 0 deletions txs/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,32 @@ func (ac *accountCache) resetAfterApply(
ac.txsByNonce = list.New()
ac.startNonce = nextNonce
ac.startBalance = newBalance

err := ac.evictPendingNonce(db)
if err != nil {
return fmt.Errorf("evict pending: %w", err)
}
return ac.addPendingFromNonce(logger, db, ac.startNonce, applied)
}

func (ac *accountCache) evictPendingNonce(db sql.StateDatabase) error {
return db.WithTxImmediate(context.Background(), func(tx sql.Transaction) error {
txIds, err := transactions.GetAcctPendingToNonce(tx, ac.addr, ac.startNonce)
if err != nil {
return fmt.Errorf("get pending to nonce: %w", err)
}
for _, tid := range txIds {
if err := transactions.SetEvicted(tx, tid); err != nil {
return fmt.Errorf("set evicted for %s: %w", tid, err)
}
if err := transactions.Delete(tx, tid); err != nil {
return fmt.Errorf("delete tx %s: %w", tid, err)
}
}
return nil
})
}

func (ac *accountCache) shouldEvict() bool {
return ac.txsByNonce.Len() == 0 && !ac.moreInDB
}
Expand Down Expand Up @@ -776,6 +799,11 @@ func (c *Cache) ApplyLayer(
}
acctResetDuration.Observe(float64(time.Since(t2)))
}

err := transactions.PruneEvicted(db, time.Now().Add(-12*time.Hour))
if err != nil {
logger.Warn("failed to prune evicted", zap.Error(err))
}
return nil
}

Expand Down
8 changes: 3 additions & 5 deletions txs/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,10 +334,10 @@ func TestCache_Account_HappyFlow(t *testing.T) {
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance+income)
// mempool is unchanged
checkMempool(t, tc.Cache, expectedMempool)

// pruning has removed old and ineffective txs
for _, mtx := range append(oldNonces, sameNonces...) {
got, err := transactions.Get(tc.db, mtx.ID)
require.NoError(t, err)
require.Equal(t, types.MEMPOOL, got.State)
checkTXNotInDB(t, tc.db, mtx.ID)
}

// revert to one layer before lid
Expand All @@ -357,8 +357,6 @@ func TestCache_Account_HappyFlow(t *testing.T) {
}
checkProjection(t, tc.Cache, ta.principal, newNextNonce, newBalance)
checkTXStateFromDB(t, tc.db, mtxs, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, oldNonces, types.MEMPOOL)
checkTXStateFromDB(t, tc.db, sameNonces, types.MEMPOOL)
}

func TestCache_Account_TXInMultipleLayers(t *testing.T) {
Expand Down
4 changes: 4 additions & 0 deletions txs/conservative_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,3 +283,7 @@ func ShuffleWithNonceOrder(
})))
return result
}

func (cs *ConservativeState) HasEvicted(tid types.TransactionID) (bool, error) {
return transactions.HasEvicted(cs.db, tid)
}

0 comments on commit c4e59b6

Please sign in to comment.