Skip to content

Commit

Permalink
changefeedccl: use correct channel size in parallelio
Browse files Browse the repository at this point in the history
Previously, the channels used for sending requests and receiving results were
too small. This meant that a caller could block on sending a request even
after acquiring quota. This change ensures that the size of the channels
is large enough so that this blocking does not occur.

Closes: cockroachdb#118463
Closes: cockroachdb#118462
Closes: cockroachdb#118461
Closes: cockroachdb#118460
Closes: cockroachdb#118459
Closes: cockroachdb#118458
Epic: none
  • Loading branch information
jayshrivastava authored and wenyihu6 committed Feb 21, 2024
1 parent b10da3d commit 9140d91
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions pkg/ccl/changefeedccl/parallel_io.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/intsets"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
Expand Down Expand Up @@ -84,15 +85,18 @@ func NewParallelIO(
metrics metricsRecorder,
settings *cluster.Settings,
) *ParallelIO {
quota := uint64(requestQuota.Get(&settings.SV))
wg := ctxgroup.WithContext(ctx)
io := &ParallelIO{
retryOpts: retryOpts,
wg: wg,
metrics: metrics,
ioHandler: handler,
quota: quotapool.NewIntPool("changefeed-parallel-io", uint64(requestQuota.Get(&settings.SV))),
requestCh: make(chan AdmittedIORequest, numWorkers),
resultCh: make(chan IOResult, numWorkers),
quota: quotapool.NewIntPool("changefeed-parallel-io", quota),
// NB: The size of these channels should not be less than the quota. This prevents the producer from
// blocking on sending requests which have been admitted.
requestCh: make(chan AdmittedIORequest, quota),
resultCh: make(chan IOResult, quota),
doneCh: make(chan struct{}),
}

Expand Down Expand Up @@ -161,8 +165,10 @@ var requestQuota = settings.RegisterIntSetting(
"changefeed.parallel_io.request_quota",
"the number of requests which can be admitted into the parallelio"+
" system before blocking the producer",
128,
settings.PositiveInt,
int64(util.ConstantWithMetamorphicTestChoice(
"changefeed.parallel_io.request_quota",
128, 16, 32, 64, 256).(int)),
settings.IntInRange(1, 256),
settings.WithVisibility(settings.Reserved),
)

Expand Down

0 comments on commit 9140d91

Please sign in to comment.