Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

abci: implement process proposal to spec #9122

Merged
merged 23 commits into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions abci/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ type Client interface {
OfferSnapshotAsync(types.RequestOfferSnapshot) *ReqRes
LoadSnapshotChunkAsync(types.RequestLoadSnapshotChunk) *ReqRes
ApplySnapshotChunkAsync(types.RequestApplySnapshotChunk) *ReqRes
ProcessProposalAsync(types.RequestProcessProposal) *ReqRes

FlushSync() error
EchoSync(msg string) (*types.ResponseEcho, error)
Expand All @@ -60,6 +61,7 @@ type Client interface {
OfferSnapshotSync(types.RequestOfferSnapshot) (*types.ResponseOfferSnapshot, error)
LoadSnapshotChunkSync(types.RequestLoadSnapshotChunk) (*types.ResponseLoadSnapshotChunk, error)
ApplySnapshotChunkSync(types.RequestApplySnapshotChunk) (*types.ResponseApplySnapshotChunk, error)
ProcessProposalSync(types.RequestProcessProposal) (*types.ResponseProcessProposal, error)
}

//----------------------------------------
Expand Down
15 changes: 15 additions & 0 deletions abci/client/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,16 @@ func (cli *grpcClient) PrepareProposalAsync(params types.RequestPrepareProposal)
return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_PrepareProposal{PrepareProposal: res}})
}

func (cli *grpcClient) ProcessProposalAsync(params types.RequestProcessProposal) *ReqRes {
req := types.ToRequestProcessProposal(params)
res, err := cli.client.ProcessProposal(context.Background(), req.GetProcessProposal(), grpc.WaitForReady(true))
if err != nil {
cli.StopForError(err)
}

return cli.finishAsyncCall(req, &types.Response{Value: &types.Response_ProcessProposal{ProcessProposal: res}})
}

// finishAsyncCall creates a ReqRes for an async call, and immediately populates it
// with the response. We don't complete it until it's been ordered via the channel.
func (cli *grpcClient) finishAsyncCall(req *types.Request, res *types.Response) *ReqRes {
Expand Down Expand Up @@ -432,3 +442,8 @@ func (cli *grpcClient) PrepareProposalSync(
reqres := cli.PrepareProposalAsync(params)
return cli.finishSyncCall(reqres).GetPrepareProposal(), cli.Error()
}

func (cli *grpcClient) ProcessProposalSync(params types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
reqres := cli.ProcessProposalAsync(params)
return cli.finishSyncCall(reqres).GetProcessProposal(), cli.Error()
}
19 changes: 19 additions & 0 deletions abci/client/local_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,17 @@ func (app *localClient) PrepareProposalAsync(req types.RequestPrepareProposal) *
)
}

func (app *localClient) ProcessProposalAsync(req types.RequestProcessProposal) *ReqRes {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.ProcessProposal(req)
return app.callback(
types.ToRequestProcessProposal(req),
types.ToResponseProcessProposal(res),
)
}

//-------------------------------------------------------

