Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: curio/lmrpc: Ingest backpressure #11865

Merged
merged 1 commit into from
Apr 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 107 additions & 27 deletions curiosrc/market/lmrpc/lmrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (

var log = logging.Logger("lmrpc")

// backpressureWaitTime is how long a piece-ingest request sleeps before
// re-checking the pipeline queues after backpressure was applied.
const backpressureWaitTime = 30 * time.Second

func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error {
return forEachMarketRPC(cfg, func(maddr string, listen string) error {
addr, err := address.NewFromString(maddr)
Expand Down Expand Up @@ -248,46 +250,72 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
var refID int64
var pieceWasCreated bool

comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var pieceID int64
// Attempt to select the piece ID first
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)
for {
var backpressureWait bool

if err != nil {
if err == pgx.ErrNoRows {
// Piece does not exist, attempt to insert
err = tx.QueryRow(`
comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// BACKPRESSURE
wait, err := maybeApplyBackpressure(tx, conf.Ingest)
if err != nil {
return false, xerrors.Errorf("backpressure checks: %w", err)
}
if wait {
backpressureWait = true
return false, nil
}

var pieceID int64
// Attempt to select the piece ID first
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)

if err != nil {
if err == pgx.ErrNoRows {
// Piece does not exist, attempt to insert
err = tx.QueryRow(`
INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
VALUES ($1, $2, $3)
ON CONFLICT (piece_cid) DO NOTHING
RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
if err != nil {
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
if err != nil {
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
}
pieceWasCreated = true // New piece was created
} else {
// Some other error occurred during select
return false, xerrors.Errorf("checking existing parked piece: %w", err)
}
pieceWasCreated = true // New piece was created
} else {
// Some other error occurred during select
return false, xerrors.Errorf("checking existing parked piece: %w", err)
pieceWasCreated = false // Piece already exists, no new piece was created
}
} else {
pieceWasCreated = false // Piece already exists, no new piece was created
}

// Add parked_piece_ref
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
// Add parked_piece_ref
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
if err != nil {
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
}

// If everything went well, commit the transaction
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
if backpressureWait {
// Backpressure was applied, wait and try again
select {
case <-time.After(backpressureWaitTime):
case <-ctx.Done():
return api.SectorOffset{}, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err())
}
continue
}

// If everything went well, commit the transaction
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
}

break
}

// wait for piece to be parked
Expand Down Expand Up @@ -420,3 +448,55 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr

return server.ListenAndServe()
}

func maybeApplyBackpressure(tx *harmonydb.Tx, cfg config.CurioIngestConfig) (wait bool, err error) {
var bufferedSDR, bufferedTrees, bufferedPoRep int
err = tx.QueryRow(`WITH BufferedSDR AS (
SELECT SUM(buffered_count) AS buffered_sdr_count
FROM (
SELECT COUNT(p.task_id_sdr) - COUNT(t.owner_id) AS buffered_count
FROM sectors_sdr_pipeline p
LEFT JOIN harmony_task t ON p.task_id_sdr = t.id
WHERE p.after_sdr = false
UNION ALL
SELECT COUNT(1) AS buffered_count
FROM parked_pieces
WHERE complete = false
) AS subquery
),
BufferedTrees AS (
SELECT COUNT(p.task_id_tree_r) - COUNT(t.owner_id) AS buffered_trees_count
FROM sectors_sdr_pipeline p
LEFT JOIN harmony_task t ON p.task_id_tree_r = t.id
WHERE p.after_sdr = true AND p.after_tree_r = false
),
BufferedPoRep AS (
SELECT COUNT(p.task_id_porep) - COUNT(t.owner_id) AS buffered_porep_count
FROM sectors_sdr_pipeline p
LEFT JOIN harmony_task t ON p.task_id_porep = t.id
WHERE p.after_tree_r = true AND p.after_porep = false
)
SELECT
(SELECT buffered_sdr_count FROM BufferedSDR) AS total_buffered,
(SELECT buffered_trees_count FROM BufferedTrees) AS buffered_trees_count,
(SELECT buffered_porep_count FROM BufferedPoRep) AS buffered_porep_count
`).Scan(&bufferedSDR, &bufferedTrees, &bufferedPoRep)
if err != nil {
return false, xerrors.Errorf("counting parked pieces: %w", err)
}

if cfg.MaxQueueSDR != 0 && bufferedSDR > cfg.MaxQueueSDR {
snadrus marked this conversation as resolved.
Show resolved Hide resolved
log.Debugw("backpressure", "reason", "too many SDR tasks", "buffered", bufferedSDR, "max", cfg.MaxQueueSDR)
return true, nil
}
if cfg.MaxQueueTrees != 0 && bufferedTrees > cfg.MaxQueueTrees {
log.Debugw("backpressure", "reason", "too many tree tasks", "buffered", bufferedTrees, "max", cfg.MaxQueueTrees)
return true, nil
}
if cfg.MaxQueuePoRep != 0 && bufferedPoRep > cfg.MaxQueuePoRep {
log.Debugw("backpressure", "reason", "too many PoRep tasks", "buffered", bufferedPoRep, "max", cfg.MaxQueuePoRep)
return true, nil
}

return false, nil
}
31 changes: 31 additions & 0 deletions documentation/en/default-curio-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,37 @@
#SingleRecoveringPartitionPerPostMessage = false


[Ingest]
# Maximum number of sectors that can be queued waiting for SDR to start processing.
# 0 = unlimited
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
# The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue
# will also impact the maximum number of ParkPiece tasks which can run concurrently.
#
# SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.
#
# type: int
#MaxQueueSDR = 8

# Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
# 0 = unlimited
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
# In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only
# applied to sectors entering the pipeline.
#
# type: int
#MaxQueueTrees = 0

# Maximum number of sectors that can be queued waiting for PoRep to start processing.
# 0 = unlimited
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
# Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only
# applied to sectors entering the pipeline.
#
# type: int
#MaxQueuePoRep = 0


[Journal]
# Events of the form: "system1:event1,system1:event2[,...]"
#
Expand Down
5 changes: 5 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,5 +362,10 @@ func DefaultCurioConfig() *CurioConfig {
PartitionCheckTimeout: Duration(20 * time.Minute),
SingleCheckTimeout: Duration(10 * time.Minute),
},
Ingest: CurioIngestConfig{
MaxQueueSDR: 8, // default to 8 sectors before sdr
MaxQueueTrees: 0, // default don't use this limit
MaxQueuePoRep: 0, // default don't use this limit
},
}
}
40 changes: 40 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type CurioConfig struct {
// Addresses of wallets per MinerAddress (one of the fields).
Addresses []CurioAddresses
Proving CurioProvingConfig
Ingest CurioIngestConfig
Journal JournalConfig
Apis ApisConfig
}
Expand Down Expand Up @@ -826,6 +827,31 @@ type CurioProvingConfig struct {
SingleRecoveringPartitionPerPostMessage bool
}

// CurioIngestConfig holds the per-stage queue limits that control when deal
// ingestion is delayed to apply backpressure to the market subsystem.
// Every limit treats 0 as "unlimited".
type CurioIngestConfig struct {
	// Maximum number of sectors that can be queued waiting for SDR to start processing.
	// 0 = unlimited
	// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
	// The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue
	// will also impact the maximum number of ParkPiece tasks which can run concurrently.
	//
	// SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.
	MaxQueueSDR int

	// Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
	// 0 = unlimited
	// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
	// In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only
	// applied to sectors entering the pipeline.
	MaxQueueTrees int

	// Maximum number of sectors that can be queued waiting for PoRep to start processing.
	// 0 = unlimited
	// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
	// Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only
	// applied to sectors entering the pipeline.
	MaxQueuePoRep int
}

// API contains configs for API endpoint
type API struct {
// Binding address for the Lotus API
Expand Down
Loading