Skip to content

Commit

Permalink
Adding block height and wallet balance metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
joeabbey committed Sep 7, 2022
1 parent ffc46dc commit ce8c984
Show file tree
Hide file tree
Showing 9 changed files with 185 additions and 60 deletions.
80 changes: 73 additions & 7 deletions relayer/chains/cosmos/cosmos_chain_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import (
"context"
"errors"
"fmt"
"math/big"
"time"

"github.com/avast/retry-go/v4"
sdk "github.com/cosmos/cosmos-sdk/types"
clienttypes "github.com/cosmos/ibc-go/v5/modules/core/02-client/types"
conntypes "github.com/cosmos/ibc-go/v5/modules/core/03-connection/types"
chantypes "github.com/cosmos/ibc-go/v5/modules/core/04-channel/types"
Expand Down Expand Up @@ -44,9 +46,15 @@ type CosmosChainProcessor struct {

// map of channel ID to connection ID
channelConnections map[string]string

// metrics to monitor lifetime of processor
metrics *processor.PrometheusMetrics

// parsed gas prices accepted by the chain (only used for metrics)
parsedGasPrices *sdk.DecCoins
}

func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider) *CosmosChainProcessor {
func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider, metrics *processor.PrometheusMetrics) *CosmosChainProcessor {
return &CosmosChainProcessor{
log: log.With(zap.String("chain_name", provider.ChainName()), zap.String("chain_id", provider.ChainId())),
chainProvider: provider,
Expand All @@ -55,6 +63,7 @@ func NewCosmosChainProcessor(log *zap.Logger, provider *CosmosProvider) *CosmosC
channelStateCache: make(processor.ChannelStateCache),
connectionClients: make(map[string]string),
channelConnections: make(map[string]string),
metrics: metrics,
}
}

Expand All @@ -64,8 +73,9 @@ const (
latestHeightQueryRetryDelay = 1 * time.Second
latestHeightQueryRetries = 5

defaultMinQueryLoopDuration = 1 * time.Second
inSyncNumBlocksThreshold = 2
defaultMinQueryLoopDuration = 1 * time.Second
defaultBalanceUpdateWaitDuration = 60 * time.Second
inSyncNumBlocksThreshold = 2
)

// latestClientState is a map of clientID to the latest clientInfo for that client.
Expand Down Expand Up @@ -132,9 +142,11 @@ func (ccp *CosmosChainProcessor) clientState(ctx context.Context, clientID strin

// queryCyclePersistence hold the variables that should be retained across queryCycles.
type queryCyclePersistence struct {
latestHeight int64
latestQueriedBlock int64
minQueryLoopDuration time.Duration
latestHeight int64
latestQueriedBlock int64
minQueryLoopDuration time.Duration
lastBalanceUpdate time.Time
balanceUpdateWaitDuration time.Duration
}

// Run starts the query loop for the chain which will gather applicable ibc messages and push events out to the relevant PathProcessors.
Expand All @@ -143,7 +155,9 @@ type queryCyclePersistence struct {
func (ccp *CosmosChainProcessor) Run(ctx context.Context, initialBlockHistory uint64) error {
// this will be used for persistence across query cycle loop executions
persistence := queryCyclePersistence{
minQueryLoopDuration: defaultMinQueryLoopDuration,
minQueryLoopDuration: defaultMinQueryLoopDuration,
lastBalanceUpdate: time.Unix(0, 0),
balanceUpdateWaitDuration: defaultBalanceUpdateWaitDuration,
}

// Infinite retry to get initial latest height
Expand Down Expand Up @@ -267,6 +281,10 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu
zap.Int64("latest_height", persistence.latestHeight),
)

if ccp.metrics != nil {
ccp.CollectMetrics(ctx, persistence)
}

// used at the end of the cycle to send signal to path processors to start processing if both chains are in sync and no new messages came in this cycle
firstTimeInSync := false

Expand Down Expand Up @@ -390,3 +408,51 @@ func (ccp *CosmosChainProcessor) queryCycle(ctx context.Context, persistence *qu

return nil
}

func (ccp *CosmosChainProcessor) CollectMetrics(ctx context.Context, persistence *queryCyclePersistence) {
ccp.CurrentBlockHeight(ctx, persistence)

// Wait a while before updating the balance
if time.Since(persistence.lastBalanceUpdate) > persistence.balanceUpdateWaitDuration {
ccp.CurrentRelayerBalance(ctx)
persistence.lastBalanceUpdate = time.Now()
}
}

func (ccp *CosmosChainProcessor) CurrentBlockHeight(ctx context.Context, persistence *queryCyclePersistence) {
ccp.metrics.SetLatestHeight(ccp.chainProvider.ChainName(), persistence.latestHeight)
}

func (ccp *CosmosChainProcessor) CurrentRelayerBalance(ctx context.Context) {
// memoize the current gas prices to only show metrics for "interesting" denoms
if ccp.parsedGasPrices == nil {
gp, err := sdk.ParseDecCoins(ccp.chainProvider.Config.GasPrices)
if err != nil {
ccp.log.Error(
"Failed to parse gas prices",
zap.Error(err),
)
}
ccp.parsedGasPrices = &gp
}

// Get the balance for the chain provider's key
relayerWalletBalance, err := ccp.chainProvider.QueryBalance(ctx, ccp.chainProvider.Key())
if err != nil {
ccp.log.Error(
"Failed to query relayer balance",
zap.Error(err),
)
}

// Print the relevant gas prices
for _, gasDenom := range *ccp.parsedGasPrices {
for _, balance := range relayerWalletBalance {
if balance.Denom == gasDenom.Denom {
// Convert to a big float to get a float64 for metrics
f, _ := big.NewFloat(0.0).SetInt(balance.Amount.BigInt()).Float64()
ccp.metrics.SetWalletBalance(ccp.chainProvider.ChainId(), ccp.chainProvider.Key(), balance.Denom, f)
}
}
}
}
8 changes: 4 additions & 4 deletions relayer/chains/cosmos/message_handlers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func TestConnectionStateCache(t *testing.T) {
// will populate the connectionStateCache with a key that has an empty counterparty connection ID.
// The MsgConnectionOpenTry needs to replace this key with a key that has the counterparty connection ID.

ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}, nil)
c := processor.NewIBCMessagesCache()

// Observe MsgConnectionOpenInit, which does not have counterparty connection ID.
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestConnectionStateCache(t *testing.T) {
// We need to make sure that the connectionStateCache does not have two keys for the same connection,
// i.e. one key with the counterparty connection ID, and one without.

ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}, nil)
c := processor.NewIBCMessagesCache()

