Add an RDS provider for file-based targets.
To avoid breaking existing users, continue providing "file_targets" (directly), but implement it using the RDS file targets provider.

The main reasoning behind this RDS provider is to allow multiple probes to use the same targets file without each probe having to load it again and again; a short sketch of this idea follows below.

Ref: #634
PiperOrigin-RevId: 387196102
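
To make the reasoning above concrete, here is a minimal, self-contained Go sketch of a shared file-based targets provider: the targets file is read and parsed once, and any number of probes list targets from the cached copy instead of re-reading the file. All names here (fileProvider, Resource, ListResources, targets.json) are hypothetical illustrations, not Cloudprober's actual RDS API.

```go
// Hypothetical illustration of a shared file-based targets provider:
// the file is loaded and parsed once, and every probe queries the cache.
package main

import (
	"encoding/json"
	"fmt"
	"os"
	"sync"
)

// Resource is a minimal stand-in for an RDS resource (name + IP).
type Resource struct {
	Name string `json:"name"`
	IP   string `json:"ip"`
}

// fileProvider caches the parsed file so repeated ListResources calls
// (e.g. from several probes) don't re-read the file.
type fileProvider struct {
	mu        sync.RWMutex
	resources []Resource
}

func newFileProvider(path string) (*fileProvider, error) {
	b, err := os.ReadFile(path)
	if err != nil {
		return nil, err
	}
	var res []Resource
	if err := json.Unmarshal(b, &res); err != nil {
		return nil, err
	}
	return &fileProvider{resources: res}, nil
}

// ListResources returns a copy of the cached resources; no file I/O per call.
func (p *fileProvider) ListResources() []Resource {
	p.mu.RLock()
	defer p.mu.RUnlock()
	out := make([]Resource, len(p.resources))
	copy(out, p.resources)
	return out
}

func main() {
	fp, err := newFileProvider("targets.json") // hypothetical path
	if err != nil {
		fmt.Println("load error:", err)
		return
	}
	// Two "probes" sharing the same provider: the file was read only once.
	for i := 0; i < 2; i++ {
		fmt.Printf("probe %d sees %d targets\n", i, len(fp.ListResources()))
	}
}
```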
manugarg committed Jul 27, 2021
1 parent 2ca2b72 commit 9de2210
Showing 16 changed files with 1,130 additions and 603 deletions.
52 changes: 34 additions & 18 deletions probes/udp/udp_test.go
@@ -1,4 +1,4 @@
-// Copyright 2017-2019 The Cloudprober Authors.
+// Copyright 2017-2021 The Cloudprober Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -14,6 +14,7 @@

// Workaround to skip UDP tests using a tag, until
// https://github.com/google/cloudprober/issues/199 is fixed.
+//go:build !skip_udp_probe_test
// +build !skip_udp_probe_test

package udp
@@ -96,7 +97,10 @@ func startUDPServer(ctx context.Context, t *testing.T, drop bool, delay time.Dur

const numTxPorts = 2

-func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration, probesToSend int, scs *serverConnStats, conf configpb.ProbeConf) *Probe {
+func runProbe(t *testing.T, interval, timeout time.Duration, probesToSend int, scs *serverConnStats, conf *configpb.ProbeConf) *Probe {
+ctx, cancelCtx := context.WithCancel(context.Background())
+var wg sync.WaitGroup

sysvars.Init(&logger.Logger{}, nil)
p := &Probe{}
ipVersion := 6
@@ -110,7 +114,7 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
Targets: targets.StaticTargets("localhost"),
Interval: interval,
Timeout: timeout,
-ProbeConf: &conf,
+ProbeConf: conf,
StatsExportInterval: 10 * time.Second,
}
if err := p.Init("udp", opts); err != nil {
@@ -120,11 +124,19 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
p.initProbeRunResults()

for _, conn := range p.connList {
-go p.recvLoop(ctx, conn)
+wg.Add(1)
+go func(c *net.UDPConn) {
+defer wg.Done()
+p.recvLoop(ctx, c)
+}(conn)
}

time.Sleep(time.Second)

+wg.Add(1)
+go func() {
+defer wg.Done()

flushTicker := time.NewTicker(p.flushIntv)
for {
select {
@@ -155,6 +167,9 @@ func runProbe(ctx context.Context, t *testing.T, interval, timeout time.Duration
}
t.Logf("Echo server stats: %v", scs.msgCt)

+cancelCtx()
+wg.Wait()

return p
}

@@ -179,18 +194,18 @@ func TestSuccessMultipleCasesResultPerPort(t *testing.T) {
}

for _, c := range cases {
-ctx, cancelCtx := context.WithCancel(context.Background())
+ctx, cancelServerCtx := context.WithCancel(context.Background())
port, scs := startUDPServer(ctx, t, false, c.delay*time.Millisecond)
t.Logf("Case(%s): started server on port %d with delay %v", c.name, port, c.delay)

-conf := configpb.ProbeConf{
+conf := &configpb.ProbeConf{
UseAllTxPortsPerProbe: proto.Bool(c.useAllPorts),
Port: proto.Int32(int32(port)),
ExportMetricsByPort: proto.Bool(true),
}

-p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
-cancelCtx()
+p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
+cancelServerCtx()

if len(p.connList) != numTxPorts {
t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
@@ -231,15 +246,17 @@ func TestSuccessMultipleCasesDefaultResult(t *testing.T) {
}

for _, c := range cases {
-ctx, cancelCtx := context.WithCancel(context.Background())
+ctx, cancelServerCtx := context.WithCancel(context.Background())
port, scs := startUDPServer(ctx, t, false, c.delay*time.Millisecond)
t.Logf("Case(%s): started server on port %d with delay %v", c.name, port, c.delay)
-conf := configpb.ProbeConf{
+conf := &configpb.ProbeConf{
UseAllTxPortsPerProbe: proto.Bool(c.useAllPorts),
Port: proto.Int32(int32(port)),
-ExportMetricsByPort: proto.Bool(false)}
-p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
-cancelCtx()
+ExportMetricsByPort: proto.Bool(false),
+}
+
+p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, c.probeCount, scs, conf)
+cancelServerCtx()

if len(p.connList) != numTxPorts {
t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
@@ -323,20 +340,19 @@ func TestLossAndDelayed(t *testing.T) {
}

for _, c := range cases {
-ctx, cancelCtx := context.WithCancel(context.Background())
+ctx, cancelServerCtx := context.WithCancel(context.Background())
port, scs := startUDPServer(ctx, t, c.drop, c.delay*time.Millisecond)

t.Logf("Case(%s): started server on port %d with loss %v delay %v", c.name, port, c.drop, c.delay)

-conf := configpb.ProbeConf{
+conf := &configpb.ProbeConf{
UseAllTxPortsPerProbe: proto.Bool(true),
Port: proto.Int32(int32(port)),
ExportMetricsByPort: proto.Bool(true),
}

-p := runProbe(ctx, t, c.interval*time.Millisecond, c.timeout*time.Millisecond, int(pktCount), scs, conf)
-
-cancelCtx()
+p := runProbe(t, c.interval*time.Millisecond, c.timeout*time.Millisecond, int(pktCount), scs, conf)
+cancelServerCtx()

if len(p.connList) != numTxPorts {
t.Errorf("Case(%s): len(p.connList)=%d, want %d", c.name, len(p.connList), numTxPorts)
18 changes: 13 additions & 5 deletions probes/udplistener/udplistener_test.go
@@ -1,4 +1,4 @@
-// Copyright 2018 The Cloudprober Authors.
+// Copyright 2018-2021 The Cloudprober Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
@@ -145,8 +145,7 @@ func sendPktsAndCollectReplies(ctx context.Context, t *testing.T, srvPort int, i
}

func runProbe(ctx context.Context, t *testing.T, inp *inputState) ([]int, chan statskeeper.ProbeResult, *probeRunResult, *probeErr) {
-ctx, cancel := context.WithCancel(ctx)
-defer cancel()
+ctx, cancelCtx := context.WithCancel(ctx)

sysvars.Init(&logger.Logger{}, nil)
p := &Probe{}
@@ -179,11 +178,20 @@ func runProbe(ctx context.Context, t *testing.T, inp *inputState) ([]int, chan s

p.updateTargets()
resultsChan := make(chan statskeeper.ProbeResult, 10)
-go p.probeLoop(ctx, resultsChan)

+var wg sync.WaitGroup

+wg.Add(1)
+go func() {
+defer wg.Done()
+p.probeLoop(ctx, resultsChan)
+}()

time.Sleep(interval) // Wait for echo loop to be active.

rxSeq := sendPktsAndCollectReplies(ctx, t, port, inp)
-cancel()
+cancelCtx()
+wg.Wait()

return rxSeq, resultsChan, p.res[localhost], p.errs
}
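
The test changes in both files follow one pattern: the test owns a cancellable context and a sync.WaitGroup, every probe goroutine (recvLoop, probeLoop) is wrapped so it signals completion, and the test calls cancel() followed by wg.Wait() before returning, so no goroutine outlives the test. Below is a minimal sketch of that pattern, with a hypothetical worker standing in for the probe loops.

```go
// Minimal sketch of the shutdown pattern used in the tests above: start
// worker goroutines under a cancellable context, then cancel and wait for
// them before returning. "worker" stands in for recvLoop/probeLoop.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

func worker(ctx context.Context, id int) {
	for {
		select {
		case <-ctx.Done():
			fmt.Printf("worker %d: stopping\n", id)
			return
		case <-time.After(10 * time.Millisecond):
			// Simulate receiving/processing a packet.
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	var wg sync.WaitGroup

	for i := 0; i < 2; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			worker(ctx, id)
		}(i)
	}

	time.Sleep(50 * time.Millisecond) // Let the workers run briefly.

	cancel()  // Equivalent of cancelCtx() in the tests.
	wg.Wait() // No goroutine outlives the test.
	fmt.Println("all workers stopped")
}
```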