From fd6f328ccc1531d3c063eb10505f77d5c6c712bd Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Tue, 9 Nov 2021 20:27:08 +0530 Subject: [PATCH 1/5] telemetry when GRANDPA receieve commit or vote messages Send `afg.received_commit` when grandpa receives a commit message. Send `afg.received_precommit` or `afg.received_prevote` when grandpa receives a vote message Closes #1840 Closes #1839 Closes #1838 --- dot/telemetry/afg_received.go | 82 +++++++++++++++++++++++++++++++++ dot/telemetry/telemetry.go | 13 ++++-- dot/telemetry/telemetry_test.go | 25 ++++++++-- lib/grandpa/grandpa.go | 4 +- lib/grandpa/message_handler.go | 18 ++++++++ lib/grandpa/vote_message.go | 27 +++++++++++ 6 files changed, 159 insertions(+), 10 deletions(-) create mode 100644 dot/telemetry/afg_received.go diff --git a/dot/telemetry/afg_received.go b/dot/telemetry/afg_received.go new file mode 100644 index 0000000000..5e065d1931 --- /dev/null +++ b/dot/telemetry/afg_received.go @@ -0,0 +1,82 @@ +// Copyright 2021 ChainSafe Systems (ON) Corp. +// This file is part of gossamer. +// +// The gossamer library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The gossamer library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the gossamer library. If not, see . + +package telemetry + +import "github.com/ChainSafe/gossamer/lib/common" + +// AfG ("Al's Finality Gadget") is synonymous with GRANDPA. + +type afgReceivedTM struct { + TargetHash common.Hash `json:"target_hash"` + TargetNumber string `json:"target_number"` + Voter string `json:"voter"` +} + +// afgReceivedPrecommitTM holds `afg.received_precommit` telemetry message which is +// supposed to be sent when grandpa client receives a precommit. +type afgReceivedPrecommitTM afgReceivedTM + +// NewAfgReceivedPrecommitTM gets a new afgReceivedPrecommitTM struct. +func NewAfgReceivedPrecommitTM(targetHash common.Hash, targetNumber, voter string) Message { + return &afgReceivedPrecommitTM{ + TargetHash: targetHash, + TargetNumber: targetNumber, + Voter: voter, + } +} + +func (afgReceivedPrecommitTM) messageType() string { + return afgReceivedPrecommitMsg +} + +// afgReceivedPrevoteTM holds `afg.received_prevote` telemetry message which is +// supposed to be sent when grandpa client receives a prevote. +type afgReceivedPrevoteTM afgReceivedTM + +// NewAfgReceivedPrevoteTM gets a new afgReceivedPrevoteTM struct. +func NewAfgReceivedPrevoteTM(targetHash common.Hash, targetNumber, voter string) Message { + return &afgReceivedPrevoteTM{ + TargetHash: targetHash, + TargetNumber: targetNumber, + Voter: voter, + } +} + +func (t afgReceivedPrevoteTM) messageType() string { + return afgReceivedPrevoteMsg +} + +// afgReceivedCommitTM holds `afg.received_commit` telemetry message which is +// supposed to be sent when grandpa client receives a commit. +type afgReceivedCommitTM struct { + TargetHash common.Hash `json:"target_hash"` + TargetNumber string `json:"target_number"` + ContainsPrecommitsSignedBy []string `json:"contains_precommits_signed_by"` +} + +// NewAfgReceivedCommitTM gets a new afgReceivedCommitTM struct. +func NewAfgReceivedCommitTM(targetHash common.Hash, targetNumber string, containsPrecommitsSignedBy []string) Message { + return &afgReceivedCommitTM{ + TargetHash: targetHash, + TargetNumber: targetNumber, + ContainsPrecommitsSignedBy: containsPrecommitsSignedBy, + } +} + +func (afgReceivedCommitTM) messageType() string { + return afgReceivedCommitMsg +} diff --git a/dot/telemetry/telemetry.go b/dot/telemetry/telemetry.go index 3788a4831d..5397f4f275 100644 --- a/dot/telemetry/telemetry.go +++ b/dot/telemetry/telemetry.go @@ -29,11 +29,14 @@ import ( // telemetry message types const ( - notifyFinalizedMsg = "notify.finalized" - blockImportMsg = "block.import" - systemNetworkStateMsg = "system.network_state" - systemConnectedMsg = "system.connected" - systemIntervalMsg = "system.interval" + notifyFinalizedMsg = "notify.finalized" + blockImportMsg = "block.import" + systemNetworkStateMsg = "system.network_state" + systemConnectedMsg = "system.connected" + systemIntervalMsg = "system.interval" + afgReceivedPrecommitMsg = "afg.received_precommit" + afgReceivedPrevoteMsg = "afg.received_prevote" + afgReceivedCommitMsg = "afg.received_commit" ) type telemetryConnection struct { diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index 898f7127c5..0efb3d313e 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -43,7 +43,7 @@ func TestMain(m *testing.M) { func TestHandler_SendMulti(t *testing.T) { var wg sync.WaitGroup - wg.Add(6) + wg.Add(9) resultCh = make(chan []byte) @@ -92,6 +92,21 @@ func TestHandler_SendMulti(t *testing.T) { wg.Done() }() + go func() { + GetInstance().SendMessage(NewAfgReceivedCommitTM(common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c"), "1", []string{})) + wg.Done() + }() + + go func() { + GetInstance().SendMessage(NewAfgReceivedPrecommitTM(common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c"), "1", "")) + wg.Done() + }() + + go func() { + GetInstance().SendMessage(NewAfgReceivedPrevoteTM(common.MustHexToHash("0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c"), "1", "")) + wg.Done() + }() + wg.Wait() expected1 := []byte(`{"authority":false,"chain":"chain","genesis_hash":"0x91b171bb158e2d3848fa23a9f1c25182fb8e20313b2c1eb49219da7a70ce90c3","implementation":"systemName","msg":"system.connected","name":"nodeName","network_id":"netID","startup_time":"startTime","ts":`) @@ -100,13 +115,15 @@ func TestHandler_SendMulti(t *testing.T) { expected4 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`) // nolint expected5 := []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`) expected6 := []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`) - - expected := [][]byte{expected1, expected3, expected4, expected5, expected2, expected6} + expected7 := []byte(`{"contains_precommits_signed_by":[],"msg":"afg.received_commit","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`) + expected8 := []byte(`{"msg":"afg.received_precommit","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`) + expected9 := []byte(`{"msg":"afg.received_prevote","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`) + expected := [][]byte{expected1, expected3, expected4, expected5, expected2, expected7, expected6, expected8, expected9} var actual [][]byte for data := range resultCh { actual = append(actual, data) - if len(actual) == 6 { + if len(actual) == 9 { break } } diff --git a/lib/grandpa/grandpa.go b/lib/grandpa/grandpa.go index 43e3d5af6e..a79dda71f7 100644 --- a/lib/grandpa/grandpa.go +++ b/lib/grandpa/grandpa.go @@ -487,6 +487,7 @@ func (s *Service) playGrandpaRound() error { go s.sendVoteMessage(prevote, vm, roundComplete) logger.Debug("receiving pre-commit messages...") + // through goroutine s.receiveMessages(ctx) time.Sleep(s.interval) if s.paused.Load().(bool) { @@ -530,9 +531,10 @@ func (s *Service) sendVoteMessage(stage Subround, msg *VoteMessage, roundComplet if err := s.sendMessage(msg); err != nil { logger.Warn("could not send message", "stage", stage, "error", err) + } else { + logger.Trace("sent vote message", "stage", stage, "vote", msg) } - logger.Trace("sent vote message", "stage", stage, "vote", msg) select { case <-roundComplete: return diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 78864bbf27..18d649423b 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -24,6 +24,7 @@ import ( "reflect" "github.com/ChainSafe/gossamer/dot/network" + "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/dot/types" "github.com/ChainSafe/gossamer/lib/blocktree" "github.com/ChainSafe/gossamer/lib/common" @@ -105,6 +106,23 @@ func (h *MessageHandler) handleNeighbourMessage(msg *NeighbourMessage) error { func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { logger.Debug("received commit message", "msg", msg) + + contains_precommits_signed_by := make([]string, len(msg.AuthData)) + for i, authData := range msg.AuthData { + contains_precommits_signed_by[i] = authData.AuthorityID.String() + } + + err := telemetry.GetInstance().SendMessage( + telemetry.NewAfgReceivedCommitTM( + msg.Vote.Hash, + fmt.Sprintf("%d", msg.Vote.Number), + contains_precommits_signed_by, + ), + ) + if err != nil { + logger.Debug("problem sending afg.received_commit telemetry message", "err", err) + } + if has, _ := h.blockState.HasFinalisedBlock(msg.Round, h.grandpa.state.setID); has { return nil } diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 4ddef6f047..e3a11ba67b 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -20,7 +20,9 @@ import ( "bytes" "context" "errors" + "fmt" + "github.com/ChainSafe/gossamer/dot/telemetry" "github.com/ChainSafe/gossamer/lib/blocktree" "github.com/ChainSafe/gossamer/lib/crypto/ed25519" "github.com/ChainSafe/gossamer/pkg/scale" @@ -49,6 +51,31 @@ func (s *Service) receiveMessages(ctx context.Context) { logger.Trace("received vote message", "msg", msg.msg, "from", msg.from) vm := msg.msg + switch vm.Message.Stage { + case prevote: + err := telemetry.GetInstance().SendMessage( + telemetry.NewAfgReceivedPrevoteTM( + vm.Message.Hash, + fmt.Sprintf("%d", vm.Message.Number), + vm.Message.AuthorityID.String(), + ), + ) + if err != nil { + logger.Debug("problem sending afg.received_prevote telemetry message", "err", err) + } + case precommit: + err := telemetry.GetInstance().SendMessage( + telemetry.NewAfgReceivedPrecommitTM( + vm.Message.Hash, + fmt.Sprintf("%d", vm.Message.Number), + vm.Message.AuthorityID.String(), + ), + ) + if err != nil { + logger.Debug("problem sending afg.received_precommit telemetry message", "err", err) + } + } + v, err := s.validateMessage(msg.from, vm) if err != nil { logger.Debug("failed to validate vote message", "message", vm, "error", err) From 897fd37511f2b575c31bd23bd198dd2eb71e74f1 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 15 Nov 2021 12:01:44 +0530 Subject: [PATCH 2/5] fix tests --- dot/telemetry/telemetry_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index b6f3dec66b..38489754e0 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -53,6 +53,9 @@ func TestHandler_SendMulti(t *testing.T) { []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`), []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`), []byte(`{"future":2,"msg":"txpool.import","ready":1,"ts":`), + []byte(`{"contains_precommits_signed_by":[],"msg":"afg.received_commit","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`), + []byte(`{"msg":"afg.received_precommit","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`), + []byte(`{"msg":"afg.received_prevote","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`), } messages := []Message{ From cd3382f23c4a1a7048127bb4f31204637ac1a015 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 15 Nov 2021 12:15:47 +0530 Subject: [PATCH 3/5] one more small fix --- dot/telemetry/afg_received.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dot/telemetry/afg_received.go b/dot/telemetry/afg_received.go index 5e065d1931..53f861fa52 100644 --- a/dot/telemetry/afg_received.go +++ b/dot/telemetry/afg_received.go @@ -56,7 +56,7 @@ func NewAfgReceivedPrevoteTM(targetHash common.Hash, targetNumber, voter string) } } -func (t afgReceivedPrevoteTM) messageType() string { +func (afgReceivedPrevoteTM) messageType() string { return afgReceivedPrevoteMsg } From e484db7c014bd4fce730ee13715297eec1d120c9 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Wed, 17 Nov 2021 13:36:54 +0530 Subject: [PATCH 4/5] addressed comments --- lib/grandpa/vote_message.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 374db4156b..58a0110a46 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -61,6 +61,8 @@ func (s *Service) receiveMessages(ctx context.Context) { if err != nil { logger.Debugf("problem sending afg.received_precommit telemetry message err: %s", err) } + default: + logger.Warnf("unsupported stage %s", vm.Message.Stage.String()) } v, err := s.validateMessage(msg.from, vm) From 21cb548cd6a404f9a24576279c76e195ee153c64 Mon Sep 17 00:00:00 2001 From: Kishan Sagathiya Date: Mon, 29 Nov 2021 19:17:39 +0530 Subject: [PATCH 5/5] addressed some reviews --- dot/telemetry/afg_received.go | 17 ++--------------- dot/telemetry/telemetry_test.go | 2 +- dot/telemetry/txpool_import.go | 17 ++--------------- lib/grandpa/message_handler.go | 4 ++-- lib/grandpa/vote_message.go | 8 ++++---- 5 files changed, 11 insertions(+), 37 deletions(-) diff --git a/dot/telemetry/afg_received.go b/dot/telemetry/afg_received.go index 53f861fa52..299cd1e3cd 100644 --- a/dot/telemetry/afg_received.go +++ b/dot/telemetry/afg_received.go @@ -1,18 +1,5 @@ -// Copyright 2021 ChainSafe Systems (ON) Corp. -// This file is part of gossamer. -// -// The gossamer library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The gossamer library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the gossamer library. If not, see . +// Copyright 2021 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only package telemetry diff --git a/dot/telemetry/telemetry_test.go b/dot/telemetry/telemetry_test.go index 131ce04363..eef1c59805 100644 --- a/dot/telemetry/telemetry_test.go +++ b/dot/telemetry/telemetry_test.go @@ -51,7 +51,7 @@ func TestHandler_SendMulti(t *testing.T) { []byte(`{"bandwidth_download":2,"bandwidth_upload":3,"msg":"system.interval","peers":1,"ts":`), []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","finalized_hash":"0x687197c11b4cf95374159843e7f46fbcd63558db981aaef01a8bac2a44a1d6b2","finalized_height":32256,"height":32375,"msg":"system.interval","ts":`), //nolint:lll []byte(`{"best":"0x07b749b6e20fd5f1159153a2e790235018621dd06072a62bcd25e8576f6ff5e6","height":"32375","msg":"notify.finalized","ts":`), //nolint:lll - []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`), //nolint:lll //nolint:lll + []byte(`{"hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","msg":"prepared_block_for_proposing","number":"1","ts":`), //nolint:lll []byte(`{"future":2,"msg":"txpool.import","ready":1,"ts":`), []byte(`{"contains_precommits_signed_by":[],"msg":"afg.received_commit","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`), //nolint:lll []byte(`{"msg":"afg.received_precommit","target_hash":"0x5814aec3e28527f81f65841e034872f3a30337cf6c33b2d258bba6071e37e27c","target_number":"1","ts":`), //nolint:lll diff --git a/dot/telemetry/txpool_import.go b/dot/telemetry/txpool_import.go index eb5abec889..9da38c1a58 100644 --- a/dot/telemetry/txpool_import.go +++ b/dot/telemetry/txpool_import.go @@ -1,18 +1,5 @@ -// Copyright 2021 ChainSafe Systems (ON) Corp. -// This file is part of gossamer. -// -// The gossamer library is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// The gossamer library is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with the gossamer library. If not, see . +// Copyright 2021 ChainSafe Systems (ON) +// SPDX-License-Identifier: LGPL-3.0-only package telemetry diff --git a/lib/grandpa/message_handler.go b/lib/grandpa/message_handler.go index 513e8a540f..57dd870449 100644 --- a/lib/grandpa/message_handler.go +++ b/lib/grandpa/message_handler.go @@ -102,12 +102,12 @@ func (h *MessageHandler) handleCommitMessage(msg *CommitMessage) error { err := telemetry.GetInstance().SendMessage( telemetry.NewAfgReceivedCommitTM( msg.Vote.Hash, - fmt.Sprintf("%d", msg.Vote.Number), + fmt.Sprint(msg.Vote.Number), containsPrecommitsSignedBy, ), ) if err != nil { - logger.Debugf("problem sending afg.received_commit telemetry message, err: %s", err) + logger.Debugf("problem sending afg.received_commit telemetry message: %s", err) } if has, _ := h.blockState.HasFinalisedBlock(msg.Round, h.grandpa.state.setID); has { diff --git a/lib/grandpa/vote_message.go b/lib/grandpa/vote_message.go index 7dbc4d7a5b..07f5b798e2 100644 --- a/lib/grandpa/vote_message.go +++ b/lib/grandpa/vote_message.go @@ -43,23 +43,23 @@ func (s *Service) receiveMessages(ctx context.Context) { err := telemetry.GetInstance().SendMessage( telemetry.NewAfgReceivedPrevoteTM( vm.Message.Hash, - fmt.Sprintf("%d", vm.Message.Number), + fmt.Sprint(vm.Message.Number), vm.Message.AuthorityID.String(), ), ) if err != nil { - logger.Debugf("problem sending afg.received_prevote telemetry message, err: %s", err) + logger.Debugf("problem sending afg.received_prevote telemetry message: %s", err) } case precommit: err := telemetry.GetInstance().SendMessage( telemetry.NewAfgReceivedPrecommitTM( vm.Message.Hash, - fmt.Sprintf("%d", vm.Message.Number), + fmt.Sprint(vm.Message.Number), vm.Message.AuthorityID.String(), ), ) if err != nil { - logger.Debugf("problem sending afg.received_precommit telemetry message err: %s", err) + logger.Debugf("problem sending afg.received_precommit telemetry message: %s", err) } default: logger.Warnf("unsupported stage %s", vm.Message.Stage.String())