func (app *localClient) FlushSync() error {
Expand Down Expand Up @@ -342,6 +353,14 @@ func (app *localClient) PrepareProposalSync(req types.RequestPrepareProposal) (*
return &res, nil
}

func (app *localClient) ProcessProposalSync(req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
app.mtx.Lock()
defer app.mtx.Unlock()

res := app.Application.ProcessProposal(req)
return &res, nil
}

//-------------------------------------------------------

func (app *localClient) callback(req *types.Request, res *types.Response) *ReqRes {
Expand Down
39 changes: 39 additions & 0 deletions abci/client/mocks/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

15 changes: 15 additions & 0 deletions abci/client/socket_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ func (cli *socketClient) PrepareProposalAsync(req types.RequestPrepareProposal)
return cli.queueRequest(types.ToRequestPrepareProposal(req))
}

func (cli *socketClient) ProcessProposalAsync(req types.RequestProcessProposal) *ReqRes {
return cli.queueRequest(types.ToRequestProcessProposal(req))
}

//----------------------------------------

func (cli *socketClient) FlushSync() error {
Expand Down Expand Up @@ -430,6 +434,15 @@ func (cli *socketClient) PrepareProposalSync(req types.RequestPrepareProposal) (
return reqres.Response.GetPrepareProposal(), cli.Error()
}

func (cli *socketClient) ProcessProposalSync(req types.RequestProcessProposal) (*types.ResponseProcessProposal, error) {
reqres := cli.queueRequest(types.ToRequestProcessProposal(req))
if err := cli.FlushSync(); err != nil {
return nil, err
}

return reqres.Response.GetProcessProposal(), cli.Error()
}

//----------------------------------------

func (cli *socketClient) queueRequest(req *types.Request) *ReqRes {
Expand Down Expand Up @@ -507,6 +520,8 @@ func resMatchesReq(req *types.Request, res *types.Response) (ok bool) {
_, ok = res.Value.(*types.Response_OfferSnapshot)
case *types.Request_PrepareProposal:
_, ok = res.Value.(*types.Response_PrepareProposal)
case *types.Request_ProcessProposal:
_, ok = res.Value.(*types.Response_ProcessProposal)
}
return ok
}
Expand Down
10 changes: 10 additions & 0 deletions abci/example/kvstore/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,3 +170,13 @@ func (app *Application) Query(reqQuery types.RequestQuery) (resQuery types.Respo

return resQuery
}

func (app *Application) ProcessProposal(
req types.RequestProcessProposal) types.ResponseProcessProposal {
for _, tx := range req.Txs {
if len(tx) == 0 {
return types.ResponseProcessProposal{Status: types.ResponseProcessProposal_REJECT}
}
}
return types.ResponseProcessProposal{Status: types.ResponseProcessProposal_ACCEPT}
}
10 changes: 10 additions & 0 deletions abci/example/kvstore/persistent_kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,16 @@ func (app *PersistentKVStoreApplication) PrepareProposal(
return types.ResponsePrepareProposal{TxRecords: app.substPrepareTx(req.Txs, req.MaxTxBytes)}
}

func (app *PersistentKVStoreApplication) ProcessProposal(
req types.RequestProcessProposal) types.ResponseProcessProposal {
for _, tx := range req.Txs {
if len(tx) == 0 {
return types.ResponseProcessProposal{Status: types.ResponseProcessProposal_REJECT}
}
}
return types.ResponseProcessProposal{Status: types.ResponseProcessProposal_ACCEPT}
}

//---------------------------------------------
// update validators

Expand Down
3 changes: 3 additions & 0 deletions abci/server/socket_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,9 @@ func (s *SocketServer) handleRequest(req *types.Request, responses chan<- *types
case *types.Request_PrepareProposal:
res := s.app.PrepareProposal(*r.PrepareProposal)
responses <- types.ToResponsePrepareProposal(res)
case *types.Request_ProcessProposal:
res := s.app.ProcessProposal(*r.ProcessProposal)
responses <- types.ToResponseProcessProposal(res)
case *types.Request_LoadSnapshotChunk:
res := s.app.LoadSnapshotChunk(*r.LoadSnapshotChunk)
responses <- types.ToResponseLoadSnapshotChunk(res)
Expand Down
15 changes: 14 additions & 1 deletion abci/types/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ import (
context "golang.org/x/net/context"
)

//go:generate mockery --case underscore --name Application
//go:generate ../../scripts/mockery_generate.sh Application

// Application is an interface that enables any finite, deterministic state machine
// to be driven by a blockchain-based replication engine via the ABCI.
// All methods take a RequestXxx argument and return a ResponseXxx argument,
Expand All @@ -21,6 +22,7 @@ type Application interface {
// Consensus Connection
InitChain(RequestInitChain) ResponseInitChain // Initialize blockchain w validators/other info from TendermintCore
PrepareProposal(RequestPrepareProposal) ResponsePrepareProposal
ProcessProposal(RequestProcessProposal) ResponseProcessProposal
BeginBlock(RequestBeginBlock) ResponseBeginBlock // Signals the beginning of a block
DeliverTx(RequestDeliverTx) ResponseDeliverTx // Deliver a tx for full processing
EndBlock(RequestEndBlock) ResponseEndBlock // Signals the end of a block, returns changes to the validator set
Expand Down Expand Up @@ -113,6 +115,11 @@ func (BaseApplication) PrepareProposal(req RequestPrepareProposal) ResponsePrepa
return ResponsePrepareProposal{TxRecords: trs}
}

func (BaseApplication) ProcessProposal(req RequestProcessProposal) ResponseProcessProposal {
return ResponseProcessProposal{
Status: ResponseProcessProposal_ACCEPT}
}

//-------------------------------------------------------

// GRPCApplication is a GRPC wrapper for Application
Expand Down Expand Up @@ -206,3 +213,9 @@ func (app *GRPCApplication) PrepareProposal(
res := app.app.PrepareProposal(*req)
return &res, nil
}

func (app *GRPCApplication) ProcessProposal(
ctx context.Context, req *RequestProcessProposal) (*ResponseProcessProposal, error) {
res := app.app.ProcessProposal(*req)
return &res, nil
}
12 changes: 12 additions & 0 deletions abci/types/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ func ToRequestPrepareProposal(req RequestPrepareProposal) *Request {
}
}

func ToRequestProcessProposal(req RequestProcessProposal) *Request {
return &Request{
Value: &Request_ProcessProposal{&req},
}
}

//----------------------------------------

func ToResponseException(errStr string) *Response {
Expand Down Expand Up @@ -268,3 +274,9 @@ func ToResponsePrepareProposal(res ResponsePrepareProposal) *Response {
Value: &Response_PrepareProposal{&res},
}
}

func ToResponseProcessProposal(res ResponseProcessProposal) *Response {
return &Response{
Value: &Response_ProcessProposal{&res},
}
}
16 changes: 15 additions & 1 deletion abci/types/mocks/application.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions abci/types/mocks/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,17 @@ func (m BaseMock) PrepareProposal(input types.RequestPrepareProposal) types.Resp
return ret
}

func (m BaseMock) ProcessProposal(input types.RequestProcessProposal) types.ResponseProcessProposal {
var ret types.ResponseProcessProposal
defer func() {
if r := recover(); r != nil {
ret = m.base.ProcessProposal(input)
}
}()
ret = m.Application.ProcessProposal(input)
return ret
}

// Commit the state and return the application Merkle root hash
func (m BaseMock) Commit() types.ResponseCommit {
var ret types.ResponseCommit
Expand Down
10 changes: 10 additions & 0 deletions abci/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,16 @@ func (r ResponseQuery) IsErr() bool {
return r.Code != CodeTypeOK
}

// IsAccepted returns true if Code is ACCEPT
func (r ResponseProcessProposal) IsAccepted() bool {
return r.Status == ResponseProcessProposal_ACCEPT
}

// IsStatusUnknown returns true if Code is UNKNOWN
func (r ResponseProcessProposal) IsStatusUnknown() bool {
return r.Status == ResponseProcessProposal_UNKNOWN
}

//---------------------------------------------------------------------------
// override JSON marshaling so we emit defaults (ie. disable omitempty)

Expand Down
Loading