Skip to content

Commit

Permalink
Merge pull request #11380 from filecoin-project/feat/lp-wdpost-submit
Browse files Browse the repository at this point in the history
[wip] feat: sturdypost: WindowPoSt Submit
  • Loading branch information
snadrus authored Nov 8, 2023
2 parents 848c20d + 5921da3 commit 8ecee2d
Show file tree
Hide file tree
Showing 13 changed files with 554 additions and 97 deletions.
4 changes: 2 additions & 2 deletions cmd/lotus-provider/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,12 +244,12 @@ var runCmd = &cli.Command{
{

if cfg.Subsystems.EnableWindowPost {
wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
}
activeTasks = append(activeTasks, wdPostTask)
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask)
}
}
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -344,6 +344,7 @@ github.com/filecoin-project/go-state-types v0.0.0-20201102161440-c8033295a1fc/go
github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psSz5dy4B5awOJ/E7P2Saeep8g=
github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q=
github.com/filecoin-project/go-state-types v0.11.1/go.mod h1:SyNPwTsU7I22gL2r0OAPcImvLoTVfgRwdK/Y5rR1zz8=
github.com/filecoin-project/go-state-types v0.11.2-0.20230712101859-8f37624fa540/go.mod h1:SyNPwTsU7I22gL2r0OAPcImvLoTVfgRwdK/Y5rR1zz8=
github.com/filecoin-project/go-state-types v0.12.5 h1:VQ2N2T3JeUDdIHEo/xhjnT7Q218Wl0UYIyglqT7Z9Ck=
github.com/filecoin-project/go-state-types v0.12.5/go.mod h1:iJTqGdWDvzXhuVf64Lw0hzt4TIoitMo0VgHdxdjNDZI=
Expand Down
3 changes: 1 addition & 2 deletions lib/harmony/harmonydb/sql/20230719.sql
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ CREATE TABLE harmony_machines (
host_and_port varchar(300) NOT NULL,
cpu INTEGER NOT NULL,
ram BIGINT NOT NULL,
gpu FLOAT NOT NULL,
gpuram BIGINT NOT NULL
gpu FLOAT NOT NULL
);

