Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Small message pool refactors #5162

Merged
merged 2 commits into from
Dec 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (

datatransfer "github.com/filecoin-project/go-data-transfer"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/lotus/build"
"github.com/ipfs/go-cid"

"github.com/libp2p/go-libp2p-core/peer"
Expand Down Expand Up @@ -51,19 +50,6 @@ type MessageSendSpec struct {
MaxFee abi.TokenAmount
}

var DefaultMessageSendSpec = MessageSendSpec{
// MaxFee of 0.1FIL
MaxFee: abi.NewTokenAmount(int64(build.FilecoinPrecision) / 10),
}

func (ms *MessageSendSpec) Get() MessageSendSpec {
if ms == nil {
return DefaultMessageSendSpec
}

return *ms
}

type DataTransferChannel struct {
TransferID datatransfer.TransferID
Status datatransfer.Status
Expand Down
15 changes: 14 additions & 1 deletion chain/messagepool/messagepool.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,20 @@ func ComputeMinRBF(curPrem abi.TokenAmount) abi.TokenAmount {
return types.BigAdd(minPrice, types.NewInt(1))
}

func CapGasFee(mff dtypes.DefaultMaxFeeFunc, msg *types.Message, maxFee abi.TokenAmount) {
func CapGasFee(mff dtypes.DefaultMaxFeeFunc, msg *types.Message, sendSepc *api.MessageSendSpec) {
var maxFee abi.TokenAmount
if sendSepc != nil {
maxFee = sendSepc.MaxFee
}
if maxFee.Int == nil || maxFee.Equals(big.Zero()) {
mf, err := mff()
if err != nil {
log.Errorf("failed to get default max gas fee: %+v", err)
mf = big.Zero()
}
maxFee = mf
}

if maxFee.Equals(big.Zero()) {
mf, err := mff()
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions chain/messagepool/repub.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ loop:
// check the baseFee lower bound -- only republish messages that can be included in the chain
// within the next 20 blocks.
for _, m := range chain.msgs {
if !allowNegativeChains(ts.Height()) && m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
if m.Message.GasFeeCap.LessThan(baseFeeLowerBound) {
chain.Invalidate()
continue loop
}
Expand All @@ -115,7 +115,7 @@ loop:

// we can't fit the current chain but there is gas to spare
// trim it and push it down
chain.Trim(gasLimit, mp, baseFee, true)
chain.Trim(gasLimit, mp, baseFee)
for j := i; j < len(chains)-1; j++ {
if chains[j].Before(chains[j+1]) {
break
Expand Down
48 changes: 21 additions & 27 deletions chain/messagepool/selection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
tbig "github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/lotus/build"
Expand All @@ -23,12 +22,6 @@ var bigBlockGasLimit = big.NewInt(build.BlockGasLimit)

var MaxBlockMessages = 16000

// this is *temporary* mutilation until we have implemented uncapped miner penalties -- it will go
// away in the next fork.
func allowNegativeChains(epoch abi.ChainEpoch) bool {
return epoch < build.UpgradeBreezeHeight+5
}

const MaxBlocks = 15

type msgChain struct {
Expand Down Expand Up @@ -121,7 +114,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64
return chains[i].Before(chains[j])
})

if !allowNegativeChains(curTs.Height()) && len(chains) != 0 && chains[0].gasPerf < 0 {
if len(chains) != 0 && chains[0].gasPerf < 0 {
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
return result, nil
}
Expand Down Expand Up @@ -174,7 +167,7 @@ func (mp *MessagePool) selectMessagesOptimal(curTs, ts *types.TipSet, tq float64
last := len(chains)
for i, chain := range chains {
// did we run out of performing chains?
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
if chain.gasPerf < 0 {
break
}

Expand Down Expand Up @@ -240,7 +233,7 @@ tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim if necessary
if chains[last].gasLimit > gasLimit {
chains[last].Trim(gasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
chains[last].Trim(gasLimit, mp, baseFee)
}

// push down if it hasn't been invalidated
Expand All @@ -266,7 +259,7 @@ tailLoop:
}

// if gasPerf < 0 we have no more profitable chains
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
if chain.gasPerf < 0 {
break tailLoop
}

Expand Down Expand Up @@ -307,7 +300,7 @@ tailLoop:
}

// dependencies fit, just trim it
chain.Trim(gasLimit-depGasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
chain.Trim(gasLimit-depGasLimit, mp, baseFee)
last += i
continue tailLoop
}
Expand Down Expand Up @@ -340,7 +333,7 @@ tailLoop:
}

// is it negative?
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
if chain.gasPerf < 0 {
continue
}

Expand All @@ -362,7 +355,7 @@ tailLoop:

// do they fit as is? if it doesn't, trim to make it fit if possible
if chainGasLimit > gasLimit {
chain.Trim(gasLimit-depGasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
chain.Trim(gasLimit-depGasLimit, mp, baseFee)

if !chain.valid {
continue
Expand Down Expand Up @@ -445,7 +438,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
return chains[i].Before(chains[j])
})

if !allowNegativeChains(curTs.Height()) && len(chains) != 0 && chains[0].gasPerf < 0 {
if len(chains) != 0 && chains[0].gasPerf < 0 {
log.Warnw("all messages in mpool have non-positive gas performance", "bestGasPerf", chains[0].gasPerf)
return result, nil
}
Expand All @@ -456,7 +449,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
last := len(chains)
for i, chain := range chains {
// did we run out of performing chains?
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
if chain.gasPerf < 0 {
break
}

Expand Down Expand Up @@ -485,7 +478,7 @@ func (mp *MessagePool) selectMessagesGreedy(curTs, ts *types.TipSet) ([]*types.S
tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim
chains[last].Trim(gasLimit, mp, baseFee, allowNegativeChains(curTs.Height()))
chains[last].Trim(gasLimit, mp, baseFee)

// push down if it hasn't been invalidated
if chains[last].valid {
Expand All @@ -505,7 +498,7 @@ tailLoop:
}

// if gasPerf < 0 we have no more profitable chains
if !allowNegativeChains(curTs.Height()) && chain.gasPerf < 0 {
if chain.gasPerf < 0 {
break tailLoop
}

Expand Down Expand Up @@ -567,15 +560,15 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
return chains[i].Before(chains[j])
})

if !allowNegativeChains(ts.Height()) && len(chains) != 0 && chains[0].gasPerf < 0 {
if len(chains) != 0 && chains[0].gasPerf < 0 {
log.Warnw("all priority messages in mpool have negative gas performance", "bestGasPerf", chains[0].gasPerf)
return nil, gasLimit
}

// 3. Merge chains until the block limit, as long as they have non-negative gas performance
last := len(chains)
for i, chain := range chains {
if !allowNegativeChains(ts.Height()) && chain.gasPerf < 0 {
if chain.gasPerf < 0 {
break
}

Expand All @@ -593,7 +586,7 @@ func (mp *MessagePool) selectPriorityMessages(pending map[address.Address]map[ui
tailLoop:
for gasLimit >= minGas && last < len(chains) {
// trim, discarding negative performing messages
chains[last].Trim(gasLimit, mp, baseFee, allowNegativeChains(ts.Height()))
chains[last].Trim(gasLimit, mp, baseFee)

// push down if it hasn't been invalidated
if chains[last].valid {
Expand All @@ -613,7 +606,7 @@ tailLoop:
}

// if gasPerf < 0 we have no more profitable chains
if !allowNegativeChains(ts.Height()) && chain.gasPerf < 0 {
if chain.gasPerf < 0 {
break tailLoop
}

Expand Down Expand Up @@ -689,6 +682,10 @@ func (*MessagePool) getGasReward(msg *types.SignedMessage, baseFee types.BigInt)
}

gasReward := tbig.Mul(maxPremium, types.NewInt(uint64(msg.Message.GasLimit)))
if gasReward.Sign() == -1 {
// penalty multiplier
gasReward = tbig.Mul(gasReward, types.NewInt(3))
}
return gasReward.Int
}

Expand Down Expand Up @@ -764,9 +761,6 @@ func (mp *MessagePool) createMessageChains(actor address.Address, mset map[uint6
balance = new(big.Int).Sub(balance, required)

value := m.Message.Value.Int
if balance.Cmp(value) < 0 {
break
}
balance = new(big.Int).Sub(balance, value)

gasReward := mp.getGasReward(m, baseFee)
Expand Down Expand Up @@ -870,9 +864,9 @@ func (mc *msgChain) Before(other *msgChain) bool {
(mc.gasPerf == other.gasPerf && mc.gasReward.Cmp(other.gasReward) > 0)
}

func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt, allowNegative bool) {
func (mc *msgChain) Trim(gasLimit int64, mp *MessagePool, baseFee types.BigInt) {
i := len(mc.msgs) - 1
for i >= 0 && (mc.gasLimit > gasLimit || (!allowNegative && mc.gasPerf < 0)) {
for i >= 0 && (mc.gasLimit > gasLimit || mc.gasPerf < 0) {
gasReward := mp.getGasReward(mc.msgs[i], baseFee)
mc.gasReward = new(big.Int).Sub(mc.gasReward, gasReward)
mc.gasLimit -= mc.msgs[i].Message.GasLimit
Expand Down
9 changes: 5 additions & 4 deletions chain/messagepool/selection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -736,8 +736,6 @@ func TestPriorityMessageSelection2(t *testing.T) {
}

func TestPriorityMessageSelection3(t *testing.T) {
t.Skip("reenable after removing allow negative")

mp, tma := makeTestMpool()

// the actors
Expand Down Expand Up @@ -1241,6 +1239,9 @@ func TestCompetitiveMessageSelectionExp(t *testing.T) {
}

func TestCompetitiveMessageSelectionZipf(t *testing.T) {
if testing.Short() {
t.Skip("skipping in short mode")
}
var capacityBoost, rewardBoost, tqReward float64
seeds := []int64{1947, 1976, 2020, 2100, 10000, 143324, 432432, 131, 32, 45}
for _, seed := range seeds {
Expand Down Expand Up @@ -1268,9 +1269,9 @@ func TestGasReward(t *testing.T) {
GasReward int64
}{
{Premium: 100, FeeCap: 200, BaseFee: 100, GasReward: 100},
{Premium: 100, FeeCap: 200, BaseFee: 210, GasReward: -10},
{Premium: 100, FeeCap: 200, BaseFee: 210, GasReward: -10 * 3},
{Premium: 200, FeeCap: 250, BaseFee: 210, GasReward: 40},
{Premium: 200, FeeCap: 250, BaseFee: 2000, GasReward: -1750},
{Premium: 200, FeeCap: 250, BaseFee: 2000, GasReward: -1750 * 3},
}

mp := new(MessagePool)
Expand Down
2 changes: 1 addition & 1 deletion cli/mpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,7 +446,7 @@ var mpoolReplaceCmd = &cli.Command{
return abi.TokenAmount(config.DefaultDefaultMaxFee), nil
}

messagepool.CapGasFee(mff, &msg, mss.Get().MaxFee)
messagepool.CapGasFee(mff, &msg, mss)
} else {
if cctx.IsSet("gas-limit") {
msg.GasLimit = cctx.Int64("gas-limit")
Expand Down
2 changes: 1 addition & 1 deletion node/impl/full/gas.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,7 @@ func (m *GasModule) GasEstimateMessageGas(ctx context.Context, msg *types.Messag
msg.GasFeeCap = feeCap
}

messagepool.CapGasFee(m.GetMaxFee, msg, spec.Get().MaxFee)
messagepool.CapGasFee(m.GetMaxFee, msg, spec)

return msg, nil
}
2 changes: 1 addition & 1 deletion storage/wdpost_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,7 +812,7 @@ func (s *WindowPoStScheduler) setSender(ctx context.Context, msg *types.Message,
return msg.RequiredFunds(), nil
}

messagepool.CapGasFee(mff, msg, big.Min(big.Sub(avail, msg.Value), msg.RequiredFunds()))
messagepool.CapGasFee(mff, msg, &api.MessageSendSpec{MaxFee: big.Min(big.Sub(avail, msg.Value), msg.RequiredFunds())})
}
return nil
}