Skip to content

Commit

Permalink
Announce timestamp (ethereum#745)
Browse files Browse the repository at this point in the history
  • Loading branch information
kevjue authored and celo-ci-bot-user committed Dec 11, 2019
1 parent 666de6d commit 5d4224e
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 68 deletions.
32 changes: 19 additions & 13 deletions consensus/istanbul/backend/announce.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"sort"
"time"

"github.com/syndtr/goleveldb/leveldb"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
vet "github.com/ethereum/go-ethereum/consensus/istanbul/backend/internal/enodes"
Expand All @@ -51,11 +53,11 @@ func (ar *announceRecord) String() string {
type announceData struct {
AnnounceRecords []*announceRecord
EnodeURLHash common.Hash
View *istanbul.View
Timestamp uint
}

func (ad *announceData) String() string {
return fmt.Sprintf("{View: %v, EnodeURLHash: %v, AnnounceRecords: %v}", ad.View, ad.EnodeURLHash.Hex(), ad.AnnounceRecords)
return fmt.Sprintf("{Timestamp: %v, EnodeURLHash: %v, AnnounceRecords: %v}", ad.Timestamp, ad.EnodeURLHash.Hex(), ad.AnnounceRecords)
}

// ==============================================
Expand Down Expand Up @@ -83,21 +85,21 @@ func (ar *announceRecord) DecodeRLP(s *rlp.Stream) error {

// EncodeRLP serializes ad into the Ethereum RLP format.
func (ad *announceData) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, []interface{}{ad.AnnounceRecords, ad.EnodeURLHash, ad.View})
return rlp.Encode(w, []interface{}{ad.AnnounceRecords, ad.EnodeURLHash, ad.Timestamp})
}

// DecodeRLP implements rlp.Decoder, and load the ad fields from a RLP stream.
func (ad *announceData) DecodeRLP(s *rlp.Stream) error {
var msg struct {
AnnounceRecords []*announceRecord
EnodeURLHash common.Hash
View *istanbul.View
Timestamp uint
}

if err := s.Decode(&msg); err != nil {
return err
}
ad.AnnounceRecords, ad.EnodeURLHash, ad.View = msg.AnnounceRecords, msg.EnodeURLHash, msg.View
ad.AnnounceRecords, ad.EnodeURLHash, ad.Timestamp = msg.AnnounceRecords, msg.EnodeURLHash, msg.Timestamp
return nil
}

Expand Down Expand Up @@ -136,7 +138,6 @@ func (sb *Backend) generateIstAnnounce() (*istanbul.Message, error) {
} else {
enodeUrl = sb.p2pserver.Self().String()
}
view := sb.core.CurrentView()

// If the message is not within the registered validator set, then ignore it
regAndActiveVals, err := sb.retrieveActiveAndRegisteredValidators()
Expand All @@ -153,7 +154,8 @@ func (sb *Backend) generateIstAnnounce() (*istanbul.Message, error) {
announceData := &announceData{
AnnounceRecords: announceRecords,
EnodeURLHash: istanbul.RLPHash(enodeUrl),
View: view,
// Unix() returns a int64, but we need a uint for the golang rlp encoding implmentation. Warning: This timestamp value will be truncated in 2106.
Timestamp: uint(time.Now().Unix()),
}

announceBytes, err := rlp.EncodeToBytes(announceData)
Expand Down Expand Up @@ -256,11 +258,15 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error {
return err
}

logger = logger.New("msgAddress", msg.Address, "msg_round", announceData.View.Round, "msg_seq", announceData.View.Sequence)
logger = logger.New("msgAddress", msg.Address, "msg_timestamp", announceData.Timestamp)

if view, err := sb.valEnodeTable.GetViewFromAddress(msg.Address); err == nil && announceData.View.Cmp(view) <= 0 {
logger.Trace("Received an old announce message", "senderAddr", msg.Address, "messageView", announceData.View, "currentEntryView", view)
return errOldAnnounceMessage
if currentEntryTimestamp, err := sb.valEnodeTable.GetTimestampFromAddress(msg.Address); err == nil {
if announceData.Timestamp < currentEntryTimestamp {
logger.Trace("Received an old announce message", "currentEntryTimestamp", currentEntryTimestamp)
return errOldAnnounceMessage
}
} else if err != leveldb.ErrNotFound {
logger.Warn("Error when retrieving timestamp for entry in the ValEnodeTable", "err", err)
}

// If the message is not within the registered validator set, then ignore it
Expand Down Expand Up @@ -299,7 +305,7 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error {
}
// Save in the valEnodeTable if mining
if sb.coreStarted && node != nil {
if err := sb.valEnodeTable.Upsert(map[common.Address]*vet.AddressEntry{msg.Address: {Node: node, View: announceData.View}}); err != nil {
if err := sb.valEnodeTable.Upsert(map[common.Address]*vet.AddressEntry{msg.Address: {Node: node, Timestamp: announceData.Timestamp}}); err != nil {
logger.Warn("Error in upserting a valenode entry", "AnnounceData", announceData.String(), "error", err)
return err
}
Expand All @@ -315,7 +321,7 @@ func (sb *Backend) handleIstAnnounce(payload []byte) error {
}

func (sb *Backend) regossipIstAnnounce(msg *istanbul.Message, payload []byte, announceData announceData, regAndActiveVals map[common.Address]bool, destAddresses []string) error {
logger := sb.logger.New("func", "regossipIstAnnounce", "msgAddress", msg.Address, "msg_round", announceData.View.Round, "msg_seq", announceData.View.Sequence)
logger := sb.logger.New("func", "regossipIstAnnounce", "msgAddress", msg.Address, "msg_timestamp", announceData.Timestamp)
// If we gossiped this address/enodeURL within the last 60 seconds and the enodeURLHash and destAddressHash didn't change, then don't regossip

// Generate the destAddresses hash
Expand Down
48 changes: 27 additions & 21 deletions consensus/istanbul/backend/internal/enodes/val_enode_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const (
const (
// dbNodeExpiration = 24 * time.Hour // Time after which an unseen node should be dropped.
// dbCleanupCycle = time.Hour // Time period for running the expiration task.
dbVersion = 1
dbVersion = 2
)

// ValidatorEnodeHandler is handler to Add/Remove events. Events execute within write lock
Expand All @@ -74,25 +74,25 @@ func nodeIDKey(nodeID enode.ID) []byte {
return append([]byte(dbNodeIDPrefix), nodeID.Bytes()...)
}

// Entries for the valEnodeTable
// AddressEntry is an entry for the valEnodeTable
type AddressEntry struct {
Node *enode.Node
View *istanbul.View
Node *enode.Node
Timestamp uint
}

func (ve *AddressEntry) String() string {
return fmt.Sprintf("{enodeURL: %v, view: %v}", ve.Node.String(), ve.View)
return fmt.Sprintf("{enodeURL: %v, timestamp: %v}", ve.Node.String(), ve.Timestamp)
}

// Implement RLP Encode/Decode interface
type rlpEntry struct {
EnodeURL string
View *istanbul.View
EnodeURL string
Timestamp uint
}

// EncodeRLP serializes AddressEntry into the Ethereum RLP format.
func (ve *AddressEntry) EncodeRLP(w io.Writer) error {
return rlp.Encode(w, rlpEntry{ve.Node.String(), ve.View})
return rlp.Encode(w, rlpEntry{ve.Node.String(), ve.Timestamp})
}

// DecodeRLP implements rlp.Decoder, and load the AddressEntry fields from a RLP stream.
Expand All @@ -107,7 +107,7 @@ func (ve *AddressEntry) DecodeRLP(s *rlp.Stream) error {
return err
}

*ve = AddressEntry{Node: node, View: entry.View}
*ve = AddressEntry{Node: node, Timestamp: entry.Timestamp}
return nil
}

Expand All @@ -125,10 +125,13 @@ type ValidatorEnodeDB struct {
func OpenValidatorEnodeDB(path string, handler ValidatorEnodeHandler) (*ValidatorEnodeDB, error) {
var db *leveldb.DB
var err error

logger := log.New()

if path == "" {
db, err = newMemoryDB()
} else {
db, err = newPersistentDB(path)
db, err = newPersistentDB(path, logger)
}

if err != nil {
Expand All @@ -137,7 +140,7 @@ func OpenValidatorEnodeDB(path string, handler ValidatorEnodeHandler) (*Validato
return &ValidatorEnodeDB{
db: db,
handler: handler,
logger: log.New(),
logger: logger,
}, nil
}

Expand All @@ -152,7 +155,7 @@ func newMemoryDB() (*leveldb.DB, error) {

// newPersistentNodeDB creates/opens a leveldb backed persistent node database,
// also flushing its contents in case of a version mismatch.
func newPersistentDB(path string) (*leveldb.DB, error) {
func newPersistentDB(path string, logger log.Logger) (*leveldb.DB, error) {
opts := &opt.Options{OpenFilesCacheCapacity: 5}
db, err := leveldb.OpenFile(path, opts)
if _, iscorrupted := err.(*lvlerrors.ErrCorrupted); iscorrupted {
Expand All @@ -178,11 +181,14 @@ func newPersistentDB(path string) (*leveldb.DB, error) {
case nil:
// Version present, flush if different
if !bytes.Equal(blob, currentVer) {
oldVersion, _ := binary.Varint(blob)
newVersion, _ := binary.Varint(currentVer)
logger.Info("Val Enode DB version has changed. Creating a new leveldb.", "old version", oldVersion, "new version", newVersion)
db.Close()
if err = os.RemoveAll(path); err != nil {
return nil, err
}
return newPersistentDB(path)
return newPersistentDB(path, logger)
}
}
return db, nil
Expand Down Expand Up @@ -222,15 +228,15 @@ func (vet *ValidatorEnodeDB) GetNodeFromAddress(address common.Address) (*enode.
return entry.Node, nil
}

// GetViewFromAddress will return the view for an address if it's known
func (vet *ValidatorEnodeDB) GetViewFromAddress(address common.Address) (*istanbul.View, error) {
// GetTimestampFromAddress will return the timestamp for an address if it's known
func (vet *ValidatorEnodeDB) GetTimestampFromAddress(address common.Address) (uint, error) {
vet.lock.RLock()
defer vet.lock.RUnlock()
entry, err := vet.getAddressEntry(address)
if err != nil {
return nil, err
return 0, err
}
return entry.View, nil
return entry.Timestamp, nil
}

// GetAddressFromNodeID will return the address for an nodeID if it's known
Expand Down Expand Up @@ -265,7 +271,7 @@ func (vet *ValidatorEnodeDB) GetAllValEnodes() (map[common.Address]*AddressEntry
}

// Upsert will update or insert a validator enode entry; given that the existing entry
// is older (determined by view parameter) that the new one
// is older (determined by timestamp parameter) than the new one
// TODO - In addition to modifying the val_enode_db, this function also will disconnect
// and/or connect the corresponding validator connenctions. The validator connections
// should be managed be a separate thread (see https://github.com/celo-org/celo-blockchain/issues/607)
Expand Down Expand Up @@ -293,9 +299,9 @@ func (vet *ValidatorEnodeDB) Upsert(valEnodeEntries map[common.Address]*AddressE
}

// If it's an old message, ignore it
if !isNew && addressEntry.View.Cmp(currentEntry.View) <= 0 {
vet.logger.Trace("Ignoring the entry because it's view is older than what is stored in the val enode db",
"entryAddress", remoteAddress, "entryEnodeURL", addressEntry.Node.String(), "addressView", addressEntry.View)
if !isNew && addressEntry.Timestamp < currentEntry.Timestamp {
vet.logger.Trace("Ignoring the entry because its timestamp is older than what is stored in the val enode db",
"entryAddress", remoteAddress, "newEntry", addressEntry.String(), "currentEntry", currentEntry.String())
continue
}

Expand Down
29 changes: 10 additions & 19 deletions consensus/istanbul/backend/internal/enodes/val_enode_db_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package enodes

import (
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/consensus/istanbul"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/rlp"
"github.com/syndtr/goleveldb/leveldb"
Expand All @@ -20,13 +18,6 @@ var (
nodeB, _ = enode.ParseV4(enodeURLB)
)

func view(sequence, round int64) *istanbul.View {
return &istanbul.View{
Sequence: big.NewInt(sequence),
Round: big.NewInt(round),
}
}

type mockListener struct{}

func (ml *mockListener) AddValidatorPeer(node *enode.Node, address common.Address) {}
Expand All @@ -40,7 +31,7 @@ func TestSimpleCase(t *testing.T) {
t.Fatal("Failed to open DB")
}

addressEntry := &AddressEntry{Node: nodeA, View: view(0, 1)}
addressEntry := &AddressEntry{Node: nodeA, Timestamp: 1}

err = vet.Upsert(map[common.Address]*AddressEntry{addressA: addressEntry})
if err != nil {
Expand Down Expand Up @@ -70,7 +61,7 @@ func TestDeleteEntry(t *testing.T) {
t.Fatal("Failed to open DB")
}

addressEntry := &AddressEntry{Node: nodeA, View: view(0, 2)}
addressEntry := &AddressEntry{Node: nodeA, Timestamp: 2}

err = vet.Upsert(map[common.Address]*AddressEntry{addressA: addressEntry})
if err != nil {
Expand Down Expand Up @@ -100,8 +91,8 @@ func TestPruneEntries(t *testing.T) {

batch := make(map[common.Address]*AddressEntry)

batch[addressA] = &AddressEntry{Node: nodeA, View: view(0, 2)}
batch[addressB] = &AddressEntry{Node: nodeB, View: view(0, 2)}
batch[addressA] = &AddressEntry{Node: nodeA, Timestamp: 2}
batch[addressB] = &AddressEntry{Node: nodeB, Timestamp: 2}

vet.Upsert(batch)

Expand All @@ -122,7 +113,7 @@ func TestPruneEntries(t *testing.T) {
}

func TestRLPEntries(t *testing.T) {
original := AddressEntry{Node: nodeA, View: view(0, 1)}
original := AddressEntry{Node: nodeA, Timestamp: 1}

rawEntry, err := rlp.EncodeToBytes(&original)
if err != nil {
Expand All @@ -137,8 +128,8 @@ func TestRLPEntries(t *testing.T) {
if result.Node.String() != original.Node.String() {
t.Errorf("node doesn't match: got: %s expected: %s", result.Node.String(), original.Node.String())
}
if result.View.Cmp(original.View) != 0 {
t.Errorf("view doesn't match: got: %v expected: %v", result.View, original.View)
if result.Timestamp != original.Timestamp {
t.Errorf("timestamp doesn't match: got: %v expected: %v", result.Timestamp, original.Timestamp)
}
}

Expand All @@ -150,12 +141,12 @@ func TestTableToString(t *testing.T) {

batch := make(map[common.Address]*AddressEntry)

batch[addressA] = &AddressEntry{Node: nodeA, View: view(0, 2)}
batch[addressB] = &AddressEntry{Node: nodeB, View: view(0, 2)}
batch[addressA] = &AddressEntry{Node: nodeA, Timestamp: 2}
batch[addressB] = &AddressEntry{Node: nodeB, Timestamp: 2}

vet.Upsert(batch)

expected := "ValEnodeTable: [0x00Ce0d46d924CC8437c806721496599FC3FFA268 => {enodeURL: enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150, view: {Round: 2, Sequence: 0}}] [0xfFFFff46D924CCfffFc806721496599fC3FFffff => {enodeURL: enode://38b219b54ed49cf7d802e8add586fc75b531ed2c31e43b5da71c35982b2e6f5c56fa9cfbe39606fe71fbee2566b94c2874e950b1ec88323103c835246e3d0023@127.0.0.1:37303, view: {Round: 2, Sequence: 0}}]"
expected := "ValEnodeTable: [0x00Ce0d46d924CC8437c806721496599FC3FFA268 => {enodeURL: enode://1dd9d65c4552b5eb43d5ad55a2ee3f56c6cbc1c64a5c8d659f51fcd51bace24351232b8d7821617d2b29b54b81cdefb9b3e9c37d7fd5f63270bcc9e1a6f6a439@127.0.0.1:52150, timestamp: 2}] [0xfFFFff46D924CCfffFc806721496599fC3FFffff => {enodeURL: enode://38b219b54ed49cf7d802e8add586fc75b531ed2c31e43b5da71c35982b2e6f5c56fa9cfbe39606fe71fbee2566b94c2874e950b1ec88323103c835246e3d0023@127.0.0.1:37303, timestamp: 2}]"

if vet.String() != expected {
t.Errorf("String() error: got: %s", vet.String())
Expand Down
16 changes: 8 additions & 8 deletions consensus/istanbul/backend/val_enodes_share.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,17 +35,17 @@ import (
// define the validator enode share message

type sharedValidatorEnode struct {
Address common.Address
EnodeURL string
View *istanbul.View
Address common.Address
EnodeURL string
Timestamp uint
}

type valEnodesShareData struct {
ValEnodes []sharedValidatorEnode
}

func (sve *sharedValidatorEnode) String() string {
return fmt.Sprintf("{Address: %s, EnodeURL: %v, View: %v}", sve.Address.Hex(), sve.EnodeURL, sve.View)
return fmt.Sprintf("{Address: %s, EnodeURL: %v, Timestamp: %v}", sve.Address.Hex(), sve.EnodeURL, sve.Timestamp)
}

func (sd *valEnodesShareData) String() string {
Expand Down Expand Up @@ -111,9 +111,9 @@ func (sb *Backend) generateValEnodesShareMsg() (*istanbul.Message, error) {
sharedValidatorEnodes := make([]sharedValidatorEnode, 0, len(vetEntries))
for address, vetEntry := range vetEntries {
sharedValidatorEnodes = append(sharedValidatorEnodes, sharedValidatorEnode{
Address: address,
EnodeURL: vetEntry.Node.String(),
View: vetEntry.View,
Address: address,
EnodeURL: vetEntry.Node.String(),
Timestamp: vetEntry.Timestamp,
})
}

Expand Down Expand Up @@ -201,7 +201,7 @@ func (sb *Backend) handleValEnodesShareMsg(payload []byte) error {
sb.logger.Warn("Error in parsing enodeURL", "enodeURL", sharedValidatorEnode.EnodeURL)
continue
} else {
upsertBatch[sharedValidatorEnode.Address] = &vet.AddressEntry{Node: node, View: sharedValidatorEnode.View}
upsertBatch[sharedValidatorEnode.Address] = &vet.AddressEntry{Node: node, Timestamp: sharedValidatorEnode.Timestamp}
}
}

Expand Down
Loading

0 comments on commit 5d4224e

Please sign in to comment.