Skip to content

Commit

Permalink
Implement Simulator as a Standalone Actor (ethereum#208)
Browse files Browse the repository at this point in the history
sharding: implement service actor
  • Loading branch information
terencechain authored Jul 12, 2018
1 parent 0ce1aa6 commit 3544c0b
Show file tree
Hide file tree
Showing 12 changed files with 114 additions and 50 deletions.
23 changes: 15 additions & 8 deletions sharding/node/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,14 +260,14 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
return err
}

if actor == "notary" {
switch actor {
case "notary":
not, err := notary.NewNotary(config, client, shardp2p, shardChainDB)
if err != nil {
return fmt.Errorf("could not register notary service: %v", err)
}
return s.registerService(not)
} else if actor == "proposer" {

case "proposer":
var pool *txpool.TXPool
if err := s.fetchService(&pool); err != nil {
return err
Expand All @@ -278,12 +278,19 @@ func (s *ShardEthereum) registerActorService(config *params.Config, actor string
return fmt.Errorf("could not register proposer service: %v", err)
}
return s.registerService(prop)
case "simulator":
sim, err := simulator.NewSimulator(config, client, shardp2p, shardID, 15) // 15 second delay between simulator requests.
if err != nil {
return fmt.Errorf("could not register simulator service: %v", err)
}
return s.registerService(sim)
default:
obs, err := observer.NewObserver(shardp2p, shardChainDB, shardID, sync, client)
if err != nil {
return fmt.Errorf("could not register observer service: %v", err)
}
return s.registerService(obs)
}
obs, err := observer.NewObserver(shardp2p, shardChainDB, shardID, sync, client)
if err != nil {
return fmt.Errorf("could not register observer service: %v", err)
}
return s.registerService(obs)
}

func (s *ShardEthereum) registerSimulatorService(actorFlag string, config *params.Config, shardID int) error {
Expand Down
2 changes: 1 addition & 1 deletion sharding/notary/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/crypto"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
"github.com/prysmaticlabs/geth-sharding/sharding/internal"
shardparams "github.com/prysmaticlabs/geth-sharding/sharding/params"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
)

var (
Expand Down
5 changes: 4 additions & 1 deletion sharding/p2p/messages/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,8 @@ go_library(
srcs = ["messages.go"],
importpath = "github.com/prysmaticlabs/geth-sharding/sharding/p2p/messages",
visibility = ["//sharding:__subpackages__"],
deps = ["@com_github_ethereum_go_ethereum//common:go_default_library"],
deps = [
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
],
)
7 changes: 7 additions & 0 deletions sharding/p2p/messages/messages.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"math/big"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)

// CollationBodyRequest defines a p2p message being sent over subscription feeds
Expand All @@ -22,3 +23,9 @@ type CollationBodyResponse struct {
HeaderHash *common.Hash
Body []byte
}

// TransactionBroadcast defines the p2p message broadcast from simulators
// to the rest of the actors for transactions to be included in collation.
type TransactionBroadcast struct {
Transaction *types.Transaction
}
1 change: 1 addition & 0 deletions sharding/proposer/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ go_library(
"//sharding/database:go_default_library",
"//sharding/mainchain:go_default_library",
"//sharding/p2p:go_default_library",
"//sharding/p2p/messages:go_default_library",
"//sharding/params:go_default_library",
"//sharding/syncer:go_default_library",
"//sharding/txpool:go_default_library",
Expand Down
20 changes: 14 additions & 6 deletions sharding/proposer/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/prysmaticlabs/geth-sharding/sharding/database"
"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p/messages"
"github.com/prysmaticlabs/geth-sharding/sharding/params"
"github.com/prysmaticlabs/geth-sharding/sharding/syncer"
"github.com/prysmaticlabs/geth-sharding/sharding/txpool"
Expand Down Expand Up @@ -73,14 +74,21 @@ func (p *Proposer) Stop() error {

// proposeCollations listens to the transaction feed and submits collations over an interval.
func (p *Proposer) proposeCollations() {
requests := make(chan *gethTypes.Transaction)
p.txpoolSub = p.txpool.TransactionsFeed().Subscribe(requests)
defer close(requests)
feed := p.p2p.Feed(messages.TransactionBroadcast{})
ch := make(chan p2p.Message, 20)
sub := feed.Subscribe(ch)
defer sub.Unsubscribe()
defer close(ch)
for {
select {
case tx := <-requests:
log.Infof("Received transaction: %x", tx.Hash())
if err := p.createCollation(p.ctx, []*gethTypes.Transaction{tx}); err != nil {
case msg := <-ch:
tx, ok := msg.Data.(messages.TransactionBroadcast)
if !ok {
log.Error("Received incorrect p2p message. Wanted a transaction broadcast message")
break
}
log.Infof("Received transaction: %x", tx.Transaction.Hash())
if err := p.createCollation(p.ctx, []*gethTypes.Transaction{tx.Transaction}); err != nil {
log.Errorf("Create collation failed: %v", err)
}
case <-p.ctx.Done():
Expand Down
2 changes: 2 additions & 0 deletions sharding/simulator/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ go_library(
"//sharding/p2p/messages:go_default_library",
"//sharding/params:go_default_library",
"//sharding/syncer:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
Expand Down
30 changes: 30 additions & 0 deletions sharding/simulator/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@ package simulator

import (
"context"
"crypto/rand"
"math/big"
"time"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
Expand Down Expand Up @@ -44,6 +47,8 @@ func (s *Simulator) Start() {
log.Info("Starting simulator service")

s.requestFeed = s.p2p.Feed(messages.CollationBodyRequest{})

go s.broadcastTransactions(time.Tick(time.Second*s.delay), s.ctx.Done())
go s.simulateNotaryRequests(s.client.SMCCaller(), s.client.ChainReader(), time.Tick(time.Second*s.delay), s.ctx.Done())
}

Expand Down Expand Up @@ -93,3 +98,28 @@ func (s *Simulator) simulateNotaryRequests(fetcher mainchain.RecordFetcher, read
}
}
}

// broadcastTransactions sends a transaction with random bytes over by a delay period,
// this method is for testing purposes only, and will be replaced by a more functional CLI tool.
func (s *Simulator) broadcastTransactions(delayChan <-chan time.Time, done <-chan struct{}) {
for {
select {
// Makes sure to close this goroutine when the service stops.
case <-done:
log.Debug("Simulator context closed, exiting goroutine")
return
case <-delayChan:
tx := createTestTx()
s.p2p.Broadcast(messages.TransactionBroadcast{Transaction: tx})
log.Info("Transaction broadcasted")
}
}
}

// createTestTx is a helper method to generate tx with random data bytes.
// it is used for broadcastTransactions.
func createTestTx() *types.Transaction {
data := make([]byte, 1024)
rand.Read(data)
return types.NewTransaction(0, common.HexToAddress("0x0"), nil, 0, nil, data)
}
40 changes: 39 additions & 1 deletion sharding/simulator/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (

"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"

ethereum "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
Expand Down Expand Up @@ -229,3 +229,41 @@ func TestSimulateNotaryRequests(t *testing.T) {
exitRoutine <- true
hook.Reset()
}

// This test verifies actor simulator can successfully broadcast
// transactions to rest of the peers.
func TestBroadcastTransactions(t *testing.T) {
hook := logTest.NewGlobal()

shardID := 0
server, err := p2p.NewServer()
if err != nil {
t.Fatalf("Unable to setup p2p server: %v", err)
}

simulator, err := NewSimulator(params.DefaultConfig, &mainchain.SMCClient{}, server, shardID, 1)
if err != nil {
t.Fatalf("Unable to setup simulator service: %v", err)
}

delayChan := make(chan time.Time)
doneChan := make(chan struct{})
exitRoutine := make(chan bool)

go func() {
simulator.broadcastTransactions(delayChan, doneChan)
<-exitRoutine
}()

delayChan <- time.Time{}
doneChan <- struct{}{}

msg := hook.AllEntries()[0].Message
want := "Transaction broadcasted"
if msg != want {
t.Errorf("incorrect log, expected %s, got %s", want, msg)
}

exitRoutine <- true
hook.Reset()
}
2 changes: 1 addition & 1 deletion sharding/syncer/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
"github.com/prysmaticlabs/geth-sharding/sharding/mainchain"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p/messages"
"github.com/prysmaticlabs/geth-sharding/sharding/types"
)

// RespondCollationBody is called by a node responding to another node's request
Expand Down
2 changes: 0 additions & 2 deletions sharding/txpool/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ go_library(
visibility = ["//sharding:__subpackages__"],
deps = [
"//sharding/p2p:go_default_library",
"@com_github_ethereum_go_ethereum//common:go_default_library",
"@com_github_ethereum_go_ethereum//core/types:go_default_library",
"@com_github_ethereum_go_ethereum//event:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
],
Expand Down
30 changes: 0 additions & 30 deletions sharding/txpool/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@
package txpool

import (
"crypto/rand"
"time"

"github.com/ethereum/go-ethereum/common"
gethTypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/event"
"github.com/prysmaticlabs/geth-sharding/sharding/p2p"
log "github.com/sirupsen/logrus"
Expand All @@ -16,7 +11,6 @@ import (
type TXPool struct {
p2p *p2p.Server
transactionsFeed *event.Feed
ticker *time.Ticker
}

// NewTXPool creates a new observer instance.
Expand All @@ -27,34 +21,10 @@ func NewTXPool(p2p *p2p.Server) (*TXPool, error) {
// Start the main routine for a shard transaction pool.
func (p *TXPool) Start() {
log.Info("Starting shard txpool service")
go p.sendTestTransaction()
}

// Stop the main loop for a transaction pool in the shard network.
func (p *TXPool) Stop() error {
log.Info("Stopping shard txpool service")
p.ticker.Stop()
return nil
}

func (p *TXPool) TransactionsFeed() *event.Feed {
return p.transactionsFeed
}

// sendTestTransaction sends a transaction with random bytes over a 5 second interval.
// This method is for testing purposes only, and will be replaced by a more functional CLI tool.
func (p *TXPool) sendTestTransaction() {
p.ticker = time.NewTicker(5 * time.Second)

for range p.ticker.C {
tx := createTestTransaction()
nsent := p.transactionsFeed.Send(tx)
log.Infof("Sent transaction %x to %d subscribers", tx.Hash(), nsent)
}
}

func createTestTransaction() *gethTypes.Transaction {
data := make([]byte, 1024)
rand.Read(data)
return gethTypes.NewTransaction(0, common.HexToAddress("0x0"), nil, 0, nil, data)
}

0 comments on commit 3544c0b

Please sign in to comment.