Skip to content

Commit

Permalink
roachtests/cdc: add roachtest for kafka throttling
Browse files Browse the repository at this point in the history
This patch adds a new roachtest to observe kafka throttling behaviour. It's a
bit tricky to find the passing condition here since we are expecting throttling
behaviour and would like to assert that the latency is above the target.

Part of: cockroachdb#117618
Release note:none
  • Loading branch information
wenyihu6 committed Mar 25, 2024
1 parent 1834e9d commit 6ee508c
Showing 1 changed file with 49 additions and 7 deletions.
56 changes: 49 additions & 7 deletions pkg/cmd/roachtest/tests/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,10 @@ func (ct *cdcTester) setupSink(args feedArgs) string {
kafka.install(ct.ctx)
kafka.start(ct.ctx, "kafka")

if args.kafkaQuotaBytesPerSec > 0 {
kafka.setProducerQuota(ct.ctx, args.kafkaQuotaBytesPerSec)
}

if args.kafkaChaos {
ct.mon.Go(func(ctx context.Context) error {
period, downTime := 2*time.Minute, 20*time.Second
Expand Down Expand Up @@ -505,13 +509,14 @@ func makeDefaultFeatureFlags() cdcFeatureFlags {
}

type feedArgs struct {
sinkType sinkType
targets []string
opts map[string]string
kafkaChaos bool
assumeRole string
tolerateErrors bool
sinkURIOverride string
sinkType sinkType
targets []string
opts map[string]string
kafkaChaos bool
assumeRole string
tolerateErrors bool
sinkURIOverride string
kafkaQuotaBytesPerSec int
cdcFeatureFlags
}

Expand Down Expand Up @@ -1291,6 +1296,33 @@ func registerCDC(r registry.Registry) {
ct.waitForWorkload()
},
})
r.Add(registry.TestSpec{
Name: "cdc/kafka-quota",
Owner: `cdc`,
Benchmark: true,
Cluster: r.MakeClusterSpec(4, spec.CPU(16)),
Leases: registry.MetamorphicLeases,
CompatibleClouds: registry.AllExceptAWS,
Suites: registry.Suites(registry.Nightly),
RequiresLicense: true,
Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
ct := newCDCTester(ctx, t, c)
defer ct.Close()

ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "10m"})

feed := ct.newChangefeed(feedArgs{
sinkType: kafkaSink,
targets: allTpccTargets,
opts: map[string]string{"initial_scan": "'no'"},
kafkaQuotaBytesPerSec: 1024,
})
ct.runFeedLatencyVerifier(feed, latencyTargets{
steadyLatency: 5 * time.Minute,
})
ct.waitForWorkload()
},
})
r.Add(registry.TestSpec{
Name: "cdc/crdb-chaos",
Owner: `cdc`,
Expand Down Expand Up @@ -2098,6 +2130,16 @@ type kafkaManager struct {
useKafka2 bool
}

func (k kafkaManager) setProducerQuota(ctx context.Context, bytesPerSecond int) {
k.t.Status("setting producer quota to %d bytes per second for all users", bytesPerSecond)
k.c.Run(ctx, option.WithNodes(k.kafkaSinkNode), filepath.Join(k.binDir(), "kafka-configs"),
"--bootstrap-server", "localhost:9092",
"--alter",
"--add-config", fmt.Sprintf("producer_byte_rate=%d", bytesPerSecond),
"--entity-type", "users",
"--entity-default")
}

func (k kafkaManager) basePath() string {
if k.c.IsLocal() {
return `/tmp/confluent`
Expand Down

0 comments on commit 6ee508c

Please sign in to comment.