// Initialize connectionStateCache with populated connection ID and counterparty connection ID.
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestChannelStateCache(t *testing.T) {
// will populate the channelStateCache with a key that has an empty counterparty channel ID.
// The MsgChannelOpenTry needs to replace this key with a key that has the counterparty channel ID.

ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}, nil)
c := processor.NewIBCMessagesCache()

// Observe MsgChannelOpenInit, which does not have counterparty channel ID.
Expand Down Expand Up @@ -151,7 +151,7 @@ func TestChannelStateCache(t *testing.T) {
// We need to make sure that the channelStateCache does not have two keys for the same channel,
// i.e. one key with the counterparty channel ID, and one without.

ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{})
ccp := NewCosmosChainProcessor(zap.NewNop(), &CosmosProvider{}, nil)
c := processor.NewIBCMessagesCache()

// Initialize channelStateCache with populated channel ID and counterparty channel ID.
Expand Down
12 changes: 10 additions & 2 deletions relayer/chains/cosmos/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"os"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/cosmos/cosmos-sdk/types/module"
commitmenttypes "github.com/cosmos/ibc-go/v5/modules/core/23-commitment/types"
ibcexported "github.com/cosmos/ibc-go/v5/modules/core/exported"
tmclient "github.com/cosmos/ibc-go/v5/modules/light-clients/07-tendermint/types"
"github.com/cosmos/relayer/v2/relayer/processor"
"github.com/cosmos/relayer/v2/relayer/provider"
"github.com/gogo/protobuf/proto"
lens "github.com/strangelove-ventures/lens/client"
Expand Down Expand Up @@ -62,11 +64,13 @@ func (pc CosmosProviderConfig) NewProvider(log *zap.Logger, homepath string, deb
return nil, err
}
pc.ChainName = chainName
return &CosmosProvider{
log: log,
m := processor.NewPrometheusMetrics()

return &CosmosProvider{
log: log,
ChainClient: *cc,
PCfg: pc,
metrics: m,
}, nil
}

Expand Down Expand Up @@ -95,6 +99,10 @@ type CosmosProvider struct {

lens.ChainClient
PCfg CosmosProviderConfig

// metrics to monitor the provider
TotalFees sdk.Coins
metrics *processor.PrometheusMetrics
}