CREATE TABLE harmony_task (
Expand Down
19 changes: 13 additions & 6 deletions lib/harmony/harmonydb/sql/20230823.sql
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,17 @@ comment on column wdpost_partition_tasks.partition_index is 'partition index wit

create table wdpost_proofs
(
sp_id bigint not null,
deadline bigint not null,
partition bigint not null,
submit_at_epoch bigint not null,
submit_by_epoch bigint not null,
proof_message bytea
sp_id bigint not null,
proving_period_start bigint not null,
deadline bigint not null,
partition bigint not null,
submit_at_epoch bigint not null,
submit_by_epoch bigint not null,
proof_params bytea,

submit_task_id bigint,
message_cid text,

constraint wdpost_proofs_identity_key
unique (sp_id, proving_period_start, deadline, partition)
);
22 changes: 22 additions & 0 deletions lib/harmony/harmonydb/sql/20231103.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
create table message_sends
(
from_key text not null,
nonce bigint not null,
to_addr text not null,
signed_data bytea not null,
signed_json jsonb not null,
signed_cid text not null,
send_time timestamp default CURRENT_TIMESTAMP,
send_reason text,
send_success bool default false not null,
constraint message_sends_pk
primary key (from_key, nonce)
);

comment on column message_sends.from_key is 'text f[1/3/4]... address';
comment on column message_sends.nonce is 'assigned message nonce';
comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address';
comment on column message_sends.signed_data is 'signed message data';
comment on column message_sends.signed_cid is 'signed message cid';
comment on column message_sends.send_reason is 'optional description of send reason';
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already';
2 changes: 1 addition & 1 deletion lib/harmony/harmonytask/task_type_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ top:

defer func() {
if r := recover(); r != nil {
stackSlice := make([]byte, 512)
stackSlice := make([]byte, 4092)
sz := runtime.Stack(stackSlice, false)
log.Error("Recovered from a serious error "+
"while processing "+h.Name+" task "+strconv.Itoa(int(*tID))+": ", r,
Expand Down
15 changes: 10 additions & 5 deletions lib/harmony/resources/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package resources
import (
"bytes"
"context"
"golang.org/x/xerrors"
"os/exec"
"regexp"
"runtime"
Expand Down Expand Up @@ -42,7 +43,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
}
ctx := context.Background()
{ // Learn our owner_id while updating harmony_machines
var ownerID int
var ownerID *int

// Upsert query with last_contact update, fetch the machine ID
// (note this isn't a simple insert .. on conflict because host_and_port isn't unique)
Expand All @@ -54,7 +55,7 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
RETURNING id
),
inserted AS (
INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, gpuram, last_contact)
INSERT INTO harmony_machines (host_and_port, cpu, ram, gpu, last_contact)
SELECT $1, $2, $3, $4, CURRENT_TIMESTAMP
WHERE NOT EXISTS (SELECT id FROM upsert)
RETURNING id
Expand All @@ -64,10 +65,13 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {
SELECT id FROM inserted;
`, hostnameAndPort, reg.Cpu, reg.Ram, reg.Gpu).Scan(&ownerID)
if err != nil {
return nil, err
return nil, xerrors.Errorf("inserting machine entry: %w", err)
}
if ownerID == nil {
return nil, xerrors.Errorf("no owner id")
}

reg.MachineID = ownerID
reg.MachineID = *ownerID

cleaned := CleanupMachines(context.Background(), db)
logger.Infow("Cleaned up machines", "count", cleaned)
Expand All @@ -87,9 +91,10 @@ func Register(db *harmonydb.DB, hostnameAndPort string) (*Reg, error) {

return &reg, nil
}

func CleanupMachines(ctx context.Context, db *harmonydb.DB) int {
ct, err := db.Exec(ctx, `DELETE FROM harmony_machines WHERE last_contact < $1`,
time.Now().Add(-1*LOOKS_DEAD_TIMEOUT))
time.Now().Add(-1*LOOKS_DEAD_TIMEOUT).UTC())
if err != nil {
logger.Warn("unable to delete old machines: ", err)
}
Expand Down
16 changes: 12 additions & 4 deletions provider/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/lotus/node/config"
dtypes "github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/filecoin-project/lotus/provider/chainsched"
"github.com/filecoin-project/lotus/provider/lpmessage"
"github.com/filecoin-project/lotus/provider/lpwindow"
"github.com/filecoin-project/lotus/storage/ctladdr"
"github.com/filecoin-project/lotus/storage/paths"
Expand All @@ -22,19 +23,26 @@ var log = logging.Logger("provider")

func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, error) {
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) {

chainSched := chainsched.New(api)

// todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)

task, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
sender := lpmessage.NewSender(api, api, db)

computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
if err != nil {
return nil, nil, err
}

submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as)
if err != nil {
return nil, err
return nil, nil, err
}

go chainSched.Run(ctx)

return task, nil
return computeTask, submitTask, nil
}
177 changes: 177 additions & 0 deletions provider/lpmessage/sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package lpmessage

import (
"context"

"github.com/google/uuid"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
)

var log = logging.Logger("lpmessage")

type SenderAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
WalletBalance(ctx context.Context, addr address.Address) (big.Int, error)
MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
}

type SignerAPI interface {
WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error)
}

// Sender abstracts away highly-available message sending with coordination through
// HarmonyDB. It make sure that nonces are assigned transactionally, and that
// messages are correctly broadcasted to the network. It is not a Task in the sense
// of a HarmonyTask interface, just a helper for tasks which need to send messages
// to the network.
type Sender struct {
api SenderAPI
signer SignerAPI

db *harmonydb.DB
}

// NewSender creates a new Sender.
func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender {
return &Sender{
api: api,
signer: signer,

db: db,
}
}

// Send atomically assigns a nonce, signs, and pushes a message
// to mempool.
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
//
// When maxFee is set to 0, Send will guess appropriate fee
// based on current chain conditions
//
// Send behaves much like fullnodeApi.MpoolPushMessage, but it coordinates
// through HarmonyDB, making it safe to broadcast messages from multiple independent
// API nodes
//
// Send is also currently more strict about required parameters than MpoolPushMessage
func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec, reason string) (cid.Cid, error) {
if mss == nil {
return cid.Undef, xerrors.Errorf("MessageSendSpec cannot be nil")
}
if (mss.MsgUuid != uuid.UUID{}) {
return cid.Undef, xerrors.Errorf("MessageSendSpec.MsgUuid must be zero")
}

fromA, err := s.api.StateAccountKey(ctx, msg.From, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("getting key address: %w", err)
}

msg.From = fromA

if msg.Nonce != 0 {
return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce)
}

msg, err = s.api.GasEstimateMessageGas(ctx, msg, mss, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("GasEstimateMessageGas error: %w", err)
}

b, err := s.api.WalletBalance(ctx, msg.From)
if err != nil {
return cid.Undef, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}

requiredFunds := big.Add(msg.Value, msg.RequiredFunds())
if b.LessThan(requiredFunds) {
return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds)
}

var sigMsg *types.SignedMessage

// start db tx
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
msgNonce, err := s.api.MpoolGetNonce(ctx, fromA)
if err != nil {
return false, xerrors.Errorf("getting nonce from mpool: %w", err)
}

// get nonce from db
var dbNonce *uint64
r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String())
if err := r.Scan(&dbNonce); err != nil {
return false, xerrors.Errorf("getting nonce from db: %w", err)
}

if dbNonce != nil && *dbNonce+1 > msgNonce {
msgNonce = *dbNonce + 1
}

msg.Nonce = msgNonce

// sign message
sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg)
if err != nil {
return false, xerrors.Errorf("signing message: %w", err)
}

data, err := sigMsg.Serialize()
if err != nil {
return false, xerrors.Errorf("serializing message: %w", err)
}

jsonBytes, err := sigMsg.MarshalJSON()
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}

// write to db
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err)
}
if c != 1 {
return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c)
}

// commit
return true, nil
})
if err != nil || !c {
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
}

// push to mpool
_, err = s.api.MpoolPush(ctx, sigMsg)
if err != nil {
// TODO: We may get nonce gaps here..

return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err)
}

// update db recocd to say it was pushed (set send_success to true)
cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce)
if err != nil {
return cid.Undef, xerrors.Errorf("updating db record: %w", err)
}
if cn != 1 {
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c)
}

log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)

return sigMsg.Cid(), nil
}
File renamed without changes.
Loading

0 comments on commit 8ecee2d

Please sign in to comment.