Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[wip] feat: sturdypost: WindowPoSt Submit #11380

Merged
merged 12 commits into from
Nov 8, 2023
4 changes: 2 additions & 2 deletions cmd/lotus-provider/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,12 +245,12 @@ var runCmd = &cli.Command{
{

if cfg.Subsystems.EnableWindowPost {
wdPostTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
wdPostTask, wdPoStSubmitTask, err := provider.WindowPostScheduler(ctx, cfg.Fees, cfg.Proving, full, verif, lw,
as, maddrs, db, stor, si, cfg.Subsystems.WindowPostMaxTasks)
if err != nil {
return err
}
activeTasks = append(activeTasks, wdPostTask)
activeTasks = append(activeTasks, wdPostTask, wdPoStSubmitTask)
}
}
taskEngine, err := harmonytask.New(db, activeTasks, listenAddr)
Expand Down
8 changes: 7 additions & 1 deletion lib/harmony/harmonydb/sql/20230823.sql
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,11 @@ create table wdpost_proofs
partition bigint not null,
submit_at_epoch bigint not null,
submit_by_epoch bigint not null,
proof_message bytea
proof_message bytea,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should rename this to proof_params before merging


submit_task_id bigint not null,
message_cid text not null,

constraint wdpost_proofs_identity_key
unique (sp_id, deadline, partition)
);
22 changes: 22 additions & 0 deletions lib/harmony/harmonydb/sql/20231103.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
create table message_sends
(
from_key text not null,
nonce bigint not null,
to_addr text not null,
signed_data bytea not null,
signed_json jsonb not null,
signed_cid text not null,
send_time date default CURRENT_TIMESTAMP,
magik6k marked this conversation as resolved.
Show resolved Hide resolved
send_reason text,
send_success bool default false not null,
constraint message_sends_pk
primary key (from_key, nonce)
);

comment on column message_sends.from_key is 'text f[1/3/4]... address';
comment on column message_sends.nonce is 'assigned message nonce';
comment on column message_sends.to_addr is 'text f[0/1/2/3/4]... address';
comment on column message_sends.signed_data is 'signed message data';
comment on column message_sends.signed_cid is 'signed message cid';
comment on column message_sends.send_reason is 'optional description of send reason';
comment on column message_sends.send_success is 'whether this message was broadcasted to the network already';
16 changes: 12 additions & 4 deletions provider/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package provider

