-
Notifications
You must be signed in to change notification settings - Fork 3.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
changefeedccl: use correct channel size in parallelio #118476
Conversation
It looks like your PR touches production code but doesn't add or edit any test code. Did you consider adding tests to your PR? 🦉 Hoot! I am a Blathers, a bot for CockroachDB. My owner is dev-inf. |
I stress tested |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good find, thanks for the quick fix! I have a question about memory accounting
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @jayshrivastava and @miretskiy)
pkg/ccl/changefeedccl/parallel_io.go
line 97 at r1 (raw file):
// NB: The size of these channels should not be less than the quota. This prevents the producer from // blocking on sending requests which have been admitted. requestCh: make(chan AdmittedIORequest, quota),
Do we do memory accounting for the channel size?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: complete! 1 of 0 LGTMs obtained (waiting on @miretskiy and @rharding6373)
pkg/ccl/changefeedccl/parallel_io.go
line 97 at r1 (raw file):
Previously, rharding6373 (Rachael Harding) wrote…
Do we do memory accounting for the channel size?
We don't need to memory account because the requests are batches which contain cdc events and these cdc events are accounted for. The overhead is considered negligible. They get freed by batch.alloc.Release
after the result comes back:
cockroach/pkg/ccl/changefeedccl/batching_sink.go
Lines 346 to 367 in cb674c2
handleResult := func(result IOResult) { | |
req, err := result.Consume() | |
batch, _ := req.(*sinkBatch) | |
if err != nil { | |
s.handleError(err) | |
} else { | |
s.metrics.recordEmittedBatch( | |
batch.bufferTime, batch.numMessages, batch.mvcc, batch.numKVBytes, sinkDoesNotCompress, | |
) | |
} | |
inflight -= batch.numMessages | |
if (err != nil || inflight == 0) && sinkFlushWaiter != nil { | |
close(sinkFlushWaiter) | |
sinkFlushWaiter = nil | |
} | |
batch.alloc.Release(ctx) | |
freeSinkBatchEvent(batch) | |
} |
Event allocs are created here here
cockroach/pkg/ccl/changefeedccl/kvevent/blocking_buffer.go
Lines 271 to 275 in 3595935
alloc, err := b.AcquireMemory(ctx, n) | |
if err != nil { | |
return err | |
} | |
e.alloc = alloc |
Previously, the channels used for sending requests and receiving results were too small. This meant that a caller could block on sending a request even after acquiring quota. This change ensures that the size of the channels is large enough so that this blocking does not occur. Closes: cockroachdb#118463 Closes: cockroachdb#118462 Closes: cockroachdb#118461 Closes: cockroachdb#118460 Closes: cockroachdb#118459 Closes: cockroachdb#118458 Epic: none
bd7b5ea
to
b5799ba
Compare
bors r+ |
Build succeeded: |
Previously, the channels used for sending requests and receiving results were too small. This meant that a caller could block on sending a request even after acquiring quota. This change ensures that the size of the channels is large enough so that this blocking does not occur.
Closes: #118463
Closes: #118462
Closes: #118461
Closes: #118460
Closes: #118459
Closes: #118458
Epic: none