Skip to content
This repository has been archived by the owner on Nov 5, 2021. It is now read-only.

Commit

Permalink
Fix data race errors in UDP tests. (#638)
Browse files Browse the repository at this point in the history
Wait for various goroutines to finish.

PiperOrigin-RevId: 387192294
  • Loading branch information
manugarg authored Jul 27, 2021
1 parent 2ca2b72 commit b5c624f
Show file tree
Hide file tree
Showing 2 changed files with 47 additions and 23 deletions.
52 changes: 34 additions & 18 deletions probes/udp/udp_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017-2019 The Cloudprober Authors.
// Copyright 2017-2021 The Cloudprober Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -14,6 +14,7 @@

// Workaround to skip UDP tests using a tag, until
// https://github.com/google/cloudprober/issues/199 is fixed.
//go:build !skip_udp_probe_test
// +build !skip_udp_probe_test

package udp
Expand Down Expand Up @@ -96,7 +97,10 @@ func startUDPServer(ctx context.Context, t *testing.T, drop bool, delay time.Dur

const numTxPorts = 2

func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration, probesToSend int, scs *serverConnStats, conf configpb.ProbeConf) *Probe {
func runProbe(t *testing.T, interval, timeout time.Duration, probesToSend int, scs *serverConnStats, conf *configpb.ProbeConf) *Probe {
ctx, cancelCtx := context.WithCancel(context.Background())
var wg sync.WaitGroup

sysvars.Init(&logger.Logger{}, nil)
p := &Probe{}
ipVersion := 6
Expand All @@ -110,7 +114,7 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
Targets: targets.StaticTargets("localhost"),
Interval: interval,
Timeout: timeout,
ProbeConf: &conf,
ProbeConf: conf,
StatsExportInterval: 10 * time.Second,
}
if err := p.Init("udp", opts); err != nil {
Expand All @@ -120,11 +124,19 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
p.initProbeRunResults()

for _, conn := range p.connList {
go p.recvLoop(ctx, conn)
wg.Add(1)
go func(c *net.UDPConn) {
defer wg.Done()
p.recvLoop(ctx, c)
}(conn)
}

time.Sleep(time.Second)

wg.Add(1)
go func() {
defer wg.Done()

flushTicker := time.NewTicker(p.flushIntv)
for {
select {
Expand Down Expand Up @@ -155,6 +167,9 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
}
t.Logf("Echo server stats: %v", scs.msgCt)

cancelCtx()
wg.Wait()

return p
}

Expand All @@ -179,18 +194,18 @@ func TestSuccessMultipleCasesResultPerPort(t *testing.T) {
}

for _, c := range cases {
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelServerCtx := context.WithCancel(context.Background())
port, scs := startUDPServer(ctx, t, false, c.delay*time.Millisecond)
t.Logf("Case(%s): started server on port %d with delay %v", c.name, port, c.delay)

conf := configpb.ProbeConf{
conf := &configpb.ProbeConf{
UseAllTxPortsPerProbe: proto.Bool(c.useAllPorts),
Port: proto.Int32(int32(port)),
ExportMetricsByPort: proto.Bool(true),
}

p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
cancelCtx()
p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
cancelServerCtx()

if len(p.connList) != numTxPorts {
t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
Expand Down Expand Up @@ -231,15 +246,17 @@ func TestSuccessMultipleCasesDefaultResult(t *testing.T) {
}

for _, c := range cases {
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelServerCtx := context.WithCancel(context.Background())
port, scs := startUDPServer(ctx, t, false, c.delay*time.Millisecond)
t.Logf("Case(%s): started server on port %d with delay %v", c.name, port, c.delay)
conf := configpb.ProbeConf{
conf := &configpb.ProbeConf{
UseAllTxPortsPerProbe: proto.Bool(c.useAllPorts),
Port: proto.Int32(int32(port)),
ExportMetricsByPort: proto.Bool(false)}
p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
cancelCtx()
ExportMetricsByPort: proto.Bool(false),
}

p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
cancelServerCtx()

if len(p.connList) != numTxPorts {
t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
Expand Down Expand Up @@ -323,20 +340,19 @@ func TestLossAndDelayed(t *testing.T) {
}

for _, c := range cases {
ctx, cancelCtx := context.WithCancel(context.Background())
ctx, cancelServerCtx := context.WithCancel(context.Background())
port, scs := startUDPServer(ctx, t, c.drop, c.delay*time.Millisecond)

t.Logf("Case(%s): started server on port %d with loss %v delay %v", c.name, port, c.drop, c.delay)

conf := configpb.ProbeConf{
conf := &configpb.ProbeConf{
UseAllTxPortsPerProbe: proto.Bool(true),
Port: proto.Int32(int32(port)),
ExportMetricsByPort: proto.Bool(true),
}

p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, int(pktCount), scs, conf)

cancelCtx()
p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, int(pktCount), scs, conf)
cancelServerCtx()

if len(p.connList) != numTxPorts {
t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
Expand Down
18 changes: 13 additions & 5 deletions probes/udplistener/udplistener_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2018 The Cloudprober Authors.
// Copyright 2018-2021 The Cloudprober Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -145,8 +145,7 @@ func sendPktsAndCollectReplies(ctx context.Context, t *testing.T, srvPort int, i
}

func runProbe(ctx context.Context, t *testing.T, inp *inputState) ([]int, chan statskeeper.ProbeResult, *probeRunResult, *probeErr) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
ctx, cancelCtx := context.WithCancel(ctx)

sysvars.Init(&logger.Logger{}, nil)
p := &Probe{}
Expand Down Expand Up @@ -179,11 +178,20 @@ func runProbe(ctx context.Context, t *testing.T, inp *inputState) ([]int, chan s

p.updateTargets()
resultsChan := make(chan statskeeper.ProbeResult, 10)
go p.probeLoop(ctx, resultsChan)

var wg sync.WaitGroup

wg.Add(1)
go func() {
defer wg.Done()
p.probeLoop(ctx, resultsChan)
}()

time.Sleep(interval) // Wait for echo loop to be active.

rxSeq := sendPktsAndCollectReplies(ctx, t, port, inp)
cancel()
cancelCtx()
wg.Wait()

return rxSeq, resultsChan, p.res[localhost], p.errs
}
Expand Down

0 comments on commit b5c624f

Please sign in to comment.