import (
"context"
"github.com/filecoin-project/lotus/provider/lpmessage"
"time"

"github.com/filecoin-project/lotus/storage/paths"
Expand All @@ -23,19 +24,26 @@ var log = logging.Logger("provider")

func WindowPostScheduler(ctx context.Context, fc config.LotusProviderFees, pc config.ProvingConfig,
api api.FullNode, verif storiface.Verifier, lw *sealer.LocalWorker,
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, error) {
as *ctladdr.AddressSelector, maddr []dtypes.MinerAddress, db *harmonydb.DB, stor paths.Store, idx paths.SectorIndex, max int) (*lpwindow.WdPostTask, *lpwindow.WdPostSubmitTask, error) {

chainSched := chainsched.New(api)

// todo config
ft := lpwindow.NewSimpleFaultTracker(stor, idx, 32, 5*time.Second, 300*time.Second)

task, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
sender := lpmessage.NewSender(api, api, db)

computeTask, err := lpwindow.NewWdPostTask(db, api, ft, lw, verif, chainSched, maddr, max)
if err != nil {
return nil, nil, err
}

submitTask, err := lpwindow.NewWdPostSubmitTask(chainSched, sender, db, api, fc.MaxWindowPoStGasFee, as)
if err != nil {
return nil, err
return nil, nil, err
}

go chainSched.Run(ctx)

return task, nil
return computeTask, submitTask, nil
}
170 changes: 170 additions & 0 deletions provider/lpmessage/sender.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
package lpmessage

import (
"context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/lib/harmony/harmonydb"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"golang.org/x/xerrors"
)

var log = logging.Logger("lpmessage")

type SenderAPI interface {
StateAccountKey(ctx context.Context, addr address.Address, tsk types.TipSetKey) (address.Address, error)
GasEstimateMessageGas(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, tsk types.TipSetKey) (*types.Message, error)
WalletBalance(ctx context.Context, addr address.Address) (big.Int, error)
MpoolGetNonce(context.Context, address.Address) (uint64, error)
MpoolPush(context.Context, *types.SignedMessage) (cid.Cid, error)
}

type SignerAPI interface {
WalletSignMessage(context.Context, address.Address, *types.Message) (*types.SignedMessage, error)
}

// Sender abstracts away highly-available message sending with coordination through
// HarmonyDB. It make sure that nonces are assigned transactionally, and that
// messages are correctly broadcasted to the network. It is not a Task in the sense
// of a HarmonyTask interface, just a helper for tasks which need to send messages
// to the network.
type Sender struct {
api SenderAPI
signer SignerAPI

db *harmonydb.DB
}

// NewSender creates a new Sender.
func NewSender(api SenderAPI, signer SignerAPI, db *harmonydb.DB) *Sender {
return &Sender{
api: api,
signer: signer,

db: db,
}
}

// Send atomically assigns a nonce, signs, and pushes a message
// to mempool.
// maxFee is only used when GasFeeCap/GasPremium fields aren't specified
//
// When maxFee is set to 0, Send will guess appropriate fee
// based on current chain conditions
//
// Send behaves much like fullnodeApi.MpoolPushMessage, but it coordinates
// through HarmonyDB, making it safe to broadcast messages from multiple independent
// API nodes
//
// Send is also currently more strict about required parameters than MpoolPushMessage
func (s *Sender) Send(ctx context.Context, msg *types.Message, mss *api.MessageSendSpec, reason string) (cid.Cid, error) {
if mss == nil {
return cid.Undef, xerrors.Errorf("MessageSendSpec cannot be nil")
}
if (mss.MsgUuid != uuid.UUID{}) {
return cid.Undef, xerrors.Errorf("MessageSendSpec.MsgUuid must be zero")
}

fromA, err := s.api.StateAccountKey(ctx, msg.From, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("getting key address: %w", err)
}

if msg.Nonce != 0 {
return cid.Undef, xerrors.Errorf("Send expects message nonce to be 0, was %d", msg.Nonce)
}

msg, err = s.api.GasEstimateMessageGas(ctx, msg, mss, types.EmptyTSK)
if err != nil {
return cid.Undef, xerrors.Errorf("GasEstimateMessageGas error: %w", err)
}

b, err := s.api.WalletBalance(ctx, msg.From)
if err != nil {
return cid.Undef, xerrors.Errorf("mpool push: getting origin balance: %w", err)
}

requiredFunds := big.Add(msg.Value, msg.RequiredFunds())
if b.LessThan(requiredFunds) {
return cid.Undef, xerrors.Errorf("mpool push: not enough funds: %s < %s", b, requiredFunds)
}

var sigMsg *types.SignedMessage

// start db tx
c, err := s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
// assign nonce (max(api.MpoolGetNonce, db nonce+1))
msgNonce, err := s.api.MpoolGetNonce(ctx, fromA)
if err != nil {
return false, xerrors.Errorf("getting nonce from mpool: %w", err)
}

// get nonce from db
var dbNonce uint64
r := tx.QueryRow(`select max(nonce) from message_sends where from_key = $1`, fromA.String())
if err := r.Scan(&dbNonce); err != nil {
return false, xerrors.Errorf("getting nonce from db: %w", err)
}

if dbNonce+1 > msgNonce {
msgNonce = dbNonce + 1
}

msg.Nonce = msgNonce

// sign message
sigMsg, err = s.signer.WalletSignMessage(ctx, msg.From, msg)
if err != nil {
return false, xerrors.Errorf("signing message: %w", err)
}

data, err := sigMsg.Serialize()
if err != nil {
return false, xerrors.Errorf("serializing message: %w", err)
}

jsonBytes, err := sigMsg.MarshalJSON()
if err != nil {
return false, xerrors.Errorf("marshaling message: %w", err)
}

// write to db
c, err := tx.Exec(`insert into message_sends (from_key, nonce, to_addr, signed_data, signed_json, signed_cid, send_reason) values ($1, $2, $3, $4, $5, $6, $7)`,
fromA.String(), msg.Nonce, msg.To.String(), data, string(jsonBytes), sigMsg.Cid().String(), reason)
if err != nil {
return false, xerrors.Errorf("inserting message into db: %w", err)
}
if c != 1 {
return false, xerrors.Errorf("inserting message into db: expected 1 row to be affected, got %d", c)
}

// commit
return true, nil
})
if err != nil || !c {
return cid.Undef, xerrors.Errorf("transaction failed or didn't commit: %w", err)
}

// push to mpool
_, err = s.api.MpoolPush(ctx, sigMsg)
if err != nil {
return cid.Undef, xerrors.Errorf("mpool push: failed to push message: %w", err)
}

// update db recocd to say it was pushed (set send_success to true)
cn, err := s.db.Exec(ctx, `update message_sends set send_success = true where from_key = $1 and nonce = $2`, fromA.String(), msg.Nonce)
if err != nil {
return cid.Undef, xerrors.Errorf("updating db record: %w", err)
}
if cn != 1 {
return cid.Undef, xerrors.Errorf("updating db record: expected 1 row to be affected, got %d", c)
}

log.Infow("sent message", "cid", sigMsg.Cid(), "from", fromA, "to", msg.To, "nonce", msg.Nonce, "value", msg.Value, "gaslimit", msg.GasLimit)

return cid.Undef, nil
}
File renamed without changes.
Loading