Skip to content

Commit

Permalink
sleep for 1.2s blocktime in between each broadcast to injective
Browse files Browse the repository at this point in the history
  • Loading branch information
dbrajovic committed Jan 10, 2024
1 parent 7f76831 commit 655e399
Showing 1 changed file with 58 additions and 8 deletions.
66 changes: 58 additions & 8 deletions orchestrator/cosmos/broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package cosmos

import (
"context"
"fmt"
"sync"
"time"

sdk "github.com/cosmos/cosmos-sdk/types"
Expand Down Expand Up @@ -100,13 +100,16 @@ func (s *peggyBroadcastClient) AccFromAddress() sdk.AccAddress {
return s.broadcastClient.FromAddress()
}

const injectiveBlockTime = 1200 * time.Millisecond

type peggyBroadcastClient struct {
daemonQueryClient types.QueryClient
broadcastClient chainclient.ChainClient
ethSignerFn keystore.SignerFn
ethPersonalSignFn keystore.PersonalSignFn

svcTags metrics.Tags
svcTags metrics.Tags
broadcastMux sync.Mutex
}

func (s *peggyBroadcastClient) UpdatePeggyOrchestratorAddresses(
Expand Down Expand Up @@ -135,14 +138,16 @@ func (s *peggyBroadcastClient) UpdatePeggyOrchestratorAddresses(
Orchestrator: orchestratorAddr.String(),
}

res, err := s.broadcastClient.SyncBroadcastMsg(msg)
fmt.Println("Response of set eth address", "res", res)
if err != nil {
s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if _, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
err = errors.Wrap(err, "broadcasting MsgSetOrchestratorAddresses failed")
return err
return errors.Wrap(err, "broadcasting MsgSetOrchestratorAddresses failed")
}

time.Sleep(injectiveBlockTime)

return nil
}

Expand Down Expand Up @@ -184,11 +189,16 @@ func (s *peggyBroadcastClient) SendValsetConfirm(
Signature: ethcmn.Bytes2Hex(signature),
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if _, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
return errors.Wrap(err, "failed to broadcast MsgValsetConfirm")
}

time.Sleep(injectiveBlockTime)

return nil
}

Expand Down Expand Up @@ -225,12 +235,16 @@ func (s *peggyBroadcastClient) SendBatchConfirm(
EthSigner: ethFrom.Hex(),
TokenContract: batch.TokenContract,
}
s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if _, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
return errors.Wrap(err, "broadcasting MsgConfirmBatch failed")
}

time.Sleep(injectiveBlockTime)

return nil
}

Expand Down Expand Up @@ -265,11 +279,16 @@ func (s *peggyBroadcastClient) sendOldDepositClaims(
Data: "",
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if txResponse, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithError(err).Errorln("broadcasting MsgDepositClaim failed")
return err
} else {
time.Sleep(injectiveBlockTime)

log.WithFields(log.Fields{
"event_nonce": oldDeposit.EventNonce.String(),
"tx_hash": txResponse.TxResponse.TxHash,
Expand Down Expand Up @@ -311,11 +330,16 @@ func (s *peggyBroadcastClient) sendDepositClaims(
Data: deposit.Data,
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if txResponse, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithError(err).Errorln("broadcasting MsgDepositClaim failed")
return err
} else {
time.Sleep(injectiveBlockTime)

log.WithFields(log.Fields{
"event_nonce": deposit.EventNonce.String(),
"tx_hash": txResponse.TxResponse.TxHash,
Expand Down Expand Up @@ -349,11 +373,16 @@ func (s *peggyBroadcastClient) sendWithdrawClaims(
Orchestrator: s.AccFromAddress().String(),
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if txResponse, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithError(err).Errorln("broadcasting MsgWithdrawClaim failed")
return err
} else {
time.Sleep(injectiveBlockTime)

log.WithFields(log.Fields{
"event_nonce": withdraw.EventNonce.String(),
"tx_hash": txResponse.TxResponse.TxHash,
Expand Down Expand Up @@ -398,11 +427,16 @@ func (s *peggyBroadcastClient) sendValsetUpdateClaims(
Orchestrator: s.AccFromAddress().String(),
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if txResponse, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithError(err).Errorln("broadcasting MsgValsetUpdatedClaim failed")
return err
} else {
time.Sleep(injectiveBlockTime)

log.WithFields(log.Fields{
"event_nonce": valsetUpdate.EventNonce.String(),
"tx_hash": txResponse.TxResponse.TxHash,
Expand Down Expand Up @@ -440,11 +474,16 @@ func (s *peggyBroadcastClient) sendErc20DeployedClaims(
Orchestrator: s.AccFromAddress().String(),
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if txResponse, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
log.WithError(err).Errorln("broadcasting MsgERC20DeployedClaim failed")
return err
} else {
time.Sleep(injectiveBlockTime)

log.WithFields(log.Fields{
"event_nonce": erc20Deployed.EventNonce.String(),
"tx_hash": txResponse.TxResponse.TxHash,
Expand Down Expand Up @@ -521,7 +560,7 @@ func (s *peggyBroadcastClient) SendEthereumClaims(
// Considering blockTime=1s on Injective chain, Adding Sleep to make sure new event is
// sent only after previous event is executed successfully.
// Otherwise it will through `non contiguous event nonce` failing CheckTx.
time.Sleep(1200 * time.Millisecond)
//time.Sleep(1200 * time.Millisecond)
}
return nil
}
Expand Down Expand Up @@ -553,11 +592,17 @@ func (s *peggyBroadcastClient) SendToEth(
Amount: amount,
BridgeFee: fee, // TODO: use exactly that fee for transaction
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if _, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
return errors.Wrap(err, "broadcasting MsgSendToEth failed")
}

time.Sleep(injectiveBlockTime)

return nil
}

Expand All @@ -583,10 +628,15 @@ func (s *peggyBroadcastClient) SendRequestBatch(
Orchestrator: s.AccFromAddress().String(),
}

s.broadcastMux.Lock()
defer s.broadcastMux.Unlock()

if _, err := s.broadcastClient.SyncBroadcastMsg(msg); err != nil {
metrics.ReportFuncError(s.svcTags)
return errors.Wrap(err, "broadcasting MsgRequestBatch failed")
}

time.Sleep(injectiveBlockTime)

return nil
}

0 comments on commit 655e399

Please sign in to comment.