Skip to content

Commit

Permalink
feat!: multiple blobs per PFB (#1154)
Browse files Browse the repository at this point in the history
## Overview

This PR implements the ability to add an arbitrary number of blobs to a
single PFB. Leaving as a draft until we merge the blocking
[PR](celestiaorg/celestia-core#922) in core, and
probably tidy up a bit, rebase, or add a few more unit tests.

In part thanks to us planning ahead earlier this year, there aren't actually
that many required changes to get multiple blobs per PFB. Mostly
just adding tests, using a **new mechanism to calculate the share
commitments**, making a few things a slice instead of a single thing,
and minor adjustments to make square estimation/square layout work with
multiple blobs.

closes #388 

## Checklist

- [x] New and updated code has appropriate documentation
- [x] New and updated code has new and/or updated testing
- [x] Required CI checks are passing
- [x] Visual proof for any user facing features like CLI or
documentation updates
- [x] Linked issues closed with keywords

Co-authored-by: Rootul P <[email protected]>
  • Loading branch information
evan-forbes and rootulp authored Jan 16, 2023
1 parent c57a0bf commit 222bb1e
Show file tree
Hide file tree
Showing 31 changed files with 1,087 additions and 390 deletions.
2 changes: 1 addition & 1 deletion app/check_tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func (app *App) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
if _, ok := msg.(*blobtypes.MsgPayForBlob); !ok {
continue
}
return sdkerrors.ResponseCheckTxWithEvents(blobtypes.ErrBloblessPFB, 0, 0, []abci.Event{}, false)
return sdkerrors.ResponseCheckTxWithEvents(blobtypes.ErrNoBlobs, 0, 0, []abci.Event{}, false)
}
// don't do anything special if we have a normal transaction
return app.BaseApp.CheckTx(req)
Expand Down
28 changes: 23 additions & 5 deletions app/estimate_square_size.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
package app

import (
"encoding/binary"
"math"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"
blobtypes "github.com/celestiaorg/celestia-app/x/blob/types"
coretypes "github.com/tendermint/tendermint/types"
)

Expand All @@ -23,8 +25,7 @@ func estimateSquareSize(txs []parsedTx) (squareSize uint64, nonreserveStart int)
if len(ptx.normalTx) != 0 {
continue
}
dataUsed := len(ptx.blobTx.Blobs[0].Data)
blobSharesUsed += shares.SparseSharesNeeded(uint32(dataUsed))
blobSharesUsed += blobtypes.BlobTxSharesUsed(ptx.blobTx)
}

// assume that we have to add a lot of padding by simply doubling the number
Expand All @@ -48,7 +49,8 @@ func estimateSquareSize(txs []parsedTx) (squareSize uint64, nonreserveStart int)

