Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Raft consensus for lotus nodes in a cluster #9294

Merged
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
8f1b1bb
WIP: Raft consensus for lotus nodes in a cluster
Sep 8, 2022
a1f2fdb
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Sep 12, 2022
4171be0
Few more changes
Sep 12, 2022
3441224
WIP: rest of the stuff
Sep 13, 2022
4be8861
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Sep 13, 2022
81c729e
Cluster raft config changes
Sep 13, 2022
1fe4aa3
Add Auth func for gorpc and address comments
Sep 21, 2022
7470549
Address moar comments
Sep 22, 2022
99e7c32
More wip
Sep 27, 2022
559c2c6
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Sep 27, 2022
570f614
Retries within proxy working
Sep 28, 2022
f89a682
Add Mpool ref to raft state and rearrange some APIs
Sep 29, 2022
b8060cd
Add persistent stores for cluster raft data
Sep 29, 2022
986c5e3
Use multiaddrs in config for raft peerset
Sep 30, 2022
dde204f
Change Mpool push API to have an option to publish
Oct 4, 2022
9848182
solution for mining loop hitting the same node
Oct 5, 2022
17a7722
Ignore mpool msg existing errors for applying raft state
Oct 6, 2022
139f877
fix some bugs and address some comments
Oct 17, 2022
b77ca54
Change cli cmd to API with proxy
Oct 17, 2022
900525f
some cleanup
Oct 17, 2022
674427a
fix lint and make gen
Oct 18, 2022
15ed1ee
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Oct 18, 2022
ad8b959
Address more comments and add test for gorpc auth
Oct 18, 2022
94bd4d8
make gen
Oct 18, 2022
09e9562
i hate make gen
Oct 18, 2022
2fa21ff
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Nov 11, 2022
2681c2a
Change config name from Raft to Cluster
Nov 11, 2022
8740fb4
remove 2nd rpc closer call
Nov 11, 2022
b541cf9
Remove double stop
Nov 14, 2022
800d9de
Address comments
Nov 14, 2022
a66619f
update filecoin-ffi
Nov 14, 2022
ab1eeeb
one more
Nov 14, 2022
f14a25a
make gen and docsgen
Nov 14, 2022
b95d1a6
Merge branch 'master' into sbansal/nonce-coordination-and-consensus-f…
Nov 14, 2022
4b11b45
remove comments
Nov 15, 2022
9451221
remove moar commented out code
Nov 15, 2022
c0925ff
Remove some configs
Nov 15, 2022
22f3fbb
Add comment to Push API
Nov 15, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -924,6 +924,11 @@ workflows:
suite: itest-mempool
target: "./itests/mempool_test.go"

- test:
name: test-itest-mpool_push_with_uuid
suite: itest-mpool_push_with_uuid
target: "./itests/mpool_push_with_uuid_test.go"

- test:
name: test-itest-multisig
suite: itest-multisig
Expand Down Expand Up @@ -959,6 +964,11 @@ workflows:
suite: itest-paych_cli
target: "./itests/paych_cli_test.go"

- test:
name: test-itest-raft_messagesigner
suite: itest-raft_messagesigner
target: "./itests/raft_messagesigner_test.go"

- test:
name: test-itest-sdr_upgrade
suite: itest-sdr_upgrade
Expand Down
4 changes: 4 additions & 0 deletions api/api_full.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"

"github.com/filecoin-project/go-address"
Expand Down Expand Up @@ -751,6 +752,9 @@ type FullNode interface {
// LOTUS_BACKUP_BASE_PATH environment variable set to some path, and that
// the path specified when calling CreateBackup is within the base path
CreateBackup(ctx context.Context, fpath string) error //perm:admin

RaftState(ctx context.Context) (consensus.State, error) //perm:read
RaftLeader(ctx context.Context) (peer.ID, error) //perm:read
}