type CosmosIBCHeader struct {
Expand Down
34 changes: 26 additions & 8 deletions relayer/chains/cosmos/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"time"

Expand Down Expand Up @@ -71,9 +72,11 @@ func (cc *CosmosProvider) SendMessage(ctx context.Context, msg provider.RelayerM
// sent and executed successfully is returned.
func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.RelayerMessage, memo string) (*provider.RelayerTxResponse, bool, error) {
var resp *sdk.TxResponse
var fees sdk.Coins

if err := retry.Do(func() error {
txBytes, err := cc.buildMessages(ctx, msgs, memo)
txBytes, f, err := cc.buildMessages(ctx, msgs, memo)
fees = f
if err != nil {
errMsg := err.Error()

Expand Down Expand Up @@ -179,10 +182,13 @@ func (cc *CosmosProvider) SendMessages(ctx context.Context, msgs []provider.Rela
// transaction was successfully executed.
if rlyResp.Code != 0 {
cc.LogFailedTx(rlyResp, nil, msgs)
cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees)
return rlyResp, false, fmt.Errorf("transaction failed with code: %d", resp.Code)
}

cc.LogSuccessTx(resp, msgs)
cc.UpdateFeesSpent(cc.ChainId(), cc.Key(), fees)

return rlyResp, true, nil
}

Expand All @@ -208,11 +214,11 @@ func parseEventsFromTxResponse(resp *sdk.TxResponse) []provider.RelayerEvent {
return events
}

func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.RelayerMessage, memo string) ([]byte, error) {
func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.RelayerMessage, memo string) ([]byte, sdk.Coins, error) {
// Query account details
txf, err := cc.PrepareFactory(cc.TxFactory())
if err != nil {
return nil, err
return nil, sdk.Coins{}, err
}

if memo != "" {
Expand All @@ -225,7 +231,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
// If users pass gas adjustment, then calculate gas
_, adjusted, err := cc.CalculateGas(ctx, txf, CosmosMsgs(msgs...)...)
if err != nil {
return nil, err
return nil, sdk.Coins{}, err
}

// Set the gas amount on the transaction factory
Expand All @@ -240,7 +246,7 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, err
return nil, sdk.Coins{}, err
}

// Attach the signature to the transaction
Expand All @@ -257,11 +263,13 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, err
return nil, sdk.Coins{}, err
}

done()

fees := txb.GetTx().GetFee()

var txBytes []byte
// Generate the transaction bytes
if err := retry.Do(func() error {
Expand All @@ -272,10 +280,10 @@ func (cc *CosmosProvider) buildMessages(ctx context.Context, msgs []provider.Rel
}
return nil
}, retry.Context(ctx), rtyAtt, rtyDel, rtyErr); err != nil {
return nil, err
return nil, sdk.Coins{}, err
}

return txBytes, nil
return txBytes, fees, nil
}

// MsgCreateClient creates an sdk.Msg to update the client on src with consensus state from dst
Expand Down Expand Up @@ -1075,3 +1083,13 @@ func (cc *CosmosProvider) NewClientState(
AllowUpdateAfterMisbehaviour: allowUpdateAfterMisbehaviour,
}, nil
}

func (cc *CosmosProvider) UpdateFeesSpent(chain, key string, fees sdk.Coins) {
cc.TotalFees = cc.TotalFees.Add(fees...)

for _, fee := range cc.TotalFees {
// Convert to a big float to get a float64 for metrics
f, _ := big.NewFloat(0.0).SetInt(fee.Amount.BigInt()).Float64()
cc.metrics.SetFeesSpent(chain, key, fee.GetDenom(), f)
}
}
8 changes: 4 additions & 4 deletions relayer/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ func (c *Chain) CreateOpenChannels(

return processor.NewEventProcessor().
WithChainProcessors(
srcPathChain.chainProcessor(c.log),
dstPathChain.chainProcessor(c.log),
srcPathChain.chainProcessor(c.log, nil),
dstPathChain.chainProcessor(c.log, nil),
).
WithPathProcessors(pp).
WithInitialBlockHistory(0).
Expand Down Expand Up @@ -135,8 +135,8 @@ func (c *Chain) CloseChannel(

return processor.NewEventProcessor().
WithChainProcessors(
srcPathChain.chainProcessor(c.log),
dstPathChain.chainProcessor(c.log),
srcPathChain.chainProcessor(c.log, nil),
dstPathChain.chainProcessor(c.log, nil),
).
WithPathProcessors(processor.NewPathProcessor(
c.log,
Expand Down
4 changes: 2 additions & 2 deletions relayer/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,8 @@ func (c *Chain) CreateOpenConnections(

return modified, processor.NewEventProcessor().
WithChainProcessors(
srcpathChain.chainProcessor(c.log),
dstpathChain.chainProcessor(c.log),
srcpathChain.chainProcessor(c.log, nil),
dstpathChain.chainProcessor(c.log, nil),
).
WithPathProcessors(pp).
WithInitialBlockHistory(initialBlockHistory).
Expand Down
Loading

0 comments on commit ce8c984

Please sign in to comment.