// estimateTxShares estimates the number of shares used by transactions.
func estimateTxShares(squareSize uint64, ptxs []parsedTx) int {
maxWTxOverhead := maxWrappedTxOverhead(squareSize)
maxWTxOverhead := maxIndexWrapperOverhead(squareSize)
maxIndexOverhead := maxIndexOverhead(squareSize)
txbytes := 0
for _, pTx := range ptxs {
if len(pTx.normalTx) != 0 {
Expand All @@ -57,7 +59,7 @@ func estimateTxShares(squareSize uint64, ptxs []parsedTx) int {
txbytes += txLen
continue
}
txLen := len(pTx.blobTx.Tx) + maxWTxOverhead
txLen := len(pTx.blobTx.Tx) + maxWTxOverhead + (maxIndexOverhead * len(pTx.blobTx.Blobs))
txLen += shares.DelimLen(uint64(txLen))
txbytes += txLen
}
Expand All @@ -70,7 +72,7 @@ func estimateTxShares(squareSize uint64, ptxs []parsedTx) int {
//
// TODO: make more efficient by only generating these numbers once or something
// similar. This function alone can take up to 5ms.
func maxWrappedTxOverhead(squareSize uint64) int {
func maxIndexWrapperOverhead(squareSize uint64) int {
maxTxLen := squareSize * squareSize * appconsts.ContinuationCompactShareContentSize
wtx, err := coretypes.MarshalIndexWrapper(
make([]byte, maxTxLen),
Expand All @@ -81,3 +83,19 @@ func maxWrappedTxOverhead(squareSize uint64) int {
}
return len(wtx) - int(maxTxLen)
}

// maxIndexOverhead calculates the maximum amount of overhead in bytes that
// could occur by adding an index to an IndexWrapper. It measures the size
// difference between a wrapper carrying one index and one carrying two, plus
// the varint length of the largest possible share index.
func maxIndexOverhead(squareSize uint64) int {
	largestIndex := squareSize * squareSize
	// the worst-case varint encoding length of a single share index
	indexVarintLen := binary.PutUvarint(make([]byte, binary.MaxVarintLen32), largestIndex)
	singleIndexWtx, err := coretypes.MarshalIndexWrapper(make([]byte, 1), uint32(largestIndex))
	if err != nil {
		panic(err)
	}
	doubleIndexWtx, err := coretypes.MarshalIndexWrapper(make([]byte, 1), uint32(largestIndex), uint32(largestIndex-1))
	if err != nil {
		panic(err)
	}
	return len(doubleIndexWtx) - len(singleIndexWtx) + indexVarintLen
}
106 changes: 106 additions & 0 deletions app/estimate_square_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package app
import (
"testing"

"github.com/celestiaorg/celestia-app/app/encoding"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
"github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/celestia-app/testutil/blobfactory"
"github.com/celestiaorg/celestia-app/testutil/testfactory"
blobtypes "github.com/celestiaorg/celestia-app/x/blob/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
coretypes "github.com/tendermint/tendermint/types"
Expand Down Expand Up @@ -41,6 +45,67 @@ func Test_estimateSquareSize(t *testing.T) {
}
}

// Test_estimateSquareSize_MultiBlob verifies that square size estimation
// produces the expected square size and starting share index for
// transactions carrying multiple blobs.
func Test_estimateSquareSize_MultiBlob(t *testing.T) {
	enc := encoding.MakeConfig(ModuleEncodingRegisters...)
	acc := "account"
	kr := testfactory.GenerateKeyring(acc)
	signer := blobtypes.NewKeyringSigner(kr, acc, "chainid")
	type test struct {
		name                       string
		getBlobSizes               func() [][]int
		expectedSquareSize         uint64
		expectedStartingShareIndex int
	}
	tests := []test{
		{
			name:                       "single share multiblob transaction",
			getBlobSizes:               func() [][]int { return [][]int{{4}} },
			expectedSquareSize:         2,
			expectedStartingShareIndex: 1,
		},
		{
			name: "10 multiblob single share transactions",
			getBlobSizes: func() [][]int {
				return blobfactory.Repeat([]int{100}, 10)
			},
			expectedSquareSize:         8,
			expectedStartingShareIndex: 7,
		},
		{
			name: "10 multiblob 2 share transactions",
			getBlobSizes: func() [][]int {
				return blobfactory.Repeat([]int{1000}, 10)
			},
			expectedSquareSize:         8,
			expectedStartingShareIndex: 7,
		},
		{
			name: "10 multiblob 4 share transactions",
			getBlobSizes: func() [][]int {
				return blobfactory.Repeat([]int{2000}, 10)
			},
			expectedSquareSize:         16,
			expectedStartingShareIndex: 7,
		},
		{
			name: "100 multiblob single share transaction",
			getBlobSizes: func() [][]int {
				return [][]int{blobfactory.Repeat(int(100), 100)}
			},
			expectedSquareSize:         16,
			expectedStartingShareIndex: 5,
		},
	}
	for _, tc := range tests {
		t.Run(tc.name, func(t *testing.T) {
			txs := blobfactory.ManyMultiBlobTxSameSigner(
				t,
				enc.TxConfig.TxEncoder(),
				signer,
				tc.getBlobSizes(),
			)
			ptxs := parseTxs(enc.TxConfig, shares.TxsToBytes(txs))
			gotSquareSize, gotStart := estimateSquareSize(ptxs)
			require.Equal(t, tc.expectedSquareSize, gotSquareSize)
			require.Equal(t, tc.expectedStartingShareIndex, gotStart)
		})
	}
}

func Test_estimateTxShares(t *testing.T) {
type test struct {
name string
Expand Down Expand Up @@ -87,3 +152,44 @@ func Test_estimateTxShares(t *testing.T) {
})
}
}

// The point of this test is to fail if anything to do with the serialization
// of index wrappers changes, as changes could lead to tricky bugs.
func Test_expected_maxIndexWrapperOverhead(t *testing.T) {
	type expectation struct {
		squareSize      uint64
		indexOverhead   int
		wrapperOverhead int
	}
	expectations := []expectation{
		{squareSize: 4, indexOverhead: 2, wrapperOverhead: 12},
		{squareSize: 128, indexOverhead: 5, wrapperOverhead: 16},
		{squareSize: 512, indexOverhead: 6, wrapperOverhead: 16},
	}
	for _, e := range expectations {
		assert.Equal(t, e.indexOverhead, maxIndexOverhead(e.squareSize))
		assert.Equal(t, e.wrapperOverhead, maxIndexWrapperOverhead(e.squareSize))
	}
}

// Test_maxIndexWrapperOverhead checks that the estimated wrapper overhead
// plus the per-blob index overhead is an upper bound on the actual
// serialized overhead of an IndexWrapper across a range of square sizes
// and blob counts.
//
// Fixes from review: the inner loop previously reused `i`, shadowing the
// outer test index that is passed to the assertion message, and the slice
// was misleadingly named `blobLens` even though it holds share indexes
// (the arguments to MarshalIndexWrapper), not blob lengths.
func Test_maxIndexWrapperOverhead(t *testing.T) {
	type test struct {
		squareSize int
		blobs      int
	}
	tests := []test{
		{4, 2},
		{32, 2},
		{128, 1},
		{128, 10},
		{128, 1000},
		{512, 4},
	}
	for i, tt := range tests {
		maxTxLen := tt.squareSize * tt.squareSize * appconsts.ContinuationCompactShareContentSize
		// use the largest possible share index for every blob so the
		// serialized indexes are as long as they can be
		shareIndexes := make([]uint32, tt.blobs)
		for j := range shareIndexes {
			shareIndexes[j] = uint32(tt.squareSize * tt.squareSize)
		}
		tx := make([]byte, maxTxLen)
		wtx, err := coretypes.MarshalIndexWrapper(tx, shareIndexes...)
		require.NoError(t, err)

		wrapperOverhead := maxIndexWrapperOverhead(uint64(tt.squareSize))
		indexOverhead := maxIndexOverhead(uint64(tt.squareSize)) * tt.blobs

		assert.LessOrEqual(t, len(wtx)-len(tx), wrapperOverhead+indexOverhead, i)
	}
}
6 changes: 3 additions & 3 deletions app/parse_txs.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ type parsedTx struct {
normalTx []byte
// blobTx is the processed blob transaction. this field is only filled if
// the transaction has blobs attached
blobTx core.BlobTx
shareIndex uint32
blobTx core.BlobTx
shareIndexes []uint32
}

// parseTxs decodes raw tendermint txs along with checking for and processing
Expand Down Expand Up @@ -52,7 +52,7 @@ func processTxs(logger log.Logger, txs []parsedTx) [][]byte {

// if this is a blob transaction, then we need to encode and wrap the
// underlying MsgPFB containing transaction
wTx, err := coretypes.MarshalIndexWrapper(pTx.blobTx.Tx, pTx.shareIndex)
wTx, err := coretypes.MarshalIndexWrapper(pTx.blobTx.Tx, pTx.shareIndexes...)
if err != nil {
// note: Its not safe to bubble this error up and stop the block
// creation process.
Expand Down
2 changes: 1 addition & 1 deletion app/prepare_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
coretypes "github.com/tendermint/tendermint/types"
)

// PrepareProposal fullfills the celestia-core version of the ABCI interface by
// PrepareProposal fulfills the celestia-core version of the ABCI interface by
// preparing the proposal block data. The square size is determined by first
// estimating it via the size of the passed block data. Then, this method
// generates the data root for the proposal block and passes it back to
Expand Down
16 changes: 2 additions & 14 deletions app/process_proposal.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/celestiaorg/celestia-app/pkg/da"
"github.com/celestiaorg/celestia-app/pkg/inclusion"
"github.com/celestiaorg/celestia-app/pkg/shares"
"github.com/celestiaorg/celestia-app/x/blob/types"
blobtypes "github.com/celestiaorg/celestia-app/x/blob/types"
"github.com/celestiaorg/rsmt2d"
sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -71,7 +70,6 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr

// iterate over all of the MsgPayForBlob transactions and ensure that their
// commitments are subtree roots of the data root.
commitmentCounter := 0
for _, rawTx := range req.BlockData.Txs {
tx := rawTx
wrappedTx, isWrapped := coretypes.UnmarshalIndexWrapper(rawTx)
Expand Down Expand Up @@ -118,7 +116,7 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr
}
}

commitment, err := inclusion.GetMultiCommit(cacher, dah, wrappedTx.ShareIndexes, []uint32{pfb.BlobSize})
commitment, err := inclusion.GetMultiCommit(cacher, dah, wrappedTx.ShareIndexes, pfb.BlobSizes)
if err != nil {
logInvalidPropBlockError(app.Logger(), req.Header, "commitment not found", err)
return abci.ResponseProcessProposal{
Expand All @@ -132,26 +130,16 @@ func (app *App) ProcessProposal(req abci.RequestProcessProposal) abci.ResponsePr
Result: abci.ResponseProcessProposal_REJECT,
}
}

commitmentCounter++
}

// compare the number of MPFBs and blobs, if they aren't
// identical, then we already know this block is invalid
if commitmentCounter != len(req.BlockData.Blobs) {
logInvalidPropBlock(app.Logger(), req.Header, "varying number of MsgPayForBlob and blobs in the same block")
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_REJECT,
}
}
return abci.ResponseProcessProposal{
Result: abci.ResponseProcessProposal_ACCEPT,
}
}

func hasPFB(msgs []sdk.Msg) (*blobtypes.MsgPayForBlob, bool) {
for _, msg := range msgs {
if pfb, ok := msg.(*types.MsgPayForBlob); ok {
if pfb, ok := msg.(*blobtypes.MsgPayForBlob); ok {
return pfb, true
}
}
Expand Down
45 changes: 36 additions & 9 deletions app/test/block_size_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/celestiaorg/celestia-app/testutil/blobfactory"
"github.com/celestiaorg/celestia-app/testutil/network"
"github.com/celestiaorg/celestia-app/x/blob"
"github.com/celestiaorg/celestia-app/x/blob/types"
blobtypes "github.com/celestiaorg/celestia-app/x/blob/types"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -93,6 +94,23 @@ func (s *IntegrationTestSuite) TestMaxBlockSize() {
s.kr,
c.GRPCClient,
950000,
1,
false,
s.cfg.ChainID,
s.accounts[:20],
)
}

// Tendermint's default tx size limit is 1 MiB, so we get close to that by
// generating transactions of size 600 KiB because 3 blobs per transaction *
// 200,000 bytes each = 600,000 total bytes = 600 KiB per transaction.
randMultiBlob1MbTxGen := func(c client.Context) []coretypes.Tx {
return blobfactory.RandBlobTxsWithAccounts(
s.cfg.TxConfig.TxEncoder(),
s.kr,
c.GRPCClient,
200000, // 200 KiB
3,
false,
s.cfg.ChainID,
s.accounts[:20],
Expand All @@ -108,10 +126,11 @@ func (s *IntegrationTestSuite) TestMaxBlockSize() {
s.cfg.TxConfig.TxEncoder(),
s.kr,
c.GRPCClient,
500000,
50000,
8,
true,
s.cfg.ChainID,
s.accounts[20:],
s.accounts[:80],
)
}

Expand All @@ -125,6 +144,10 @@ func (s *IntegrationTestSuite) TestMaxBlockSize() {
"20 ~1Mb txs",
equallySized1MbTxGen,
},
{
"20 ~1Mb multiblob txs",
randMultiBlob1MbTxGen,
},
{
"80 random txs",
randoTxGen,
Expand Down Expand Up @@ -154,8 +177,11 @@ func (s *IntegrationTestSuite) TestMaxBlockSize() {
for _, hash := range hashes {
// TODO: reenable fetching and verifying proofs
resp, err := queryTx(val.ClientCtx, hash, false)
require.NoError(err)
require.NotNil(resp)
assert.NoError(err)
assert.NotNil(resp)
if resp == nil {
continue
}
assert.Equal(abci.CodeTypeOK, resp.TxResult.Code)
heights[resp.Height]++
// ensure that some gas was used
Expand Down Expand Up @@ -213,8 +239,8 @@ func (s *IntegrationTestSuite) TestSubmitPayForBlob() {
},
{
"large random typical",
mustNewBlob([]byte{2, 3, 4, 5, 6, 7, 8, 9}, tmrand.Bytes(700000)),
[]blobtypes.TxBuilderOption{
mustNewBlob([]byte{2, 3, 4, 5, 6, 7, 8, 9}, tmrand.Bytes(350000)),
[]types.TxBuilderOption{
blobtypes.SetFeeAmount(sdk.NewCoins(sdk.NewCoin(app.BondDenom, sdk.NewInt(10)))),
},
},
Expand All @@ -241,7 +267,7 @@ func (s *IntegrationTestSuite) TestSubmitPayForBlob() {
require.NoError(s.network.WaitForNextBlock())
}
signer := blobtypes.NewKeyringSigner(s.kr, s.accounts[0], val.ClientCtx.ChainID)
res, err := blob.SubmitPayForBlob(context.TODO(), signer, val.ClientCtx.GRPCClient, tc.blob, 1000000000, tc.opts...)
res, err := blob.SubmitPayForBlob(context.TODO(), signer, val.ClientCtx.GRPCClient, []*blobtypes.Blob{tc.blob, tc.blob}, 1000000000, tc.opts...)
require.NoError(err)
require.NotNil(res)
assert.Equal(abci.CodeTypeOK, res.Code)
Expand All @@ -257,7 +283,8 @@ func (s *IntegrationTestSuite) TestUnwrappedPFBRejection() {
s.cfg.TxConfig.TxEncoder(),
s.kr,
val.ClientCtx.GRPCClient,
100000,
int(100000),
1,
false,
s.cfg.ChainID,
s.accounts[:1],
Expand All @@ -268,7 +295,7 @@ func (s *IntegrationTestSuite) TestUnwrappedPFBRejection() {

res, err := val.ClientCtx.BroadcastTxSync(btx.Tx)
require.NoError(t, err)
require.Equal(t, blobtypes.ErrBloblessPFB.ABCICode(), res.Code)
require.Equal(t, blobtypes.ErrNoBlobs.ABCICode(), res.Code)
}

func queryTx(clientCtx client.Context, hashHexStr string, prove bool) (*rpctypes.ResultTx, error) {
Expand Down
Loading

0 comments on commit 222bb1e

Please sign in to comment.