type StorageAsk struct {
Expand Down
31 changes: 31 additions & 0 deletions api/mocks/mock_full.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 27 additions & 0 deletions api/proxy_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ type MessageSendSpec struct {
MsgUuid uuid.UUID
}

// MpoolMessageWhole bundles an unsigned message together with the
// MessageSendSpec that accompanies it, so both can be passed around as a
// single value.
type MpoolMessageWhole struct {
// Msg is the unsigned message.
Msg *types.Message
// Spec is the send spec for Msg; it is a pointer and may be nil.
Spec *MessageSendSpec
}

// GraphSyncDataTransfer provides diagnostics on a data transfer happening over graphsync
type GraphSyncDataTransfer struct {
// GraphSync request id for this transfer
Expand Down
Binary file modified build/openrpc/full.json.gz
Binary file not shown.
Binary file modified build/openrpc/gateway.json.gz
Binary file not shown.
Binary file modified build/openrpc/miner.json.gz
Binary file not shown.
Binary file modified build/openrpc/worker.json.gz
Binary file not shown.
55 changes: 44 additions & 11 deletions chain/messagesigner/messagesigner.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import (
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
logging "github.com/ipfs/go-log/v2"
consensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

Expand All @@ -29,6 +31,19 @@ type MpoolNonceAPI interface {
GetActor(context.Context, address.Address, types.TipSetKey) (*types.Actor, error)
}

type MsgSigner interface {
SignMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, cb func(*types.SignedMessage) error) (*types.SignedMessage, error)
GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error)
StoreSignedMessage(ctx context.Context, uuid uuid.UUID, message *types.SignedMessage) error
NextNonce(ctx context.Context, addr address.Address) (uint64, error)
SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error
DstoreKey(addr address.Address) datastore.Key
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
IsLeader(ctx context.Context) bool
RaftLeader(ctx context.Context) (peer.ID, error)
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
GetRaftState(ctx context.Context) (consensus.State, error)
magik6k marked this conversation as resolved.
Show resolved Hide resolved
}

// MessageSigner keeps track of nonces per address, and increments the nonce
// when signing a message
type MessageSigner struct {
Expand All @@ -38,6 +53,8 @@ type MessageSigner struct {
ds datastore.Batching
}

//var _ full.MsgSigner = &MessageSigner{}

func NewMessageSigner(wallet api.Wallet, mpool MpoolNonceAPI, ds dtypes.MetadataDS) *MessageSigner {
ds = namespace.Wrap(ds, datastore.NewKey("/message-signer/"))
return &MessageSigner{
Expand All @@ -49,12 +66,12 @@ func NewMessageSigner(wallet api.Wallet, mpool MpoolNonceAPI, ds dtypes.Metadata

// SignMessage increments the nonce for the message From address, and signs
// the message
func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb func(*types.SignedMessage) error) (*types.SignedMessage, error) {
func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, spec *api.MessageSendSpec, cb func(*types.SignedMessage) error) (*types.SignedMessage, error) {
ms.lk.Lock()
defer ms.lk.Unlock()

// Get the next message nonce
nonce, err := ms.nextNonce(ctx, msg.From)
nonce, err := ms.NextNonce(ctx, msg.From)
if err != nil {
return nil, xerrors.Errorf("failed to create nonce: %w", err)
}
Expand All @@ -72,7 +89,7 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb
Extra: mb.RawData(),
})
if err != nil {
return nil, xerrors.Errorf("failed to sign message: %w", err)
return nil, xerrors.Errorf("failed to sign message: %w, addr=%s", err, msg.From)
}

// Callback with the signed message
Expand All @@ -86,7 +103,7 @@ func (ms *MessageSigner) SignMessage(ctx context.Context, msg *types.Message, cb
}

// If the callback executed successfully, write the nonce to the datastore
if err := ms.saveNonce(ctx, msg.From, nonce); err != nil {
if err := ms.SaveNonce(ctx, msg.From, nonce); err != nil {
return nil, xerrors.Errorf("failed to save nonce: %w", err)
}

Expand All @@ -113,9 +130,9 @@ func (ms *MessageSigner) StoreSignedMessage(ctx context.Context, uuid uuid.UUID,
return ms.ds.Put(ctx, key, serializedMsg)
}

// nextNonce gets the next nonce for the given address.
// NextNonce gets the next nonce for the given address.
// If there is no nonce in the datastore, gets the nonce from the message pool.
func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (uint64, error) {
func (ms *MessageSigner) NextNonce(ctx context.Context, addr address.Address) (uint64, error) {
// Nonces used to be created by the mempool and we need to support nodes
// that have mempool nonces, so first check the mempool for a nonce for
// this address. Note that the mempool returns the actor state's nonce
Expand All @@ -126,7 +143,7 @@ func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (u
}

// Get the next nonce for this address from the datastore
addrNonceKey := ms.dstoreKey(addr)
addrNonceKey := ms.DstoreKey(addr)
dsNonceBytes, err := ms.ds.Get(ctx, addrNonceKey)

switch {
Expand Down Expand Up @@ -159,14 +176,14 @@ func (ms *MessageSigner) nextNonce(ctx context.Context, addr address.Address) (u
}
}

// saveNonce increments the nonce for this address and writes it to the
// SaveNonce increments the nonce for this address and writes it to the
// datastore
func (ms *MessageSigner) saveNonce(ctx context.Context, addr address.Address, nonce uint64) error {
func (ms *MessageSigner) SaveNonce(ctx context.Context, addr address.Address, nonce uint64) error {
// Increment the nonce
nonce++

// Write the nonce to the datastore
addrNonceKey := ms.dstoreKey(addr)
addrNonceKey := ms.DstoreKey(addr)
buf := bytes.Buffer{}
_, err := buf.Write(cbg.CborEncodeMajorType(cbg.MajUnsignedInt, nonce))
if err != nil {
Expand All @@ -179,6 +196,22 @@ func (ms *MessageSigner) saveNonce(ctx context.Context, addr address.Address, no
return nil
}

func (ms *MessageSigner) dstoreKey(addr address.Address) datastore.Key {
func (ms *MessageSigner) DstoreKey(addr address.Address) datastore.Key {
return datastore.KeyWithNamespaces([]string{dsKeyActorNonce, addr.String()})
}

// IsLeader always reports true for the plain MessageSigner; ctx is unused.
func (ms *MessageSigner) IsLeader(ctx context.Context) bool {
return true
}

func (ms *MessageSigner) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
return false, xerrors.Errorf("Single node shouldn't have any redirects")
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
}

// GetRaftState always fails on the plain MessageSigner: it returns a nil
// state and an error. ctx is unused.
func (ms *MessageSigner) GetRaftState(ctx context.Context) (consensus.State, error) {
	err := xerrors.Errorf("This is a non raft consensus message signer")
	return nil, err
}

// RaftLeader always fails on the plain MessageSigner: it returns the empty
// peer ID and an error. ctx is unused.
func (ms *MessageSigner) RaftLeader(ctx context.Context) (peer.ID, error) {
	var none peer.ID
	return none, xerrors.Errorf("No leaders in non raft message signer")
}
99 changes: 99 additions & 0 deletions chain/messagesigner/messagesigner_consensus.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package messagesigner

import (
"context"

"github.com/google/uuid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
libp2pconsensus "github.com/libp2p/go-libp2p-consensus"
"github.com/libp2p/go-libp2p/core/peer"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
consensus "github.com/filecoin-project/lotus/lib/consensus/raft"
"github.com/filecoin-project/lotus/node/modules/dtypes"
)

// MessageSignerConsensus wraps a MsgSigner and a raft Consensus instance.
// Methods it does not override are promoted from the embedded MsgSigner.
type MessageSignerConsensus struct {
// MsgSigner is the embedded signer that performs the actual signing.
MsgSigner
// consensus is the raft consensus handle used for leader queries, state
// reads and commits.
consensus *consensus.Consensus
}

func NewMessageSignerConsensus(
wallet api.Wallet,
mpool MpoolNonceAPI,
ds dtypes.MetadataDS,
consensus *consensus.Consensus) *MessageSignerConsensus {

ds = namespace.Wrap(ds, datastore.NewKey("/message-signer-consensus/"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a different prefix than the one used in non-consensus messagesigner, but I guess that's what we want.

I'm just not 100% sure that we actually need this to be a different prefix.

return &MessageSignerConsensus{
MsgSigner: &MessageSigner{
wallet: wallet,
mpool: mpool,
ds: ds,
},
consensus: consensus,
}
}

// IsLeader reports whatever the underlying raft consensus handle reports
// for this node's leadership.
func (ms *MessageSignerConsensus) IsLeader(ctx context.Context) bool {
	leader := ms.consensus.IsLeader(ctx)
	return leader
}

// RedirectToLeader forwards the call identified by method, with argument arg,
// to the raft consensus handle, which fills ret with the reply.
//
// ret must be a *types.SignedMessage. Any other dynamic type yields
// (false, error) instead of panicking on the type assertion. Otherwise the
// bool and error results are those of the underlying consensus call.
func (ms *MessageSignerConsensus) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
	signed, ok := ret.(*types.SignedMessage)
	if !ok {
		return false, xerrors.Errorf("RedirectToLeader: ret must be *types.SignedMessage, got %T", ret)
	}
	return ms.consensus.RedirectToLeader(method, arg, signed)
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we drop this method?

Suggested change
func (ms *MessageSignerConsensus) RedirectToLeader(ctx context.Context, method string, arg interface{}, ret interface{}) (bool, error) {
ok, err := ms.consensus.RedirectToLeader(method, arg, ret.(*types.SignedMessage))
if err != nil {
return ok, err
}
return ok, nil
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this is needed to be called in MpoolPushMessage


func (ms *MessageSignerConsensus) SignMessage(
ctx context.Context,
msg *types.Message,
spec *api.MessageSendSpec,
cb func(*types.SignedMessage) error) (*types.SignedMessage, error) {

signedMsg, err := ms.MsgSigner.SignMessage(ctx, msg, spec, cb)
if err != nil {
return nil, err
}

// We can't have an empty/default uuid as part of the consensus state so generate a new uuid if spec is empty
u := uuid.New()
if spec != nil {
u = spec.MsgUuid
shrenujbansal marked this conversation as resolved.
Show resolved Hide resolved
}

op := &consensus.ConsensusOp{signedMsg.Message.Nonce, u, signedMsg.Message.From, signedMsg}
err = ms.consensus.Commit(ctx, op)
if err != nil {
return nil, err
}

return signedMsg, nil
}

// GetSignedMessage returns the signed message recorded under uuid in the
// current raft consensus state.
//
// It returns an error when the state cannot be read, when the state is not a
// consensus.RaftState (instead of panicking on the type assertion), or when no
// message is stored for uuid.
func (ms *MessageSignerConsensus) GetSignedMessage(ctx context.Context, uuid uuid.UUID) (*types.SignedMessage, error) {
	state, err := ms.consensus.State(ctx)
	if err != nil {
		return nil, err
	}

	cstate, ok := state.(consensus.RaftState)
	if !ok {
		return nil, xerrors.Errorf("unexpected raft state type %T", state)
	}

	msg, found := cstate.MsgUuids[uuid]
	if !found {
		return nil, xerrors.Errorf("Msg with Uuid %s not available", uuid)
	}
	return msg, nil
}

// GetRaftState returns the state and error reported by the raft consensus
// handle, unmodified.
func (ms *MessageSignerConsensus) GetRaftState(ctx context.Context) (libp2pconsensus.State, error) {
	state, err := ms.consensus.State(ctx)
	return state, err
}

// RaftLeader returns the leader peer ID and error reported by the raft
// consensus handle, unmodified.
func (ms *MessageSignerConsensus) RaftLeader(ctx context.Context) (peer.ID, error) {
	leader, err := ms.consensus.Leader(ctx)
	return leader, err
}
2 changes: 1 addition & 1 deletion chain/messagesigner/messagesigner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func TestMessageSignerSignMessage(t *testing.T) {
mpool.setNonce(m.msg.From, m.mpoolNonce[0])
}
merr := m.cbErr
smsg, err := ms.SignMessage(ctx, m.msg, func(message *types.SignedMessage) error {
smsg, err := ms.SignMessage(ctx, m.msg, nil, func(message *types.SignedMessage) error {
return merr
})

Expand Down
Binary file added devgen.car
Binary file not shown.
Loading