diff --git a/pkg/cmd/roachtest/tests/cdc.go b/pkg/cmd/roachtest/tests/cdc.go index 15a8371474a9..417dd4686bfe 100644 --- a/pkg/cmd/roachtest/tests/cdc.go +++ b/pkg/cmd/roachtest/tests/cdc.go @@ -248,6 +248,10 @@ func (ct *cdcTester) setupSink(args feedArgs) string { kafka.install(ct.ctx) kafka.start(ct.ctx, "kafka") + if args.kafkaQuotaBytesPerSec > 0 { + kafka.setProducerQuota(ct.ctx, args.kafkaQuotaBytesPerSec) + } + if args.kafkaChaos { ct.mon.Go(func(ctx context.Context) error { period, downTime := 2*time.Minute, 20*time.Second @@ -505,13 +509,14 @@ func makeDefaultFeatureFlags() cdcFeatureFlags { } type feedArgs struct { - sinkType sinkType - targets []string - opts map[string]string - kafkaChaos bool - assumeRole string - tolerateErrors bool - sinkURIOverride string + sinkType sinkType + targets []string + opts map[string]string + kafkaChaos bool + assumeRole string + tolerateErrors bool + sinkURIOverride string + kafkaQuotaBytesPerSec int cdcFeatureFlags } @@ -1291,6 +1296,33 @@ func registerCDC(r registry.Registry) { ct.waitForWorkload() }, }) + r.Add(registry.TestSpec{ + Name: "cdc/kafka-quota", + Owner: `cdc`, + Benchmark: true, + Cluster: r.MakeClusterSpec(4, spec.CPU(16)), + Leases: registry.MetamorphicLeases, + CompatibleClouds: registry.AllExceptAWS, + Suites: registry.Suites(registry.Nightly), + RequiresLicense: true, + Run: func(ctx context.Context, t test.Test, c cluster.Cluster) { + ct := newCDCTester(ctx, t, c) + defer ct.Close() + + ct.runTPCCWorkload(tpccArgs{warehouses: 100, duration: "10m"}) + + feed := ct.newChangefeed(feedArgs{ + sinkType: kafkaSink, + targets: allTpccTargets, + opts: map[string]string{"initial_scan": "'no'"}, + kafkaQuotaBytesPerSec: 1024, + }) + ct.runFeedLatencyVerifier(feed, latencyTargets{ + steadyLatency: 5 * time.Minute, + }) + ct.waitForWorkload() + }, + }) r.Add(registry.TestSpec{ Name: "cdc/crdb-chaos", Owner: `cdc`, @@ -2098,6 +2130,16 @@ type kafkaManager struct { useKafka2 bool } +func (k kafkaManager) setProducerQuota(ctx context.Context, bytesPerSecond int) { + k.t.Status("setting producer quota to %d bytes per second for all users", bytesPerSecond) + k.c.Run(ctx, option.WithNodes(k.kafkaSinkNode), filepath.Join(k.binDir(), "kafka-configs"), + "--bootstrap-server", "localhost:9092", + "--alter", + "--add-config", fmt.Sprintf("producer_byte_rate=%d", bytesPerSecond), + "--entity-type", "users", + "--entity-default") +} + func (k kafkaManager) basePath() string { if k.c.IsLocal() { return `/tmp/confluent`