Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: replace time.After with time.NewTicker #1650

Merged
merged 9 commits into from
Jun 28, 2021
8 changes: 6 additions & 2 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,15 @@ func (d *discovery) start() error {
// get all currently connected peers and use them to bootstrap the DHT
peers := d.h.Network().Peers()

t := time.NewTicker(startDHTTimeout)
defer t.Stop()
for {
if len(peers) > 0 {
break
}

select {
case <-time.After(startDHTTimeout):
case <-t.C:
logger.Debug("no peers yet, waiting to start DHT...")
// wait for peers to connect before starting DHT, otherwise DHT bootstrap nodes
// will be empty and we will fail to fill the routing table
Expand Down Expand Up @@ -169,11 +171,13 @@ func (d *discovery) advertise() {
}

func (d *discovery) checkPeerCount() {
t := time.NewTicker(connectToPeersTimeout)
defer t.Stop()
for {
select {
case <-d.ctx.Done():
return
case <-time.After(connectToPeersTimeout):
case <-t.C:
if len(d.h.Network().Peers()) > d.minPeers {
continue
}
Expand Down
48 changes: 30 additions & 18 deletions dot/network/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,9 @@ const (
badPeerThreshold int = -2
protectedPeerThreshold int = 7

defaultSlotDuration = time.Second * 6
defaultSlotDuration = time.Second * 6
defaultHandleResponseQueueDuration = time.Second
defaultPrunePeersDuration = time.Second * 30
)

var (
Expand Down Expand Up @@ -132,26 +134,30 @@ type syncQueue struct {
goal int64 // goal block number we are trying to sync to
currStart, currEnd int64 // the start and end of the BlockResponse we are currently handling; 0 and 0 if we are not currently handling any

benchmarker *syncBenchmarker
benchmarker *syncBenchmarker
handleResponseQueueDuration time.Duration
prunePeersDuration time.Duration
}

func newSyncQueue(s *Service) *syncQueue {
ctx, cancel := context.WithCancel(s.ctx)

return &syncQueue{
s: s,
slotDuration: defaultSlotDuration,
ctx: ctx,
cancel: cancel,
peerScore: new(sync.Map),
requestData: new(sync.Map),
requestDataByHash: new(sync.Map),
justificationRequestData: new(sync.Map),
requestCh: make(chan *syncRequest, blockRequestBufferSize),
responses: []*types.BlockData{},
responseCh: make(chan []*types.BlockData, blockResponseBufferSize),
benchmarker: newSyncBenchmarker(),
buf: make([]byte, maxBlockResponseSize),
s: s,
slotDuration: defaultSlotDuration,
ctx: ctx,
cancel: cancel,
peerScore: new(sync.Map),
requestData: new(sync.Map),
requestDataByHash: new(sync.Map),
justificationRequestData: new(sync.Map),
requestCh: make(chan *syncRequest, blockRequestBufferSize),
responses: []*types.BlockData{},
responseCh: make(chan []*types.BlockData, blockResponseBufferSize),
benchmarker: newSyncBenchmarker(),
buf: make([]byte, maxBlockResponseSize),
handleResponseQueueDuration: defaultHandleResponseQueueDuration,
prunePeersDuration: defaultPrunePeersDuration,
}
}

Expand All @@ -176,10 +182,12 @@ func (q *syncQueue) syncAtHead() {
q.s.syncer.SetSyncing(true)
q.s.noGossip = true // don't gossip messages until we're at the head

t := time.NewTicker(q.slotDuration * 2)
defer t.Stop()
for {
select {
// sleep for average block time TODO: make this configurable from slot duration
case <-time.After(q.slotDuration * 2):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -214,9 +222,11 @@ func (q *syncQueue) syncAtHead() {
}

func (q *syncQueue) handleResponseQueue() {
t := time.NewTicker(q.handleResponseQueueDuration)
defer t.Stop()
for {
select {
case <-time.After(time.Second):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down Expand Up @@ -260,9 +270,11 @@ func (q *syncQueue) handleResponseQueue() {

// prune peers with low score and connect to new peers
func (q *syncQueue) prunePeers() {
t := time.NewTicker(q.prunePeersDuration)
defer t.Stop()
for {
select {
case <-time.After(time.Second * 30):
case <-t.C:
case <-q.ctx.Done():
return
}
Expand Down
18 changes: 12 additions & 6 deletions dot/telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,10 @@ type Message struct {

// Handler struct for holding telemetry related things
type Handler struct {
msg chan Message
connections []*telemetryConnection
log log.Logger
msg chan Message
connections []*telemetryConnection
log log.Logger
sendMessageTimeout time.Duration
}

// KeyValue object to hold key value pairs used in telemetry messages
Expand All @@ -56,14 +57,17 @@ var (
handlerInstance *Handler
)

const defaultMessageTimeout = time.Second

// GetInstance singleton pattern to for accessing TelemetryHandler
func GetInstance() *Handler { //nolint
if handlerInstance == nil {
once.Do(
func() {
handlerInstance = &Handler{
msg: make(chan Message, 256),
log: log.New("pkg", "telemetry"),
msg: make(chan Message, 256),
log: log.New("pkg", "telemetry"),
sendMessageTimeout: defaultMessageTimeout,
}
go handlerInstance.startListening()
})
Expand Down Expand Up @@ -109,10 +113,12 @@ func (h *Handler) AddConnections(conns []*genesis.TelemetryEndpoint) {

// SendMessage sends Message to connected telemetry listeners
func (h *Handler) SendMessage(msg *Message) error {
t := time.NewTicker(h.sendMessageTimeout)
defer t.Stop()
select {
case h.msg <- *msg:

case <-time.After(time.Second * 1):
case <-t.C:
return errors.New("timeout sending message")
}
return nil
Expand Down
4 changes: 3 additions & 1 deletion lib/grandpa/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,11 +177,13 @@ func (s *Service) handleNetworkMessage(from peer.ID, msg NotificationsMessage) (
}

func (s *Service) sendNeighbourMessage() {
t := time.NewTicker(neighbourMessageInterval)
defer t.Stop()
for {
select {
case <-s.ctx.Done():
return
case <-time.After(neighbourMessageInterval):
case <-t.C:
if s.neighbourMessage == nil {
continue
}
Expand Down