main.go
package main
import (
"context"
"fmt"
"math/rand"
"time"
"golang.org/x/sync/errgroup"
"github.com/libp2p/go-libp2p"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/testground/sdk-go/network"
"github.com/testground/sdk-go/run"
"github.com/testground/sdk-go/runtime"
"github.com/testground/sdk-go/sync"
compat "github.com/libp2p/test-plans/ping/go/compat"
)
var testcases = map[string]interface{}{
"ping": run.InitializedTestCaseFn(runPing), // we don't need the type conversion, but it's here for instructional purposes.
}
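// For reference, run.InitializedTestCaseFn describes functions with the same
// shape as runPing below (a sketch of the expected signature, not an
// additional API):
//
//   func(runenv *runtime.RunEnv, initCtx *run.InitContext) error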
// A test plan is just a standard Go program which receives a bunch of
// environment variables to configure itself. A test plan ONLY depends on the
// lightweight Testground SDK (https://github.com/testground/sdk-go/).
//
// A central object is the RunEnv (runtime environment), which encapsulates the
// contract between Testground and this test plan. Read more: https://docs.testground.ai/concepts-and-architecture/runtime.
//
// Other key objects are:
//
// * sync.Client (https://pkg.go.dev/github.com/testground/sdk-go/sync):
// used to coordinate instances with one another via synchronisation
// primitives like signals, barriers, and pubsub. In the future, we plan to
// support more sophisticated patterns like locks, semaphores, etc.
// * network.Client (https://pkg.go.dev/github.com/testground/sdk-go/network):
// used to manipulate network configurations. The network.Client uses the
// sync service to communicate with the sidecar containers that manage
// the network configurations "from the outside". In other words, network
// configuration is NOT managed locally by the SDK. Rather, the SDK sends
// commands to the sidecar, and awaits until those commands are applied.
func main() {
// Delegate this run to the SDK. InvokeMap takes a map of test case names
// and test case functions, and dispatches accordingly depending on the test
// case being run. The run.Invoke* functions are entrypoint functions.
run.InvokeMap(testcases)
}
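// Testground tells the entrypoint which test case to dispatch through the
// runtime environment (e.g. the TEST_CASE environment variable, consumed by
// the SDK); run.InvokeMap looks that name up in the testcases map above.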
// runPing is the test case logic.
//
// Its signature conforms to the SDK's run.InitializedTestCaseFn type. As a
// result, the Testground SDK will perform a few useful preparation steps
// for us:
//
// 1. Initializing a sync client, bound to this runenv. Refer to the main()
// docs for more info.
// 2. Initializing a net client, using the above sync client. Refer to the
// main() docs for more info.
// 3. Waiting for the network to initialize.
// 4. Claiming a global sequence number, which uniquely identifies this instance within the run.
// 5. Claiming a group-scoped sequence number, which uniquely identifies this instance within its group.
func runPing(runenv *runtime.RunEnv, initCtx *run.InitContext) error {
// Consume test parameters from the runtime environment.
var (
secureChannel = runenv.StringParam("secure_channel")
maxLatencyMs = runenv.IntParam("max_latency_ms")
iterations = runenv.IntParam("iterations")
)
// We can record messages anytime; RecordMessage supports fmt-style
// formatting strings.
runenv.RecordMessage("started test instance; params: secure_channel=%s, max_latency_ms=%d, iterations=%d", secureChannel, maxLatencyMs, iterations)
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Minute)
defer cancel()
// Wait until all instances in this test run have signalled.
//
// This is API sugar for waiting until all runenv.TestInstanceCount signals
// have been made on state "initialized_global" (run.StateInitializedGlobal).
//
// By convention, Must* methods in the testground SDK will panic upon error.
//
// The sdk's run.Invoke* entrypoint functions catch these panics, record a
// CRASH event, and abort the test immediately.
initCtx.MustWaitAllInstancesInitialized(ctx)
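// A rough equivalent of the sugar above, spelled out with the lower-level
// primitive used elsewhere in this file (sketch only):
//
//   initCtx.SyncClient.MustSignalAndWait(ctx, run.StateInitializedGlobal, runenv.TestInstanceCount)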
// Now all instances are ready for action.
//
// Note: In large test runs, the scheduler might take a few minutes to
// schedule all instances in a cluster.
// In containerised runs (local:docker, cluster:k8s runners), Testground
// instances are attached to two networks:
//
// * a data network
// * a control network
//
// The data network is where standard test traffic flows. The control
// network connects us ONLY with the sync service, InfluxDB, etc. All
// traffic shaping rules are applied to the data network. Thanks to this
// separation, we can simulate disconnected scenarios by detaching the data
// network adapter, or blocking all incoming/outgoing traffic on that
// network.
//
// We need to listen on (and advertise) our data network IP address, so we
// obtain it from the NetClient.
ip := initCtx.NetClient.MustGetDataNetworkIP()
// Let's construct the libp2p node.
listenAddr := fmt.Sprintf("/ip4/%s/tcp/0", ip)
host, err := compat.NewLibp2p(ctx,
secureChannel,
libp2p.ListenAddrStrings(listenAddr),
)
if err != nil {
return fmt.Errorf("failed to instantiate libp2p instance: %w", err)
}
defer host.Close()
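// The compat helper hides version-specific wiring. As a loose sketch (the
// shape below is illustrative, not the actual compat implementation), it maps
// the secure_channel parameter onto a libp2p security option before building
// the host:
//
//   var security libp2p.Option
//   switch secureChannel {
//   case "noise":
//       security = libp2p.Security(noise.ID, noise.New)
//   case "tls":
//       security = libp2p.Security(tls.ID, tls.New)
//   }
//   return libp2p.New(append(opts, security)...)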
// Now we instantiate the ping service.
//
// This adds a stream handler to our Host so it can process inbound pings,
// and the returned PingService instance allows us to perform outbound pings.
ping := ping.NewPingService(host)
// Record our listen addrs.
runenv.RecordMessage("my listen addrs: %v", host.Addrs())
// Obtain our own address info, and use the sync service to publish it to a
// 'peersTopic' topic, where others will read from.
var (
hostId = host.ID()
ai = &peer.AddrInfo{ID: hostId, Addrs: host.Addrs()}
// the peers topic where all instances will advertise their AddrInfo.
peersTopic = sync.NewTopic("peers", new(peer.AddrInfo))
// initialize a slice to store the AddrInfos of all other peers in the run.
peers = make([]*peer.AddrInfo, 0, runenv.TestInstanceCount)
)
// Publish our own.
initCtx.SyncClient.MustPublish(ctx, peersTopic, ai)
// Now subscribe to the peers topic and consume all addresses, storing them
// in the peers slice.
peersCh := make(chan *peer.AddrInfo)
sctx, scancel := context.WithCancel(ctx)
sub := initCtx.SyncClient.MustSubscribe(sctx, peersTopic, peersCh)
// Receive the expected number of AddrInfos.
for len(peers) < cap(peers) {
select {
case ai := <-peersCh:
peers = append(peers, ai)
case err := <-sub.Done():
return err
}
}
scancel() // cancels the Subscription.
//
// Now we know about all other libp2p hosts in this test.
//
// This is a closure that pings all peers in the test in parallel, and
// records the latency value as a message and as a result datapoint.
pingPeers := func(tag string) error {
g, gctx := errgroup.WithContext(ctx)
for _, ai := range peers {
if ai.ID == hostId {
continue
}
id := ai.ID // capture the ID locally for safe use within the closure.
g.Go(func() error {
// a context for the continuous stream of pings.
pctx, cancel := context.WithCancel(gctx)
defer cancel()
res := <-ping.Ping(pctx, id)
if res.Error != nil {
return res.Error
}
// record a message.
runenv.RecordMessage("ping result (%s) from peer %s: %s", tag, id, res.RTT)
// record a result point; these points will be batch-inserted
// into InfluxDB when the test concludes.
//
// ping-result is the metric name, and round and peer are tags.
point := fmt.Sprintf("ping-result,round=%s,peer=%s", tag, id)
runenv.R().RecordPoint(point, float64(res.RTT.Milliseconds()))
return nil
})
}
return g.Wait()
}
// Connect to all other peers.
//
// Note: we sidestep simultaneous connect issues by ONLY connecting to peers
// who published their addresses before us (this is enough to dedup and avoid
// two peers dialling each other at the same time).
//
// We can do this because sync service pubsub is ordered.
for _, ai := range peers {
if ai.ID == hostId {
break
}
runenv.RecordMessage("Dial peer: %s", ai.ID)
if err := host.Connect(ctx, *ai); err != nil {
return err
}
}
runenv.RecordMessage("done dialling my peers")
// Wait for all peers to signal that they're done with the connection phase.
initCtx.SyncClient.MustSignalAndWait(ctx, "connected", runenv.TestInstanceCount)
// Let's ping all our peers without any traffic shaping rules.
if err := pingPeers("initial"); err != nil {
return err
}
// Wait for all peers to have finished the initial round.
initCtx.SyncClient.MustSignalAndWait(ctx, "initial", runenv.TestInstanceCount)
//
// Here is where the fun begins. We will perform `iterations` rounds of
// randomly altering our network latency, waiting for all other peers to do
// the same. We will record our observations for each round.
//
// Let's initialize the random seed to the current timestamp + our global
// sequence number. Otherwise all instances would end up generating the same
// "random" latencies.
rand.Seed(time.Now().UnixNano() + initCtx.GlobalSeq)
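// Note: rand.Seed is deprecated as of Go 1.20; with a newer toolchain one
// would use a dedicated generator instead and call rng.Int31n below (sketch):
//
//   rng := rand.New(rand.NewSource(time.Now().UnixNano() + initCtx.GlobalSeq))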
for i := 1; i <= iterations; i++ {
runenv.RecordMessage("ITERATION ROUND %d", i)
// Let's calculate our new latency.
latency := time.Duration(rand.Int31n(int32(maxLatencyMs))) * time.Millisecond
runenv.RecordMessage("(round %d) my latency: %s", i, latency)
// Let's ask the NetClient to reconfigure our network.
//
// The sidecar will apply the network latency from the outside, and will
// signal on the CallbackState in the sync service. Since we want to wait
// for ALL instances to configure their networks for this round before
// we proceed, we set the CallbackTarget to the total number of instances
// participating in this test run. MustConfigureNetwork will block until
// that many signals have been received. We use a unique state ID for
// each round.
//
// Read more about the sidecar: https://docs.testground.ai/concepts-and-architecture/sidecar
initCtx.NetClient.MustConfigureNetwork(ctx, &network.Config{
Network: "default",
Enable: true,
Default: network.LinkShape{Latency: latency},
CallbackState: sync.State(fmt.Sprintf("network-configured-%d", i)),
CallbackTarget: runenv.TestInstanceCount,
})
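// Latency is only one of the knobs exposed here; network.LinkShape also
// carries fields such as Bandwidth, Jitter and Loss (see the SDK for the
// authoritative set and units), so the same call could shape throughput or
// packet loss per round. An illustrative sketch, not tuned values:
//
//   Default: network.LinkShape{
//       Latency:   latency,
//       Jitter:    10 * time.Millisecond,
//       Bandwidth: 1 << 20,
//   },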
if err := pingPeers(fmt.Sprintf("iteration-%d", i)); err != nil {
return err
}
// Signal that we're done with this round and wait for others to be
// done before we repeat and switch our latencies, or exit the loop and
// close the host.
doneState := sync.State(fmt.Sprintf("done-%d", i))
initCtx.SyncClient.MustSignalAndWait(ctx, doneState, runenv.TestInstanceCount)
}
_ = host.Close()
return nil
}
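// To try this plan locally, an invocation along these lines is typical
// (illustrative only; it assumes a running testground daemon and that the
// plan directory has been imported under the name "ping"):
//
//   testground plan import --from ./ping
//   testground run single --plan=ping --testcase=ping \
//       --builder=docker:go --runner=local:docker --instances=2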