From b5c624f8771fbd1c2ec66f48e2fd7c79ac887e85 Mon Sep 17 00:00:00 2001
From: Manu Garg
Date: Tue, 27 Jul 2021 14:22:25 -0700
Subject: [PATCH] Fix data race errors in UDP tests. (#638)

Wait for various goroutines to finish.

PiperOrigin-RevId: 387192294
---
 probes/udp/udp_test.go                 | 52 +++++++++++++++++---------
 probes/udplistener/udplistener_test.go | 18 ++++++---
 2 files changed, 47 insertions(+), 23 deletions(-)

diff --git a/probes/udp/udp_test.go b/probes/udp/udp_test.go
index 4e11d174..1b1ab3be 100644
--- a/probes/udp/udp_test.go
+++ b/probes/udp/udp_test.go
@@ -1,4 +1,4 @@
-// Copyright 2017-2019 The Cloudprober Authors.
+// Copyright 2017-2021 The Cloudprober Authors.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@
 
 // Workaround to skip UDP tests using a tag, until
 // https://github.com/google/cloudprober/issues/199 is fixed.
+//go:build !skip_udp_probe_test
 // +build !skip_udp_probe_test
 
 package udp
@@ -96,7 +97,10 @@ func startUDPServer(ctx context.Context, t *testing.T, drop bool, delay time.Dur
 
 const numTxPorts = 2
 
-func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration, probesToSend int, scs *serverConnStats, conf configpb.ProbeConf) *Probe {
+func runProbe(t *testing.T, interval, timeout time.Duration, probesToSend int, scs *serverConnStats, conf *configpb.ProbeConf) *Probe {
+	ctx, cancelCtx := context.WithCancel(context.Background())
+	var wg sync.WaitGroup
+
 	sysvars.Init(&logger.Logger{}, nil)
 	p := &Probe{}
 	ipVersion := 6
@@ -110,7 +114,7 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
 		Targets:             targets.StaticTargets("localhost"),
 		Interval:            interval,
 		Timeout:             timeout,
-		ProbeConf:           &conf,
+		ProbeConf:           conf,
 		StatsExportInterval: 10 * time.Second,
 	}
 	if err := p.Init("udp", opts); err != nil {
@@ -120,11 +124,19 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
 	p.initProbeRunResults()
 
 	for _, conn := range p.connList {
-		go p.recvLoop(ctx, conn)
+		wg.Add(1)
+		go func(c *net.UDPConn) {
+			defer wg.Done()
+			p.recvLoop(ctx, c)
+		}(conn)
 	}
 
 	time.Sleep(time.Second)
+
+	wg.Add(1)
 	go func() {
+		defer wg.Done()
+
 		flushTicker := time.NewTicker(p.flushIntv)
 		for {
 			select {
@@ -155,6 +167,9 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
 	}
 	t.Logf("Echo server stats: %v", scs.msgCt)
 
+	cancelCtx()
+	wg.Wait()
+
 	return p
 }
 
@@ -179,18 +194,18 @@ func TestSuccessMultipleCasesResultPerPort(t *testing.T) {
 	}
 
 	for _, c := range cases {
-		ctx, cancelCtx := context.WithCancel(context.Background())
+		ctx, cancelServerCtx := context.WithCancel(context.Background())
 		port, scs := startUDPServer(ctx, t, false, c.delay*time.Millisecond)
 		t.Logf("Case(%s): started server on port %d with delay %v", c.name, port, c.delay)
 
-		conf := configpb.ProbeConf{
+		conf := &configpb.ProbeConf{
 			UseAllTxPortsPerProbe: proto.Bool(c.useAllPorts),
 			Port:                  proto.Int32(int32(port)),
 			ExportMetricsByPort:   proto.Bool(true),
 		}
 
-		p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
-		cancelCtx()
+		p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
+		cancelServerCtx()
 
 		if len(p.connList) != numTxPorts {
 			t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
@@ -231,15 +246,17 @@ func TestSuccessMultipleCasesDefaultResult(t *testing.T) {
 	}
 
 	for _, c := range cases {
-		ctx, cancelCtx := context.WithCancel(context.Background())
+		ctx, cancelServerCtx := context.WithCancel(context.Background())
 		port, scs := startUDPServer(ctx, t, false, c.delay*time.Millisecond)
 		t.Logf("Case(%s): started server on port %d with delay %v", c.name, port, c.delay)
-		conf := configpb.ProbeConf{
+		conf := &configpb.ProbeConf{
 			UseAllTxPortsPerProbe: proto.Bool(c.useAllPorts),
 			Port:                  proto.Int32(int32(port)),
-			ExportMetricsByPort:   proto.Bool(false)}
-		p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
-		cancelCtx()
+			ExportMetricsByPort:   proto.Bool(false),
+		}
+
+		p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
+		cancelServerCtx()
 
 		if len(p.connList) != numTxPorts {
 			t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
@@ -323,20 +340,19 @@ func TestLossAndDelayed(t *testing.T) {
 	}
 
 	for _, c := range cases {
-		ctx, cancelCtx := context.WithCancel(context.Background())
+		ctx, cancelServerCtx := context.WithCancel(context.Background())
 		port, scs := startUDPServer(ctx, t, c.drop, c.delay*time.Millisecond)
 		t.Logf("Case(%s): started server on port %d with loss %v delay %v", c.name, port, c.drop, c.delay)
 
-		conf := configpb.ProbeConf{
+		conf := &configpb.ProbeConf{
 			UseAllTxPortsPerProbe: proto.Bool(true),
 			Port:                  proto.Int32(int32(port)),
 			ExportMetricsByPort:   proto.Bool(true),
 		}
 
-		p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, int(pktCount), scs, conf)
-
-		cancelCtx()
+		p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, int(pktCount), scs, conf)
+		cancelServerCtx()
 
 		if len(p.connList) != numTxPorts {
 			t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
diff --git a/probes/udplistener/udplistener_test.go b/probes/udplistener/udplistener_test.go
index d3ef1cd1..74e2adfb 100644
--- a/probes/udplistener/udplistener_test.go
+++ b/probes/udplistener/udplistener_test.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The Cloudprober Authors.
+// Copyright 2018-2021 The Cloudprober Authors.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -145,8 +145,7 @@ func sendPktsAndCollectReplies(ctx context.Context, t *testing.T, srvPort int, i
 }
 
 func runProbe(ctx context.Context, t *testing.T, inp *inputState) ([]int, chan statskeeper.ProbeResult, *probeRunResult, *probeErr) {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	ctx, cancelCtx := context.WithCancel(ctx)
 
 	sysvars.Init(&logger.Logger{}, nil)
 	p := &Probe{}
@@ -179,11 +178,20 @@ func runProbe(ctx context.Context, t *testing.T, inp *inputState) ([]int, chan s
 	p.updateTargets()
 
 	resultsChan := make(chan statskeeper.ProbeResult, 10)
-	go p.probeLoop(ctx, resultsChan)
+
+	var wg sync.WaitGroup
+
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		p.probeLoop(ctx, resultsChan)
+	}()
+
 	time.Sleep(interval) // Wait for echo loop to be active.
 
 	rxSeq := sendPktsAndCollectReplies(ctx, t, port, inp)
-	cancel()
+	cancelCtx()
+	wg.Wait()
 	return rxSeq, resultsChan, p.res[localhost], p.errs
 }
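
Note: the patch is an instance of the standard cancel-then-join pattern. A context cancellation asks each goroutine to stop, and a sync.WaitGroup lets the test block until every goroutine has actually exited before shared state (probe results, counters) is read. Below is a minimal, self-contained sketch of that pattern; the names (worker, count) are hypothetical and it is not code from the Cloudprober tree.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// worker stands in for recvLoop/probeLoop: it periodically mutates
// state that the caller will read later, and exits when its context
// is canceled.
func worker(ctx context.Context, counter *int) {
	for {
		select {
		case <-ctx.Done():
			return
		case <-time.After(10 * time.Millisecond):
			*counter++ // would race with the final read if never joined
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	count := 0
	wg.Add(1)
	go func() {
		defer wg.Done()
		worker(ctx, &count)
	}()

	time.Sleep(50 * time.Millisecond) // let the worker do some work

	cancel()  // ask the goroutine to stop...
	wg.Wait() // ...and wait until it actually has.

	// Safe: wg.Wait() orders the goroutine's last write before this read.
	fmt.Println("count:", count)
}

Under `go test -race`, cancel() alone is not enough: the goroutine may still be mid-iteration when the test reads its state, which is exactly the race the commit fixes; wg.Wait() is what establishes the happens-before edge.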