Skip to content

Commit

Permalink
feat: curio/lmrpc: Ingest backpressure
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Apr 10, 2024
1 parent 6443afa commit 803cd1c
Show file tree
Hide file tree
Showing 5 changed files with 209 additions and 27 deletions.
134 changes: 107 additions & 27 deletions curiosrc/market/lmrpc/lmrpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (

var log = logging.Logger("lmrpc")

const backpressureWaitTime = 30 * time.Second

func ServeCurioMarketRPCFromConfig(db *harmonydb.DB, full api.FullNode, cfg *config.CurioConfig) error {
return forEachMarketRPC(cfg, func(maddr string, listen string) error {
addr, err := address.NewFromString(maddr)
Expand Down Expand Up @@ -248,46 +250,72 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr
var refID int64
var pieceWasCreated bool

comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
var pieceID int64
// Attempt to select the piece ID first
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)
for {
var backpressureWait bool

if err != nil {
if err == pgx.ErrNoRows {
// Piece does not exist, attempt to insert
err = tx.QueryRow(`
comm, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// BACKPRESSURE
wait, err := maybeApplyBackpressure(tx, conf.Ingest)
if err != nil {
return false, xerrors.Errorf("backpressure checks: %w", err)
}
if wait {
backpressureWait = true
return false, nil
}

var pieceID int64
// Attempt to select the piece ID first
err = tx.QueryRow(`SELECT id FROM parked_pieces WHERE piece_cid = $1`, deal.DealProposal.PieceCID.String()).Scan(&pieceID)

if err != nil {
if err == pgx.ErrNoRows {
// Piece does not exist, attempt to insert
err = tx.QueryRow(`
INSERT INTO parked_pieces (piece_cid, piece_padded_size, piece_raw_size)
VALUES ($1, $2, $3)
ON CONFLICT (piece_cid) DO NOTHING
RETURNING id`, deal.DealProposal.PieceCID.String(), int64(pieceSize.Padded()), int64(pieceSize)).Scan(&pieceID)
if err != nil {
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
if err != nil {
return false, xerrors.Errorf("inserting new parked piece and getting id: %w", err)
}
pieceWasCreated = true // New piece was created
} else {
// Some other error occurred during select
return false, xerrors.Errorf("checking existing parked piece: %w", err)
}
pieceWasCreated = true // New piece was created
} else {
// Some other error occurred during select
return false, xerrors.Errorf("checking existing parked piece: %w", err)
pieceWasCreated = false // Piece already exists, no new piece was created
}
} else {
pieceWasCreated = false // Piece already exists, no new piece was created
}

// Add parked_piece_ref
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
// Add parked_piece_ref
err = tx.QueryRow(`INSERT INTO parked_piece_refs (piece_id, data_url)
VALUES ($1, $2) RETURNING ref_id`, pieceID, dataUrl.String()).Scan(&refID)
if err != nil {
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
}

// If everything went well, commit the transaction
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return false, xerrors.Errorf("inserting parked piece ref: %w", err)
return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
if backpressureWait {
// Backpressure was applied, wait and try again
select {
case <-time.After(backpressureWaitTime):
case <-ctx.Done():
return api.SectorOffset{}, xerrors.Errorf("context done while waiting for backpressure: %w", ctx.Err())
}
continue
}

// If everything went well, commit the transaction
return true, nil // This will commit the transaction
}, harmonydb.OptionRetry())
if err != nil {
return api.SectorOffset{}, xerrors.Errorf("inserting parked piece: %w", err)
}
if !comm {
return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
return api.SectorOffset{}, xerrors.Errorf("piece tx didn't commit")
}

break
}

// wait for piece to be parked
Expand Down Expand Up @@ -420,3 +448,55 @@ func ServeCurioMarketRPC(db *harmonydb.DB, full api.FullNode, maddr address.Addr

return server.ListenAndServe()
}

func maybeApplyBackpressure(tx *harmonydb.Tx, cfg config.CurioIngestConfig) (wait bool, err error) {
var bufferedSDR, bufferedTrees, bufferedPoRep int
err = tx.QueryRow(`WITH BufferedSDR AS (
SELECT SUM(buffered_count) AS buffered_sdr_count
FROM (
SELECT COUNT(p.task_id_sdr) - COUNT(t.owner_id) AS buffered_count
FROM sectors_sdr_pipeline p
LEFT JOIN harmony_task t ON p.task_id_sdr = t.id
WHERE p.after_sdr = false
UNION ALL
SELECT COUNT(1) AS buffered_count
FROM parked_pieces
WHERE complete = false
) AS subquery
),
BufferedTrees AS (
SELECT COUNT(p.task_id_tree_r) - COUNT(t.owner_id) AS buffered_trees_count
FROM sectors_sdr_pipeline p
LEFT JOIN harmony_task t ON p.task_id_tree_r = t.id
WHERE p.after_sdr = true AND p.after_tree_r = false
),
BufferedPoRep AS (
SELECT COUNT(p.task_id_porep) - COUNT(t.owner_id) AS buffered_porep_count
FROM sectors_sdr_pipeline p
LEFT JOIN harmony_task t ON p.task_id_porep = t.id
WHERE p.after_tree_r = true AND p.after_porep = false
)
SELECT
(SELECT buffered_sdr_count FROM BufferedSDR) AS total_buffered,
(SELECT buffered_trees_count FROM BufferedTrees) AS buffered_trees_count,
(SELECT buffered_porep_count FROM BufferedPoRep) AS buffered_porep_count
`).Scan(&bufferedSDR, &bufferedTrees, &bufferedPoRep)
if err != nil {
return false, xerrors.Errorf("counting parked pieces: %w", err)
}

if cfg.MaxQueueSDR != 0 && bufferedSDR > cfg.MaxQueueSDR {
log.Debugw("backpressure", "reason", "too many SDR tasks", "buffered", bufferedSDR, "max", cfg.MaxQueueSDR)
return true, nil
}
if cfg.MaxQueueTrees != 0 && bufferedTrees > cfg.MaxQueueTrees {
log.Debugw("backpressure", "reason", "too many tree tasks", "buffered", bufferedTrees, "max", cfg.MaxQueueTrees)
return true, nil
}
if cfg.MaxQueuePoRep != 0 && bufferedPoRep > cfg.MaxQueuePoRep {
log.Debugw("backpressure", "reason", "too many PoRep tasks", "buffered", bufferedPoRep, "max", cfg.MaxQueuePoRep)
return true, nil
}

return false, nil
}
31 changes: 31 additions & 0 deletions documentation/en/default-curio-config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,37 @@
#SingleRecoveringPartitionPerPostMessage = false


[Ingest]
# Maximum number of sectors that can be queued waiting for SDR to start processing.
# 0 = unlimited
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
# The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue
# will also impact the maximum number of ParkPiece tasks which can run concurrently.
#
# SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.
#
# type: int
#MaxQueueSDR = 8

# Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
# 0 = unlimited
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
# In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only
# applied to sectors entering the pipeline.
#
# type: int
#MaxQueueTrees = 0

# Maximum number of sectors that can be queued waiting for PoRep to start processing.
# 0 = unlimited
# Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
# Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only
# applied to sectors entering the pipeline.
#
# type: int
#MaxQueuePoRep = 0


[Journal]
# Events of the form: "system1:event1,system1:event2[,...]"
#
Expand Down
5 changes: 5 additions & 0 deletions node/config/def.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,5 +362,10 @@ func DefaultCurioConfig() *CurioConfig {
PartitionCheckTimeout: Duration(20 * time.Minute),
SingleCheckTimeout: Duration(10 * time.Minute),
},
Ingest: CurioIngestConfig{
MaxQueueSDR: 8, // default to 8 sectors before sdr
MaxQueueTrees: 0, // default don't use this limit
MaxQueuePoRep: 0, // default don't use this limit
},
}
}
40 changes: 40 additions & 0 deletions node/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions node/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ type CurioConfig struct {
// Addresses of wallets per MinerAddress (one of the fields).
Addresses []CurioAddresses
Proving CurioProvingConfig
Ingest CurioIngestConfig
Journal JournalConfig
Apis ApisConfig
}
Expand Down Expand Up @@ -826,6 +827,31 @@ type CurioProvingConfig struct {
SingleRecoveringPartitionPerPostMessage bool
}

type CurioIngestConfig struct {
// Maximum number of sectors that can be queued waiting for SDR to start processing.
// 0 = unlimited
// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
// The SDR queue includes deals which are in the process of entering the sealing pipeline - size of this queue
// will also impact the maximum number of ParkPiece tasks which can run concurrently.
//
// SDR queue is the first queue in the sealing pipeline, meaning that it should be used as the primary backpressure mechanism.
MaxQueueSDR int

// Maximum number of sectors that can be queued waiting for SDRTrees to start processing.
// 0 = unlimited
// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
// In case of the trees tasks it is possible that this queue grows more than this limit, the backpressure is only
// applied to sectors entering the pipeline.
MaxQueueTrees int

// Maximum number of sectors that can be queued waiting for PoRep to start processing.
// 0 = unlimited
// Note: This mechanism will delay taking deal data from markets, providing backpressure to the market subsystem.
// Like with the trees tasks, it is possible that this queue grows more than this limit, the backpressure is only
// applied to sectors entering the pipeline.
MaxQueuePoRep int
}

// API contains configs for API endpoint
type API struct {
// Binding address for the Lotus API
Expand Down

0 comments on commit 803cd1c

Please sign in to comment.